使用 pyAerial 进行 LDPC 编码-解码链#
本示例展示了如何使用 pyAerial Python 绑定来运行 5G NR LDPC 编码、速率匹配和解码。信息比特,即传输块,被分割成代码块,进行 LDPC 编码,并根据可用的时频资源(资源元素)进行速率匹配,所有步骤都精确遵循 TS 38.212 标准。然后,这些比特通过使用 QPSK 调制的 AWGN 信道传输。在接收端,从接收到的符号中提取对数似然比,执行(解)速率匹配,并运行 LDPC 解码器以获取传输的信息比特。最后,代码块被重新连接成接收到的传输块。
pyAerial 在底层为所有组件使用了 cuPHY 库。然而,在本示例中,CRC 只是随机的比特块,因为我们可以直接比较发送和接收的比特来计算误块率。
NVIDIA Sionna 库用于模拟无线信道。
[1]:
# Check platform.
import platform
if platform.machine() != 'x86_64':
raise SystemExit("Unsupported platform!")
导入#
[2]:
%matplotlib widget
from cuda import cudart
from collections import defaultdict
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ['TF_CPP_MIN_LOG_LEVEL'] = "3" # Silence TensorFlow.
import numpy as np
import sionna
import tensorflow as tf
from aerial.phy5g.ldpc import LdpcEncoder
from aerial.phy5g.ldpc import LdpcDecoder
from aerial.phy5g.ldpc import LdpcRateMatch
from aerial.phy5g.ldpc import LdpcDeRateMatch
from aerial.phy5g.ldpc import CrcChecker
from aerial.phy5g.ldpc import get_mcs
from aerial.phy5g.ldpc import random_tb
from aerial.phy5g.ldpc import code_block_segment
from aerial.phy5g.ldpc import get_crc_len
from simulation_monitor import SimulationMonitor
# Configure the notebook to use only a single GPU and allocate only as much memory as needed.
# For more details, see https://tensorflowcn.cn/guide/gpu.
gpus = tf.config.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(gpus[0], True)
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()
参数#
设置仿真参数、一些 numerology 参数,启用/禁用加扰等。
[3]:
# Simulation parameters.
esno_db_range = np.arange(8.1, 8.8, 0.1)
num_slots = 10000
min_num_tb_errors = 250
# Numerology and frame structure. See TS 38.211.
num_prb = 100 # Number of allocated PRBs. This is used to compute the transport block
# as well as the rate matching length.
start_sym = 0 # PxSCH start symbol
num_symbols = 14 # Number of symbols in a slot.
num_slots_per_frame = 20 # Number of slots in a single frame.
num_layers = 1
dmrs_sym = [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]
# Rate matching procedure includes scrambling if this flag is set.
enable_scrambling = True
# The scrambling initialization value is computed as per TS 38.211
# using the RNTI and data scrambling ID.
rnti = 20000 # UE RNTI
data_scid = 41 # Data scrambling ID
cinit = (rnti << 15) + data_scid
rv = 0 # Redundancy version
mcs = 9 # MCS index as per TS 38.214 table. Note: Es/No range may need to be changed too to get meaningful results.
mod_order, code_rate = get_mcs(mcs)
code_rate /= 1024.
创建 LDPC 编码链对象#
LDPC 编码链对象在此处创建。这包括以下内容
LdpcEncoder
它接受信息比特,即传输块,分割成代码块作为输入,并输出编码后的代码块。LdpcRateMatch
它接受编码后的代码块作为输入,并输出速率匹配(以及可选的加扰)的比特流。LdpcDerateMatch
它接受接收到的对数似然比 (LLR) 流作为输入,并输出解速率匹配的代码块 LLR,这些 LLR 可以馈送到 LDPC 解码。如果管道中启用了加扰,则此模块还执行解扰。LdpcDecoder
它接受 LDPC 解速率匹配的输出,并将 LLR 解码为代码块,这些代码块随后可以进一步连接成接收到的传输块。CrcChecker
它接受 LDPC 解码块的输出,移除代码块 CRC,将代码块连接成完整的传输块,最后检查并移除传输块 CRC。
所有组件均基于 TS 38.212 标准,因此可以用于发送/接收符合 5G NR 标准的比特流。
Sionna 信道组件和调制映射器也在此处创建。
[4]:
# Create also the CUDA stream that running the objects requires.
cudart.cudaSetDevice(0)
cuda_stream = cudart.cudaStreamCreate()[1]
cudart.cudaStreamSynchronize(cuda_stream)
# Create the Aerial Python LDPC objects.
ldpc_encoder = LdpcEncoder(cuda_stream=cuda_stream)
ldpc_decoder = LdpcDecoder(cuda_stream=cuda_stream)
ldpc_rate_match = LdpcRateMatch(enable_scrambling=enable_scrambling, cuda_stream=cuda_stream)
ldpc_derate_match = LdpcDeRateMatch(enable_scrambling=enable_scrambling, cuda_stream=cuda_stream)
crc_checker = CrcChecker(cuda_stream=cuda_stream)
# Create the Sionna modulation mapper/demapper and the AWGN channel.
mapper = sionna.mapping.Mapper("qam", mod_order)
demapper = sionna.mapping.Demapper("app", "qam", mod_order)
channel = sionna.channel.AWGN()
主仿真循环#
[5]:
case = "LDPC decoding perf."
monitor = SimulationMonitor([case], esno_db_range)
# Loop the Es/No range.
for esno_db in esno_db_range:
monitor.step(esno_db)
num_tb_errors = defaultdict(int)
# Run multiple slots and compute BLER.
for slot_idx in range(num_slots):
slot_number = slot_idx % num_slots_per_frame
# Generate a random transport block (in bits).
transport_block = random_tb(
mod_order=mod_order,
code_rate=code_rate * 1024,
dmrs_syms=dmrs_sym,
num_prbs=num_prb,
start_sym=start_sym,
num_symbols=num_symbols,
num_layers=num_layers,
return_bits=True
)
tb_size = transport_block.shape[0]
# Attach a CRC. This is emulated to get the TB size with CRC right, however the CRC is in this case just random
# bits as we are comparing the transmitted and received bits directly to get the BLER (instead of doing an actual
# CRC check).
crc_length = get_crc_len(tb_size)
crc = np.random.randint(0, 1, size=crc_length, dtype=np.uint8)
transport_block = np.concatenate((transport_block, crc))
# Code block segmentation happens here. Note: This is just Python at the moment.
code_blocks = code_block_segment(tb_size, transport_block, code_rate)
# Run the LDPC encoding. The LDPC encoder takes a K x C array as its input, where K is the number of bits per code
# block and C is the number of code blocks. Its output is N x C where N is the number of coded bits per code block.
# If there is more than one code block, a code block CRC (random in this case as we do not need an actual CRC) is
# attached to
coded_bits = ldpc_encoder.encode(
input_data=code_blocks,
tb_size=tb_size,
code_rate=code_rate,
redundancy_version=rv
)
# Run rate matching. This needs rate matching length as its input, meaning the number of bits that can be
# transmitted within the allocated resource elements. The input data is fed as 32-bit floats.
num_data_sym = (np.array(dmrs_sym[start_sym:start_sym + num_symbols]) == 0).sum()
rate_match_len = num_data_sym * num_prb * 12 * num_layers * mod_order
rate_matched_bits = ldpc_rate_match.rate_match(
input_data=coded_bits,
tb_size=tb_size,
code_rate=code_rate,
rate_match_len=rate_match_len,
mod_order=mod_order,
num_layers=num_layers,
redundancy_version=rv,
cinit=cinit
)
# Map the bits to symbols and transmit through an AWGN channel. All this in Sionna.
rate_matched_bits = rate_matched_bits[:, 0]
no = sionna.utils.ebnodb2no(esno_db, num_bits_per_symbol=1, coderate=1)
tx_symbols = mapper(rate_matched_bits[None])
rx_symbols = channel([tx_symbols, no])
llr = -1. * demapper([rx_symbols, no])[0, :].numpy()[:, None]
# Run receiver side (de)rate matching. The input is the received array of bits directly, and the output
# is a NumPy array of size N x C of log likelihood ratios, represented as 32-bit floats. Descrambling
# is also performed here in case scrambling is enabled.
derate_matched_bits = ldpc_derate_match.derate_match(
input_llrs=[llr],
tb_sizes=[tb_size],
code_rates=[code_rate],
rate_match_lengths=[rate_match_len],
mod_orders=[mod_order],
num_layers=[num_layers],
redundancy_versions=[rv],
ndis=[1],
cinits=[cinit]
)
# Run LDPC decoding. The decoder takes the derate matching output as its input and returns
decoded_bits = ldpc_decoder.decode(
input_llrs=derate_matched_bits,
tb_sizes=[tb_size],
code_rates=[code_rate],
redundancy_versions=[rv],
rate_match_lengths=[rate_match_len]
)
# Combine code blocks into a transport block. CRC ignored as it was just random bits in this example.
decoded_tb, _ = crc_checker.check_crc(
input_bits=decoded_bits,
tb_sizes=[tb_size],
code_rates=[code_rate]
)
decoded_tb = np.unpackbits(decoded_tb) # To bits.
tb_error = not np.array_equal(decoded_tb, transport_block[:-crc_length])
num_tb_errors[case] += tb_error
monitor.update(num_tbs=slot_idx + 1, num_tb_errors=num_tb_errors)
if (np.array(list(num_tb_errors.values())) >= min_num_tb_errors).all():
break # Next Es/No value.
monitor.finish_step(num_tbs=slot_idx + 1, num_tb_errors=num_tb_errors)
monitor.finish()
LDPC decoding perf.
--------------------
Es/No (dB) TBs TB Errors BLER ms/TB
==================== ==================== ========
8.10 250 250 1.0000 19.9
8.20 255 250 0.9804 15.7
8.30 322 250 0.7764 15.7
8.40 811 250 0.3083 15.7
8.50 3846 250 0.0650 15.6
8.60 10000 73 0.0073 15.4
8.70 10000 1 0.0001 15.4
8.80 10000 0 0.0000 15.3