使用 pyAerial 在 Aerial Data Lake 数据上进行 PUSCH 解码#

此示例演示了如何使用 pyAerial 绑定来运行 cuPHY GPU 加速的 5G NR PUSCH 解码。5G NR PUSCH 数据是从使用 Aerial Data Lake 收集和存储的空中捕获 PUSCH 数据集示例中读取的。构建 PUSCH 接收器使用 pyAerial 通过两种方式演示,首先是使用完全融合的完整 PUSCH 接收器,仅使用单个函数调用从 Python 调用。然后通过使用单独的 Python 函数调用各个 PUSCH 接收器组件来构建完整的 PUSCH 接收器来实现相同的目的。

注意: 此示例需要 clickhouse 服务器正在运行,并且示例数据已存储在数据库中。有关如何执行此操作,请参阅 Aerial Data Lake 文档。

[1]:
# Check platform.
import platform
if platform.machine() not in ['x86_64', 'aarch64']:
    raise SystemExit("Unsupported platform!")

导入#

[2]:
import math
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import numpy as np
import pandas as pd
from IPython.display import Markdown
from IPython.display import display

# Connecting to clickhouse on remote server
import clickhouse_connect

# Plotting with Bokeh.
import matplotlib.pyplot as plt

# pyAerial imports
from aerial.phy5g.algorithms import ChannelEstimator
from aerial.phy5g.algorithms import ChannelEqualizer
from aerial.phy5g.algorithms import NoiseIntfEstimator
from aerial.phy5g.algorithms import Demapper
from aerial.phy5g.ldpc import LdpcDeRateMatch
from aerial.phy5g.ldpc import LdpcDecoder
from aerial.phy5g.ldpc import CrcChecker
from aerial.phy5g.pusch import PuschRx
from aerial.util.cuda import get_cuda_stream
from aerial.util.fapi import dmrs_fapi_to_bit_array

# Hide log10(10) warning
_ = np.seterr(divide='ignore', invalid='ignore')

创建 PUSCH 管道#

这是一个由单独调用的 pyAerial PUSCH 接收器组件组成的 PUSCH 接收器管道。

[3]:
# Whether to plot intermediate results within the PUSCH pipeline, such as channel estimates and equalized symbols.
plot_figures = True

num_ues = 1
num_tx_ant = 2             # UE antennas
num_rx_ant = 4             # gNB antennas
cell_id = 41               # Physical cell ID
enable_pusch_tdi = 0       # Enable time interpolation for equalizer coefficients
eq_coeff_algo = 1          # Equalizer algorithm

# The PUSCH receiver chain built from separately called pyAerial Python components is defined here.
class PuschRxSeparate:
    """PUSCH receiver class.

    This class encapsulates the whole PUSCH receiver chain built using
    pyAerial components.
    """

    def __init__(self,
                 num_rx_ant,
                 enable_pusch_tdi,
                 eq_coeff_algo,
                 plot_figures):
        """Initialize the PUSCH receiver."""
        self.cuda_stream = get_cuda_stream()

        # Build the components of the receiver.
        self.channel_estimator = ChannelEstimator(
            num_rx_ant=num_rx_ant,
            cuda_stream=self.cuda_stream)
        self.channel_equalizer = ChannelEqualizer(
            num_rx_ant=num_rx_ant,
            enable_pusch_tdi=enable_pusch_tdi,
            eq_coeff_algo=eq_coeff_algo,
            cuda_stream=self.cuda_stream)
        self.noise_intf_estimator = NoiseIntfEstimator(
            num_rx_ant=num_rx_ant,
            eq_coeff_algo=eq_coeff_algo,
            cuda_stream=self.cuda_stream)
        self.derate_match = LdpcDeRateMatch(
            enable_scrambling=True,
            cuda_stream=self.cuda_stream)
        self.decoder = LdpcDecoder(cuda_stream=self.cuda_stream)
        self.crc_checker = CrcChecker(cuda_stream=self.cuda_stream)

        # Whether to plot the intermediate results.
        self.plot_figures = plot_figures

    def run(
        self,
        rx_slot,
        num_ues,
        slot,
        num_dmrs_cdm_grps_no_data,
        dmrs_scrm_id,
        start_prb,
        num_prbs,
        dmrs_syms,
        dmrs_max_len,
        dmrs_add_ln_pos,
        start_sym,
        num_symbols,
        scids,
        layers,
        dmrs_ports,
        rntis,
        data_scids,
        code_rates,
        mod_orders,
        tb_sizes
    ):
        """Run the receiver."""
        # Channel estimation.
        ch_est = self.channel_estimator.estimate(
            rx_slot=rx_slot,
            num_ues=num_ues,
            slot=slot,
            num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data,
            dmrs_scrm_id=dmrs_scrm_id,
            start_prb=start_prb,
            num_prbs=num_prbs,
            prg_size=1,
            num_ul_streams=1,
            dmrs_syms=dmrs_syms,
            dmrs_max_len=dmrs_max_len,
            dmrs_add_ln_pos=dmrs_add_ln_pos,
            start_sym=start_sym,
            num_symbols=num_symbols,
            scids=scids,
            layers=layers,
            dmrs_ports=dmrs_ports
        )

        # Noise and interference estimation.
        lw_inv, noise_var_pre_eq = self.noise_intf_estimator.estimate(
            rx_slot=rx_slot,
            channel_est=ch_est,
            num_ues=num_ues,
            slot=slot,
            num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data,
            dmrs_scrm_id=dmrs_scrm_id,
            start_prb=start_prb,
            num_prbs=num_prbs,
            dmrs_syms=dmrs_syms,
            dmrs_max_len=dmrs_max_len,
            dmrs_add_ln_pos=dmrs_add_ln_pos,
            start_sym=start_sym,
            num_symbols=num_symbols,
            scids=scids,
            layers=layers,
            dmrs_ports=dmrs_ports
        )

        # Channel equalization and soft demapping. The first return value are the LLRs,
        # second are the equalized symbols. We only want the LLRs now.
        llrs, sym = self.channel_equalizer.equalize(
            rx_slot=rx_slot,
            channel_est=ch_est,
            lw_inv=lw_inv,
            noise_var_pre_eq=noise_var_pre_eq,
            num_ues=num_ues,
            num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data,
            start_prb=start_prb,
            num_prbs=num_prbs,
            dmrs_syms=dmrs_syms,
            dmrs_max_len=dmrs_max_len,
            dmrs_add_ln_pos=dmrs_add_ln_pos,
            start_sym=start_sym,
            num_symbols=num_symbols,
            layers=layers,
            mod_orders=mod_orders
        )

        if self.plot_figures:
            fig, axs = plt.subplots(1,4)
            for ant in range(4):
                axs[ant].imshow(10*np.log10(np.abs(rx_slot[:, :, ant]**2)), aspect='auto')
                axs[ant].set_ylim([pusch_record.rbStart * 12, pusch_record.rbSize * 12])
                axs[ant].set_title('Ant ' + str(ant))
                axs[ant].set(xlabel='Symbol', ylabel='Resource Element')
                axs[ant].label_outer()
            fig.suptitle('Power in RU Antennas')

            fig, axs = plt.subplots(1,2)
            axs[0].scatter(rx_slot.reshape(-1).real, rx_slot.reshape(-1).imag)
            axs[0].set_title("Pre-Equalized samples")
            axs[0].set_aspect('equal')

            axs[1].scatter(np.array(sym).reshape(-1).real, np.array(sym).reshape(-1).imag)
            axs[1].set_title("Post-Equalized samples")
            axs[1].set_aspect('equal')

            fig, axs = plt.subplots(1)
            axs.set_title("Channel estimates from the PUSCH pipeline")
            for ant in range(4):
                axs.plot(np.abs(ch_est[0][ant, 0, :, 0]))
            axs.legend(["Rx antenna 0, estimate",
                        "Rx antenna 1, estimate",
                        "Rx antenna 2, estimate",
                        "Rx antenna 3, estimate"])
            axs.grid(True)
            plt.show()



        num_data_sym = (np.array(dmrs_syms[start_sym:start_sym + num_symbols]) == 0).sum()
        cinits = [(rntis[ue] << 15) + data_scids[ue] for ue in range(num_ues)]
        rate_match_lengths = [num_data_sym * mod_orders[ue] * num_prbs * 12 * layers[ue]
                              for ue in range(num_ues)]
        tb_sizes = [s * 8 for s in tb_sizes]
        code_rates = [c / 1024. for c in code_rates]
        rvs = [0,] * num_ues
        ndis = [1,] * num_ues

        coded_blocks = self.derate_match.derate_match(
            input_llrs=llrs,
            tb_sizes=tb_sizes,
            code_rates=code_rates,
            rate_match_lengths=rate_match_lengths,
            mod_orders=mod_orders,
            num_layers=layers,
            redundancy_versions=rvs,
            ndis=ndis,
            cinits=cinits
        )

        code_blocks = self.decoder.decode(
            input_llrs=coded_blocks,
            tb_sizes=tb_sizes,
            code_rates=code_rates,
            redundancy_versions=rvs,
            rate_match_lengths=rate_match_lengths
        )

        decoded_tbs, tb_crcs = self.crc_checker.check_crc(
            input_bits=code_blocks,
            tb_sizes=tb_sizes,
            code_rates=code_rates
        )

        return decoded_tbs

pusch_rx_separate = PuschRxSeparate(
    num_rx_ant=num_rx_ant,
    enable_pusch_tdi=enable_pusch_tdi,
    eq_coeff_algo=eq_coeff_algo,
    plot_figures=plot_figures
)

# This is the fully fused PUSCH receiver chain.
pusch_rx = PuschRx(
    cell_id=cell_id,
    num_rx_ant=num_rx_ant,
    num_tx_ant=num_rx_ant,
    enable_pusch_tdi=enable_pusch_tdi,
    eq_coeff_algo=eq_coeff_algo
)

查询数据库#

下面展示了如何连接到 clickhouse 数据库并从中查询数据。

[4]:
# Connect to the local database
client = clickhouse_connect.get_client(host='localhost')

# Pick a packet from the database
pusch_records = client.query_df('select * from fapi where mcsIndex != 0 order by TsTaiNs limit 10')

提取 PUSCH 参数并运行管道#

[5]:
for index, pusch_record in pusch_records.iterrows():
    query = f"""select TsTaiNs,fhData from fh where
            TsTaiNs == {pusch_record.TsTaiNs.timestamp()}
            """
    fh = client.query_df(query)

    display(Markdown("### Example {} - SFN.Slot {}.{} from time {}"
                     .format(index + 1, pusch_record.SFN, pusch_record.Slot, pusch_record.TsTaiNs
    )))

    # Make sure that the fronthaul database is complete for the SFN.Slot we've chosen
    if fh.index.size < 1:
        pusch_records = pusch_records.drop(index)
        continue;

    fh_samp = np.array(fh['fhData'][0], dtype=np.float32)
    rx_slot = np.swapaxes(fh_samp.view(np.complex64).reshape(4, 14, 273 * 12), 2, 0)

    # Extract all the needed parameters from the PUSCH record.
    slot = int(pusch_record.Slot)
    rntis = [pusch_record.rnti]
    layers = [pusch_record.nrOfLayers]
    start_prb = pusch_record.rbStart
    num_prbs = pusch_record.rbSize
    start_sym = pusch_record.StartSymbolIndex
    num_symbols = pusch_record.NrOfSymbols
    scids = [int(pusch_record.SCID)]
    data_scids = [pusch_record.dataScramblingId]
    dmrs_scrm_id = pusch_record.ulDmrsScramblingId
    num_dmrs_cdm_grps_no_data = pusch_record.numDmrsCdmGrpsNoData
    dmrs_syms = dmrs_fapi_to_bit_array(int(pusch_record.ulDmrsSymbPos))
    dmrs_ports = [pusch_record.dmrsPorts]
    dmrs_max_len = 1
    dmrs_add_ln_pos = 2
    mcs_tables = [pusch_record.mcsTable]
    mcs_indices = [pusch_record.mcsIndex]
    coderates = [pusch_record.targetCodeRate / 10.]
    tb_sizes = [pusch_record.TBSize]
    mod_orders = [pusch_record.qamModOrder]
    tb_input = np.array(pusch_record.pduData)

    # Run the receiver built from separately called components.
    tbs = pusch_rx_separate.run(
        rx_slot=rx_slot,
        num_ues=num_ues,
        slot=slot,
        num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data,
        dmrs_scrm_id=dmrs_scrm_id,
        start_prb=start_prb,
        num_prbs=num_prbs,
        dmrs_syms=dmrs_syms,
        dmrs_max_len=dmrs_max_len,
        dmrs_add_ln_pos=dmrs_add_ln_pos,
        start_sym=start_sym,
        num_symbols=num_symbols,
        scids=scids,
        layers=layers,
        dmrs_ports=dmrs_ports,
        rntis=rntis,
        data_scids=data_scids,
        code_rates=coderates,
        mod_orders=mod_orders,
        tb_sizes=tb_sizes
    )
    if np.array_equal(tbs[0][:tb_input.size], tb_input):
        display(Markdown("**Separated kernels PUSCH decoding success** for SFN.Slot {}.{} from time {}".format(pusch_record.SFN, pusch_record.Slot, pusch_record.TsTaiNs)))
    else:
        display(Markdown("**Separated kernels PUSCH decoding failure**"))
        print("Output bytes:")
        print(tbs[0][:tb_input.size])
        print("Expected output:")
        print(tb_input)

    # Run the fused PUSCH receiver.
    # Note that this is where we set the dynamically changing parameters.
    tb_crcs, tbs = pusch_rx.run(
        rx_slot=rx_slot,
        num_ues=num_ues,
        slot=slot,
        num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data,
        dmrs_scrm_id=dmrs_scrm_id,
        start_prb=start_prb,
        num_prbs=num_prbs,
        dmrs_syms=dmrs_syms,
        dmrs_max_len=dmrs_max_len,
        dmrs_add_ln_pos=dmrs_add_ln_pos,
        start_sym=start_sym,
        num_symbols=num_symbols,
        scids=scids,
        layers=layers,
        dmrs_ports=dmrs_ports,
        rntis=rntis,
        data_scids=data_scids,
        code_rates=coderates,
        mod_orders=mod_orders,
        tb_sizes=tb_sizes
    )

    if np.array_equal(tbs[0][:tb_input.size], tb_input):
        display(Markdown("**Fused PUSCH decoding success** for SFN.Slot {}.{} from time {}".format(pusch_record.SFN, pusch_record.Slot, pusch_record.TsTaiNs)))
    else:
        display(Markdown("**Fused PUSCH decoding failure**"))
        print("Output bytes:")
        print(tbs[0][:tb_input.size])
        print("Expected output:")
        print(tb_input)

示例 1 - 来自时间 2024-03-21 12:18:39.162000 的 SFN.Slot 192.4#

../../_images/content_notebooks_datalake_pusch_decoding_9_1.png
../../_images/content_notebooks_datalake_pusch_decoding_9_2.png
../../_images/content_notebooks_datalake_pusch_decoding_9_3.png

分离内核 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.162000 的 SFN.Slot 192.4

融合 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.162000 的 SFN.Slot 192.4

示例 2 - 来自时间 2024-03-21 12:18:39.187000 的 SFN.Slot 194.14#

../../_images/content_notebooks_datalake_pusch_decoding_9_7.png
../../_images/content_notebooks_datalake_pusch_decoding_9_8.png
../../_images/content_notebooks_datalake_pusch_decoding_9_9.png

分离内核 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.187000 的 SFN.Slot 194.14

融合 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.187000 的 SFN.Slot 194.14

示例 3 - 来自时间 2024-03-21 12:18:39.192000 的 SFN.Slot 195.4#

../../_images/content_notebooks_datalake_pusch_decoding_9_13.png
../../_images/content_notebooks_datalake_pusch_decoding_9_14.png
../../_images/content_notebooks_datalake_pusch_decoding_9_15.png

分离内核 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.192000 的 SFN.Slot 195.4

融合 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.192000 的 SFN.Slot 195.4

示例 4 - 来自时间 2024-03-21 12:18:39.252000 的 SFN.Slot 201.4#

../../_images/content_notebooks_datalake_pusch_decoding_9_19.png
../../_images/content_notebooks_datalake_pusch_decoding_9_20.png
../../_images/content_notebooks_datalake_pusch_decoding_9_21.png

分离内核 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.252000 的 SFN.Slot 201.4

融合 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.252000 的 SFN.Slot 201.4

示例 5 - 来自时间 2024-03-21 12:18:39.332000 的 SFN.Slot 209.4#

../../_images/content_notebooks_datalake_pusch_decoding_9_25.png
../../_images/content_notebooks_datalake_pusch_decoding_9_26.png
../../_images/content_notebooks_datalake_pusch_decoding_9_27.png

分离内核 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.332000 的 SFN.Slot 209.4

融合 PUSCH 解码成功,适用于来自时间 2024-03-21 12:18:39.332000 的 SFN.Slot 209.4