Module polarcodes.PolarCode

An object that encapsulates all of the parameters required to define a polar code. This object must be given to the following classes: AWGN, Construct, Decode, Encode, GUI, Shorten.

Expand source code
#!/usr/bin/env python

"""
An object that encapsulates all of the parameters required to define a polar code.
This object must be given to the following classes: `AWGN`, `Construct`, `Decode`, `Encode`, `GUI`, `Shorten`.
"""

import numpy as np
from polarcodes.utils import *
from polarcodes.Construct import Construct
from polarcodes.Shorten import Shorten
from polarcodes.Encode import Encode
from polarcodes.Decode import Decode
from polarcodes.AWGN import AWGN
import json
import matplotlib.pyplot as plt
import threading
import tkinter as tk

class PolarCode:
    """
    Attributes
    ----------
    N: int
        the mothercode block length
    M: int
        the block length (after puncturing)
    K: int
        the code dimension
    n: int
        number of bits per index
    s: int
        number of shortened bit-channels
    reliabilities: ndarray<int>
        reliability vector (least reliable to most reliable)
    frozen: ndarray<int>
        the frozen bit indices
    frozen_lookup: ndarray<int>
        lookup table for the frozen bits
    x: ndarray<int>
        the uncoded message with frozen bits
    construction_type: string
        the mothercode construction type
    message_received: ndarray<int>
        the decoded message received from a channel
    punct_flag: bool
        whether or not the code is punctured
    simulated_snr: ndarray<float>
        the SNR values simulated
    simulated_fer: ndarray<float>
        the FER values for the SNR values in ``simulated_snr`` using `simulate`
    simulated_ber: ndarray<float>
        the BER values for the SNR values in ``simulated_snr`` using `simulate`
    punct_type: string
        'punct' for puncturing, and 'shorten' for shortening
    punct_set: ndarray<int>
        the coded punctured indices
    punct_set_lookup: ndarray<int>
        lookup table for ``punct_set``
    source_set: ndarray<int>
        the uncoded punctured indices
    source_set_lookup: ndarray<int>
        lookup table for ``source_set``
    punct_algorithm: string
        the name of a puncturing algorithm. Options: {'brs', 'wls', 'bgl', 'perm'}
    update_frozen_flag: bool
        whether or not to update the frozen indices after puncturing
    recip_flag: bool
        True if ``punct_set`` equals ``source_set``

    """

    def __init__(self, M, K, punct_params=('', '', [], [], None,)):
        """
        Parameters
        ----------
        M: int
            the block length (after puncturing)
        K: int
            the code dimension
        punct_params: tuple
            a tuple to completely specify the puncturing parameters (if required).
            The syntax is (``punct_type``, ``punct_algorithm``, ``punct_set``, ``source_set``, ``update_frozen_flag``)
        """

        self.initialise_code(M, K, punct_params)
        self.status_bar = None  # set by the GUI so that the simulation progress can be tracked
        self.gui_widgets = []

    def initialise_code(self, M, K, punct_params):
        """
        Initialise the code with a set of parameters the same way as the constructor.
        Call this any time you want to change the code rate.
        """

        # mothercode parameters
        self.M = M
        self.N = int(2**(np.ceil(np.log2(M))))
        self.n = int(np.log2(self.N))
        self.F = arikan_gen(self.n)
        self.K = K
        self.s = self.N - self.M
        self.reliabilities = np.array([])
        self.frozen = np.array([])
        self.frozen_lookup = np.array([])
        self.x = np.zeros(self.N, dtype=int)
        self.u = np.zeros(self.N, dtype=int)
        self.construction_type = 'bb'
        self.message_received = np.array([])
        self.punct_flag = False if self.M == self.N else True
        self.simulated_snr = np.array([])
        self.simulated_fer = np.array([])
        self.simulated_ber = np.array([])
        self.FERestimate = 0
        self.T = None

        # puncturing parameters
        self.punct_type = punct_params[0]
        self.punct_set = np.array(punct_params[2])
        self.punct_set_lookup = self.get_lut(punct_params[2])
        self.source_set = np.array(punct_params[3])
        self.source_set_lookup = self.get_lut(punct_params[3])
        self.punct_algorithm = punct_params[1]
        self.update_frozen_flag = punct_params[4]
        self.recip_flag = np.array_equal(np.array(punct_params[2]), np.array(punct_params[3]))

    def __str__(self):
        """
        A string definition of PolarCode. This allows you to print any PolarCode object and see all of its
        relevant parameters.

        Returns
        ----------
        string
            a stringified version of PolarCode

        """

        output = '=' * 10 + " Polar Code " + '=' * 10 + '\n'
        output += "N: " + str(self.N) + '\n'
        output += "M: " + str(self.M) + '\n'
        output += "K: "+ str(self.K) + '\n'
        output += "Mothercode Construction: " + self.construction_type + '\n'
        output += "Ordered Bits (least reliable to most reliable): " + str(self.reliabilities) + '\n'
        output += "Frozen Bits: " + str(self.frozen) + '\n'
        output += "Puncturing Flag: " + str(self.punct_flag) + '\n'
        output += "Puncturing Parameters: {punct_type: " + str(self.punct_type) + '\n'
        output += "                        punct_algorithm: " + str(self.punct_algorithm) + '\n'
        output += "                        punct_set: " + str(self.punct_set) + '\n'
        output += "                        source_set: " + str(self.source_set) + '\n'
        output += "                        update_frozen_flag: " + str(self.update_frozen_flag) + "}" + '\n'
        return output

    def set_message(self, m):
        """
        Set the message vector to the non-frozen bits in ``x``. The frozen bits in ``frozen`` are set to zero.

        Parameters
        ----------
        m: ndarray<int>
            the message vector

        """

        self.message = m
        self.x[self.frozen_lookup == 1] = m
        self.u = self.x.copy()

    def get_normalised_SNR(self, design_SNR):
        """
        Normalise E_b/N_o so that the message bits have the same energy for any code rate.

        Parameters
        ----------
        design_SNR: float
            E_b/N_o in decibels

        Returns
        ----------
        float
            normalised E_b/N_o in linear units

        """

        Eb_No_dB = design_SNR
        Eb_No = 10 ** (Eb_No_dB / 10)  # convert dB scale to linear
        Eb_No = Eb_No * (self.K / self.M)  # normalised message signal energy by R=K/M (M=N if not punctured)
        return Eb_No

    def get_lut(self, my_set):
        """
        Convert a set into a lookup table.

        Parameters
        ----------
        my_set: ndarray<int>
            a vector of indices

        Returns
        ----------
        ndarray<int>
            a LUT with "0" for an index in ``my_set``, else "1"

        """

        my_lut = np.ones(self.N, dtype=int)
        my_lut[my_set] = 0
        return my_lut

    def save_as_json(self, sim_filename):
        """
        Save all the important parameters in this object as a JSON file.

        Parameters
        ----------
        sim_filename: string
            directory and filename to save JSON file to (excluding extension)

        """
        data = {
            'N': self.M,
            'n': self.n,
            'K': self.K,
            'frozen': self.frozen.tolist(),
            'construction_type': self.construction_type,
            'punct_flag': self.punct_flag,
            'punct_type': self.punct_type,
            'punct_set': self.punct_set.tolist(),
            'source_set': self.source_set.tolist(),
            'punct_algorithm': self.punct_algorithm,
            'update_frozen_flag': self.update_frozen_flag,
            'BER': self.simulated_ber.tolist(),
            'FER': self.simulated_fer.tolist(),
            'SNR': self.simulated_snr.tolist()
        }
        with open(sim_filename + '.json', 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    def run_simulation(self, Eb_No, max_iter, min_errors, min_iters):
        frame_error_count = 0
        bit_error_count = 0
        num_blocks = 0
        for i in range(1, max_iter + 1):
            # simulate random PC in an AWGN channel
            self.set_message(np.random.randint(2, size=self.K))
            Encode(self)
            AWGN(self, Eb_No)
            Decode(self)

            # detect errors
            error_vec = self.message ^ self.message_received
            num_errors = sum(error_vec)
            frame_error_count = frame_error_count + (num_errors > 1)
            bit_error_count = bit_error_count + num_errors

            # early stopping condition
            num_blocks = i
            if frame_error_count >= min_errors and i >= min_iters:
                break
        return frame_error_count, bit_error_count, num_blocks

    def simulate(self, save_to, Eb_No_vec, design_SNR=None, max_iter=100000, min_iterations=1000, min_errors=30, sim_seed=1729, manual_const_flag=True):
        """
        Monte-carlo simulation of the performance of this polar code.
        The simulation has an early stopping condition of when the number of errors is below min_errors.
        Each E_b/N_o simulation has an additional early stopping condition using the minimum iterations
        and the minimum number of errors. The results are saved in a JSON file using :func:`save_as_json`.

        Parameters
        ----------
        save_to: string
            directory and filename to save JSON file to (excluding extension)
        Eb_No_vec: ndarray<float>
            the range of SNR values to simulate
        design_SNR: float
            the construction design SNR, E_b/N_o
        max_iter: int
            maximum number of iterations per SNR
        min_iterations: int
            the minimum number of iterations before early stopping is allowed per SNR
        min_errors: int
            the minimum number of frame errors before early stopping is allowed per SNR
        sim_seed: int
            pseudo-random generator seed, default is 1729 ('twister' on MATLAB)
        manual_const_flag: bool
            a flag that decides if construction should be done before simulating.
            Set to False if mothercode and/or puncturing constructions are manually set by the user.

        """

        # initialise simulation
        np.random.seed(sim_seed)
        frame_error_rates = np.zeros(len(Eb_No_vec))
        bit_error_rates = np.zeros(len(Eb_No_vec))

        # do construction if not done already
        if not manual_const_flag:
            if self.punct_flag and self.punct_type == 'shorten':
                Shorten(self, design_SNR)
            else:
                Construct(self, design_SNR)

        print(self)
        print('=' * 10, "Simulation", '=' * 10)
        for i in range(len(Eb_No_vec)):
            # run simulation for the current SNR
            frame_error_count, bit_error_count, num_blocks = self.run_simulation(Eb_No_vec[i], max_iter, min_errors, min_iterations)

            # calculate FER and BER
            frame_error_rate = frame_error_count / num_blocks
            bit_error_rate = bit_error_count / (self.K * num_blocks)
            frame_error_rates[i] = frame_error_rate
            bit_error_rates[i] = bit_error_rate
            print("Eb/No:", round(Eb_No_vec[i], 5), "  FER:", round(frame_error_rate, 3), "  BER:", round(bit_error_rate, 5))
            print('# Iterations:', num_blocks, '  # Frame Errors:', frame_error_count, ' # Bit Errors:', bit_error_count)
            print('='*20)

            # update GUI (if used)
            if self.status_bar != None:
                self.status_bar.set("Simulation progress: " + str(i + 1) + "/" + str(len(Eb_No_vec)))

            # early stopping condition
            if frame_error_count < min_errors:
                break

        # write data to JSON file
        self.simulated_snr = Eb_No_vec
        self.simulated_ber = bit_error_rates
        self.simulated_fer = frame_error_rates
        self.save_as_json(save_to)

        # update GUI construction fields (if used)
        if self.status_bar != None:
            self.gui_widgets[3].delete("1.0", tk.END)
            self.gui_widgets[6].delete("1.0", tk.END)
            self.gui_widgets[3].insert(tk.INSERT, ",".join(map(str, self.frozen)))
            self.gui_widgets[6].insert(tk.INSERT, ",".join(map(str, self.punct_set)))

        # update console and GUI
        print("Successfully completed simulation.\n")
        if self.status_bar != None:
            self.status_bar.set("Simulation progress: Done.")

    def plot_helper(self, new_plot, sim_filenames, dir, plot_title = 'Polar Code Performance'):
        # plot the FER and BER from file list
        new_plot.cla()
        for sim_filename in sim_filenames:
            with open(dir + sim_filename + '.json') as data_file:
                data_loaded = json.load(data_file)
            new_plot.plot(data_loaded['SNR'], data_loaded['FER'], '-o', markersize=6, linewidth=3, label=sim_filename)

        # format the plots
        new_plot.set_title(plot_title)
        new_plot.set_ylabel("Frame Error Rate")
        new_plot.set_xlabel("$E_b/N_o$ (dB)")
        new_plot.grid(linestyle='-')
        new_plot.set_yscale('log')
        new_plot.legend(loc='lower left')

    # call this for manual plotting
    def plot(self, sim_filenames, dir):
        """
        Plot multiple sets of FER data from the same directory on the same axes.

        Parameters
        ----------
        sim_filenames: ndarray<string>
            a list of all filenames to plot in a common root directory
        dir: string
            the root directory for the specified filenames

        """

        fig = plt.figure()
        new_plot = fig.add_subplot(111)
        self.plot_helper(new_plot, sim_filenames, dir)
        fig.show()

    # used by the GUI class for automated plotting
    def gui_plot_handler(self, gui_dict, fig):
        sim_filenames = gui_dict['filenames']
        dir = gui_dict['file_dir']
        self.plot_helper(fig, sim_filenames, dir)

    # used by the GUI class for simulating a new code
    def gui_sim_handler(self, gui_dict):
        # updated Polar Code from user
        punct_type = 'shorten' if gui_dict['punct_type'] == True else 'punct'
        shortening_params = (punct_type, gui_dict['punct_algo'], np.array(gui_dict['shortened_set'], dtype=int),
                             np.array(gui_dict['shortened_set'], dtype=int), False)
        self.initialise_code(gui_dict['N'], gui_dict['K'], shortening_params)
        self.construction_type = gui_dict['construction_algo']
        self.frozen = gui_dict['frozen_set']

        # simulation parameters from user
        iterations = gui_dict['iterations']
        min_frame_errors = gui_dict['min_frame_errors']
        file_dir = gui_dict['file_dir']
        save_to = gui_dict['save_to']
        manual_const_flag = gui_dict['manual_const_flag']
        design_SNR = gui_dict['design_SNR']
        Eb_No_vec = gui_dict['snr_values']

        # run simulation in another thread to avoid GUI freeze
        th = threading.Thread(name='sim_thread', target=self.simulate, args=(save_to, Eb_No_vec, design_SNR, iterations, 1000, min_frame_errors, 1729, manual_const_flag,))
        th.setDaemon(True)
        th.start()

Classes

class PolarCode (M, K, punct_params=('', '', [], [], None))

Attributes

N : int
the mothercode block length
M : int
the block length (after puncturing)
K : int
the code dimension
n : int
number of bits per index
s : int
number of shortened bit-channels
reliabilities : ndarray<int>
reliability vector (least reliable to most reliable)
frozen : ndarray<int>
the frozen bit indices
frozen_lookup : ndarray<int>
lookup table for the frozen bits
x : ndarray<int>
the uncoded message with frozen bits
construction_type : string
the mothercode construction type
message_received : ndarray<int>
the decoded message received from a channel
punct_flag : bool
whether or not the code is punctured
simulated_snr : ndarray<float>
the SNR values simulated
simulated_fer : ndarray<float>
the FER values for the SNR values in simulated_snr using simulate
simulated_ber : ndarray<float>
the BER values for the SNR values in simulated_snr using simulate
punct_type : string
'punct' for puncturing, and 'shorten' for shortening
punct_set : ndarray<int>
the coded punctured indices
punct_set_lookup : ndarray<int>
lookup table for punct_set
source_set : ndarray<int>
the uncoded punctured indices
source_set_lookup : ndarray<int>
lookup table for source_set
punct_algorithm : string
the name of a puncturing algorithm. Options: {'brs', 'wls', 'bgl', 'perm'}
update_frozen_flag : bool
whether or not to update the frozen indices after puncturing
recip_flag : bool
True if punct_set equals source_set

Parameters

M : int
the block length (after puncturing)
K : int
the code dimension
punct_params : tuple
a tuple to completely specify the puncturing parameters (if required). The syntax is (punct_type, punct_algorithm, punct_set, source_set, update_frozen_flag)
Expand source code
class PolarCode:
    """
    Attributes
    ----------
    N: int
        the mothercode block length
    M: int
        the block length (after puncturing)
    K: int
        the code dimension
    n: int
        number of bits per index
    s: int
        number of shortened bit-channels
    reliabilities: ndarray<int>
        reliability vector (least reliable to most reliable)
    frozen: ndarray<int>
        the frozen bit indices
    frozen_lookup: ndarray<int>
        lookup table for the frozen bits
    x: ndarray<int>
        the uncoded message with frozen bits
    construction_type: string
        the mothercode construction type
    message_received: ndarray<int>
        the decoded message received from a channel
    punct_flag: bool
        whether or not the code is punctured
    simulated_snr: ndarray<float>
        the SNR values simulated
    simulated_fer: ndarray<float>
        the FER values for the SNR values in ``simulated_snr`` using `simulate`
    simulated_ber: ndarray<float>
        the BER values for the SNR values in ``simulated_snr`` using `simulate`
    punct_type: string
        'punct' for puncturing, and 'shorten' for shortening
    punct_set: ndarray<int>
        the coded punctured indices
    punct_set_lookup: ndarray<int>
        lookup table for ``punct_set``
    source_set: ndarray<int>
        the uncoded punctured indices
    source_set_lookup: ndarray<int>
        lookup table for ``source_set``
    punct_algorithm: string
        the name of a puncturing algorithm. Options: {'brs', 'wls', 'bgl', 'perm'}
    update_frozen_flag: bool
        whether or not to update the frozen indices after puncturing
    recip_flag: bool
        True if ``punct_set`` equals ``source_set``

    """

    def __init__(self, M, K, punct_params=('', '', [], [], None,)):
        """
        Parameters
        ----------
        M: int
            the block length (after puncturing)
        K: int
            the code dimension
        punct_params: tuple
            a tuple to completely specify the puncturing parameters (if required).
            The syntax is (``punct_type``, ``punct_algorithm``, ``punct_set``, ``source_set``, ``update_frozen_flag``)
        """

        self.initialise_code(M, K, punct_params)
        self.status_bar = None  # set by the GUI so that the simulation progress can be tracked
        self.gui_widgets = []

    def initialise_code(self, M, K, punct_params):
        """
        Initialise the code with a set of parameters the same way as the constructor.
        Call this any time you want to change the code rate.
        """

        # mothercode parameters
        self.M = M
        self.N = int(2**(np.ceil(np.log2(M))))
        self.n = int(np.log2(self.N))
        self.F = arikan_gen(self.n)
        self.K = K
        self.s = self.N - self.M
        self.reliabilities = np.array([])
        self.frozen = np.array([])
        self.frozen_lookup = np.array([])
        self.x = np.zeros(self.N, dtype=int)
        self.u = np.zeros(self.N, dtype=int)
        self.construction_type = 'bb'
        self.message_received = np.array([])
        self.punct_flag = False if self.M == self.N else True
        self.simulated_snr = np.array([])
        self.simulated_fer = np.array([])
        self.simulated_ber = np.array([])
        self.FERestimate = 0
        self.T = None

        # puncturing parameters
        self.punct_type = punct_params[0]
        self.punct_set = np.array(punct_params[2])
        self.punct_set_lookup = self.get_lut(punct_params[2])
        self.source_set = np.array(punct_params[3])
        self.source_set_lookup = self.get_lut(punct_params[3])
        self.punct_algorithm = punct_params[1]
        self.update_frozen_flag = punct_params[4]
        self.recip_flag = np.array_equal(np.array(punct_params[2]), np.array(punct_params[3]))

    def __str__(self):
        """
        A string definition of PolarCode. This allows you to print any PolarCode object and see all of its
        relevant parameters.

        Returns
        ----------
        string
            a stringified version of PolarCode

        """

        output = '=' * 10 + " Polar Code " + '=' * 10 + '\n'
        output += "N: " + str(self.N) + '\n'
        output += "M: " + str(self.M) + '\n'
        output += "K: "+ str(self.K) + '\n'
        output += "Mothercode Construction: " + self.construction_type + '\n'
        output += "Ordered Bits (least reliable to most reliable): " + str(self.reliabilities) + '\n'
        output += "Frozen Bits: " + str(self.frozen) + '\n'
        output += "Puncturing Flag: " + str(self.punct_flag) + '\n'
        output += "Puncturing Parameters: {punct_type: " + str(self.punct_type) + '\n'
        output += "                        punct_algorithm: " + str(self.punct_algorithm) + '\n'
        output += "                        punct_set: " + str(self.punct_set) + '\n'
        output += "                        source_set: " + str(self.source_set) + '\n'
        output += "                        update_frozen_flag: " + str(self.update_frozen_flag) + "}" + '\n'
        return output

    def set_message(self, m):
        """
        Set the message vector to the non-frozen bits in ``x``. The frozen bits in ``frozen`` are set to zero.

        Parameters
        ----------
        m: ndarray<int>
            the message vector

        """

        self.message = m
        self.x[self.frozen_lookup == 1] = m
        self.u = self.x.copy()

    def get_normalised_SNR(self, design_SNR):
        """
        Normalise E_b/N_o so that the message bits have the same energy for any code rate.

        Parameters
        ----------
        design_SNR: float
            E_b/N_o in decibels

        Returns
        ----------
        float
            normalised E_b/N_o in linear units

        """

        Eb_No_dB = design_SNR
        Eb_No = 10 ** (Eb_No_dB / 10)  # convert dB scale to linear
        Eb_No = Eb_No * (self.K / self.M)  # normalised message signal energy by R=K/M (M=N if not punctured)
        return Eb_No

    def get_lut(self, my_set):
        """
        Convert a set into a lookup table.

        Parameters
        ----------
        my_set: ndarray<int>
            a vector of indices

        Returns
        ----------
        ndarray<int>
            a LUT with "0" for an index in ``my_set``, else "1"

        """

        my_lut = np.ones(self.N, dtype=int)
        my_lut[my_set] = 0
        return my_lut

    def save_as_json(self, sim_filename):
        """
        Save all the important parameters in this object as a JSON file.

        Parameters
        ----------
        sim_filename: string
            directory and filename to save JSON file to (excluding extension)

        """
        data = {
            'N': self.M,
            'n': self.n,
            'K': self.K,
            'frozen': self.frozen.tolist(),
            'construction_type': self.construction_type,
            'punct_flag': self.punct_flag,
            'punct_type': self.punct_type,
            'punct_set': self.punct_set.tolist(),
            'source_set': self.source_set.tolist(),
            'punct_algorithm': self.punct_algorithm,
            'update_frozen_flag': self.update_frozen_flag,
            'BER': self.simulated_ber.tolist(),
            'FER': self.simulated_fer.tolist(),
            'SNR': self.simulated_snr.tolist()
        }
        with open(sim_filename + '.json', 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    def run_simulation(self, Eb_No, max_iter, min_errors, min_iters):
        frame_error_count = 0
        bit_error_count = 0
        num_blocks = 0
        for i in range(1, max_iter + 1):
            # simulate random PC in an AWGN channel
            self.set_message(np.random.randint(2, size=self.K))
            Encode(self)
            AWGN(self, Eb_No)
            Decode(self)

            # detect errors
            error_vec = self.message ^ self.message_received
            num_errors = sum(error_vec)
            frame_error_count = frame_error_count + (num_errors > 1)
            bit_error_count = bit_error_count + num_errors

            # early stopping condition
            num_blocks = i
            if frame_error_count >= min_errors and i >= min_iters:
                break
        return frame_error_count, bit_error_count, num_blocks

    def simulate(self, save_to, Eb_No_vec, design_SNR=None, max_iter=100000, min_iterations=1000, min_errors=30, sim_seed=1729, manual_const_flag=True):
        """
        Monte-carlo simulation of the performance of this polar code.
        The simulation has an early stopping condition of when the number of errors is below min_errors.
        Each E_b/N_o simulation has an additional early stopping condition using the minimum iterations
        and the minimum number of errors. The results are saved in a JSON file using :func:`save_as_json`.

        Parameters
        ----------
        save_to: string
            directory and filename to save JSON file to (excluding extension)
        Eb_No_vec: ndarray<float>
            the range of SNR values to simulate
        design_SNR: float
            the construction design SNR, E_b/N_o
        max_iter: int
            maximum number of iterations per SNR
        min_iterations: int
            the minimum number of iterations before early stopping is allowed per SNR
        min_errors: int
            the minimum number of frame errors before early stopping is allowed per SNR
        sim_seed: int
            pseudo-random generator seed, default is 1729 ('twister' on MATLAB)
        manual_const_flag: bool
            a flag that decides if construction should be done before simulating.
            Set to False if mothercode and/or puncturing constructions are manually set by the user.

        """

        # initialise simulation
        np.random.seed(sim_seed)
        frame_error_rates = np.zeros(len(Eb_No_vec))
        bit_error_rates = np.zeros(len(Eb_No_vec))

        # do construction if not done already
        if not manual_const_flag:
            if self.punct_flag and self.punct_type == 'shorten':
                Shorten(self, design_SNR)
            else:
                Construct(self, design_SNR)

        print(self)
        print('=' * 10, "Simulation", '=' * 10)
        for i in range(len(Eb_No_vec)):
            # run simulation for the current SNR
            frame_error_count, bit_error_count, num_blocks = self.run_simulation(Eb_No_vec[i], max_iter, min_errors, min_iterations)

            # calculate FER and BER
            frame_error_rate = frame_error_count / num_blocks
            bit_error_rate = bit_error_count / (self.K * num_blocks)
            frame_error_rates[i] = frame_error_rate
            bit_error_rates[i] = bit_error_rate
            print("Eb/No:", round(Eb_No_vec[i], 5), "  FER:", round(frame_error_rate, 3), "  BER:", round(bit_error_rate, 5))
            print('# Iterations:', num_blocks, '  # Frame Errors:', frame_error_count, ' # Bit Errors:', bit_error_count)
            print('='*20)

            # update GUI (if used)
            if self.status_bar != None:
                self.status_bar.set("Simulation progress: " + str(i + 1) + "/" + str(len(Eb_No_vec)))

            # early stopping condition
            if frame_error_count < min_errors:
                break

        # write data to JSON file
        self.simulated_snr = Eb_No_vec
        self.simulated_ber = bit_error_rates
        self.simulated_fer = frame_error_rates
        self.save_as_json(save_to)

        # update GUI construction fields (if used)
        if self.status_bar != None:
            self.gui_widgets[3].delete("1.0", tk.END)
            self.gui_widgets[6].delete("1.0", tk.END)
            self.gui_widgets[3].insert(tk.INSERT, ",".join(map(str, self.frozen)))
            self.gui_widgets[6].insert(tk.INSERT, ",".join(map(str, self.punct_set)))

        # update console and GUI
        print("Successfully completed simulation.\n")
        if self.status_bar != None:
            self.status_bar.set("Simulation progress: Done.")

    def plot_helper(self, new_plot, sim_filenames, dir, plot_title = 'Polar Code Performance'):
        # plot the FER and BER from file list
        new_plot.cla()
        for sim_filename in sim_filenames:
            with open(dir + sim_filename + '.json') as data_file:
                data_loaded = json.load(data_file)
            new_plot.plot(data_loaded['SNR'], data_loaded['FER'], '-o', markersize=6, linewidth=3, label=sim_filename)

        # format the plots
        new_plot.set_title(plot_title)
        new_plot.set_ylabel("Frame Error Rate")
        new_plot.set_xlabel("$E_b/N_o$ (dB)")
        new_plot.grid(linestyle='-')
        new_plot.set_yscale('log')
        new_plot.legend(loc='lower left')

    # call this for manual plotting
    def plot(self, sim_filenames, dir):
        """
        Plot multiple sets of FER data from the same directory on the same axes.

        Parameters
        ----------
        sim_filenames: ndarray<string>
            a list of all filenames to plot in a common root directory
        dir: string
            the root directory for the specified filenames

        """

        fig = plt.figure()
        new_plot = fig.add_subplot(111)
        self.plot_helper(new_plot, sim_filenames, dir)
        fig.show()

    # used by the GUI class for automated plotting
    def gui_plot_handler(self, gui_dict, fig):
        sim_filenames = gui_dict['filenames']
        dir = gui_dict['file_dir']
        self.plot_helper(fig, sim_filenames, dir)

    # used by the GUI class for simulating a new code
    def gui_sim_handler(self, gui_dict):
        # updated Polar Code from user
        punct_type = 'shorten' if gui_dict['punct_type'] == True else 'punct'
        shortening_params = (punct_type, gui_dict['punct_algo'], np.array(gui_dict['shortened_set'], dtype=int),
                             np.array(gui_dict['shortened_set'], dtype=int), False)
        self.initialise_code(gui_dict['N'], gui_dict['K'], shortening_params)
        self.construction_type = gui_dict['construction_algo']
        self.frozen = gui_dict['frozen_set']

        # simulation parameters from user
        iterations = gui_dict['iterations']
        min_frame_errors = gui_dict['min_frame_errors']
        file_dir = gui_dict['file_dir']
        save_to = gui_dict['save_to']
        manual_const_flag = gui_dict['manual_const_flag']
        design_SNR = gui_dict['design_SNR']
        Eb_No_vec = gui_dict['snr_values']

        # run simulation in another thread to avoid GUI freeze
        th = threading.Thread(name='sim_thread', target=self.simulate, args=(save_to, Eb_No_vec, design_SNR, iterations, 1000, min_frame_errors, 1729, manual_const_flag,))
        th.setDaemon(True)
        th.start()

Methods

def get_lut(self, my_set)

Convert a set into a lookup table.

Parameters

my_set : ndarray<int>
a vector of indices

Returns

ndarray<int>
a LUT with "0" for an index in my_set, else "1"
Expand source code
def get_lut(self, my_set):
    """
    Convert a set into a lookup table.

    Parameters
    ----------
    my_set: ndarray<int>
        a vector of indices

    Returns
    ----------
    ndarray<int>
        a LUT with "0" for an index in ``my_set``, else "1"

    """

    my_lut = np.ones(self.N, dtype=int)
    my_lut[my_set] = 0
    return my_lut
def get_normalised_SNR(self, design_SNR)

Normalise E_b/N_o so that the message bits have the same energy for any code rate.

Parameters

design_SNR : float
E_b/N_o in decibels

Returns

float
normalised E_b/N_o in linear units
Expand source code
def get_normalised_SNR(self, design_SNR):
    """
    Normalise E_b/N_o so that the message bits have the same energy for any code rate.

    Parameters
    ----------
    design_SNR: float
        E_b/N_o in decibels

    Returns
    ----------
    float
        normalised E_b/N_o in linear units

    """

    Eb_No_dB = design_SNR
    Eb_No = 10 ** (Eb_No_dB / 10)  # convert dB scale to linear
    Eb_No = Eb_No * (self.K / self.M)  # normalised message signal energy by R=K/M (M=N if not punctured)
    return Eb_No
def gui_plot_handler(self, gui_dict, fig)
Expand source code
def gui_plot_handler(self, gui_dict, fig):
    sim_filenames = gui_dict['filenames']
    dir = gui_dict['file_dir']
    self.plot_helper(fig, sim_filenames, dir)
def gui_sim_handler(self, gui_dict)
Expand source code
def gui_sim_handler(self, gui_dict):
    # updated Polar Code from user
    punct_type = 'shorten' if gui_dict['punct_type'] == True else 'punct'
    shortening_params = (punct_type, gui_dict['punct_algo'], np.array(gui_dict['shortened_set'], dtype=int),
                         np.array(gui_dict['shortened_set'], dtype=int), False)
    self.initialise_code(gui_dict['N'], gui_dict['K'], shortening_params)
    self.construction_type = gui_dict['construction_algo']
    self.frozen = gui_dict['frozen_set']

    # simulation parameters from user
    iterations = gui_dict['iterations']
    min_frame_errors = gui_dict['min_frame_errors']
    file_dir = gui_dict['file_dir']
    save_to = gui_dict['save_to']
    manual_const_flag = gui_dict['manual_const_flag']
    design_SNR = gui_dict['design_SNR']
    Eb_No_vec = gui_dict['snr_values']

    # run simulation in another thread to avoid GUI freeze
    th = threading.Thread(name='sim_thread', target=self.simulate, args=(save_to, Eb_No_vec, design_SNR, iterations, 1000, min_frame_errors, 1729, manual_const_flag,))
    th.setDaemon(True)
    th.start()
def initialise_code(self, M, K, punct_params)

Initialise the code with a set of parameters the same way as the constructor. Call this any time you want to change the code rate.

Expand source code
def initialise_code(self, M, K, punct_params):
    """
    Initialise the code with a set of parameters the same way as the constructor.
    Call this any time you want to change the code rate.
    """

    # mothercode parameters
    self.M = M
    self.N = int(2**(np.ceil(np.log2(M))))
    self.n = int(np.log2(self.N))
    self.F = arikan_gen(self.n)
    self.K = K
    self.s = self.N - self.M
    self.reliabilities = np.array([])
    self.frozen = np.array([])
    self.frozen_lookup = np.array([])
    self.x = np.zeros(self.N, dtype=int)
    self.u = np.zeros(self.N, dtype=int)
    self.construction_type = 'bb'
    self.message_received = np.array([])
    self.punct_flag = False if self.M == self.N else True
    self.simulated_snr = np.array([])
    self.simulated_fer = np.array([])
    self.simulated_ber = np.array([])
    self.FERestimate = 0
    self.T = None

    # puncturing parameters
    self.punct_type = punct_params[0]
    self.punct_set = np.array(punct_params[2])
    self.punct_set_lookup = self.get_lut(punct_params[2])
    self.source_set = np.array(punct_params[3])
    self.source_set_lookup = self.get_lut(punct_params[3])
    self.punct_algorithm = punct_params[1]
    self.update_frozen_flag = punct_params[4]
    self.recip_flag = np.array_equal(np.array(punct_params[2]), np.array(punct_params[3]))
def plot(self, sim_filenames, dir)

Plot multiple sets of FER data from the same directory on the same axes.

Parameters

sim_filenames : ndarray<string>
a list of all filenames to plot in a common root directory
dir : string
the root directory for the specified filenames
Expand source code
def plot(self, sim_filenames, dir):
    """
    Plot multiple sets of FER data from the same directory on the same axes.

    Parameters
    ----------
    sim_filenames: ndarray<string>
        a list of all filenames to plot in a common root directory
    dir: string
        the root directory for the specified filenames

    """

    fig = plt.figure()
    new_plot = fig.add_subplot(111)
    self.plot_helper(new_plot, sim_filenames, dir)
    fig.show()
def plot_helper(self, new_plot, sim_filenames, dir, plot_title='Polar Code Performance')
Expand source code
def plot_helper(self, new_plot, sim_filenames, dir, plot_title = 'Polar Code Performance'):
    # plot the FER and BER from file list
    new_plot.cla()
    for sim_filename in sim_filenames:
        with open(dir + sim_filename + '.json') as data_file:
            data_loaded = json.load(data_file)
        new_plot.plot(data_loaded['SNR'], data_loaded['FER'], '-o', markersize=6, linewidth=3, label=sim_filename)

    # format the plots
    new_plot.set_title(plot_title)
    new_plot.set_ylabel("Frame Error Rate")
    new_plot.set_xlabel("$E_b/N_o$ (dB)")
    new_plot.grid(linestyle='-')
    new_plot.set_yscale('log')
    new_plot.legend(loc='lower left')
def run_simulation(self, Eb_No, max_iter, min_errors, min_iters)
Expand source code
def run_simulation(self, Eb_No, max_iter, min_errors, min_iters):
    frame_error_count = 0
    bit_error_count = 0
    num_blocks = 0
    for i in range(1, max_iter + 1):
        # simulate random PC in an AWGN channel
        self.set_message(np.random.randint(2, size=self.K))
        Encode(self)
        AWGN(self, Eb_No)
        Decode(self)

        # detect errors
        error_vec = self.message ^ self.message_received
        num_errors = sum(error_vec)
        frame_error_count = frame_error_count + (num_errors > 1)
        bit_error_count = bit_error_count + num_errors

        # early stopping condition
        num_blocks = i
        if frame_error_count >= min_errors and i >= min_iters:
            break
    return frame_error_count, bit_error_count, num_blocks
def save_as_json(self, sim_filename)

Save all the important parameters in this object as a JSON file.

Parameters

sim_filename : string
directory and filename to save JSON file to (excluding extension)
Expand source code
def save_as_json(self, sim_filename):
    """
    Save all the important parameters in this object as a JSON file.

    Parameters
    ----------
    sim_filename: string
        directory and filename to save JSON file to (excluding extension)

    """
    data = {
        'N': self.M,
        'n': self.n,
        'K': self.K,
        'frozen': self.frozen.tolist(),
        'construction_type': self.construction_type,
        'punct_flag': self.punct_flag,
        'punct_type': self.punct_type,
        'punct_set': self.punct_set.tolist(),
        'source_set': self.source_set.tolist(),
        'punct_algorithm': self.punct_algorithm,
        'update_frozen_flag': self.update_frozen_flag,
        'BER': self.simulated_ber.tolist(),
        'FER': self.simulated_fer.tolist(),
        'SNR': self.simulated_snr.tolist()
    }
    with open(sim_filename + '.json', 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
def set_message(self, m)

Set the message vector to the non-frozen bits in x. The frozen bits in frozen are set to zero.

Parameters

m : ndarray<int>
the message vector
Expand source code
def set_message(self, m):
    """
    Set the message vector to the non-frozen bits in ``x``. The frozen bits in ``frozen`` are set to zero.

    Parameters
    ----------
    m: ndarray<int>
        the message vector

    """

    self.message = m
    self.x[self.frozen_lookup == 1] = m
    self.u = self.x.copy()
def simulate(self, save_to, Eb_No_vec, design_SNR=None, max_iter=100000, min_iterations=1000, min_errors=30, sim_seed=1729, manual_const_flag=True)

Monte-carlo simulation of the performance of this polar code. The simulation has an early stopping condition of when the number of errors is below min_errors. Each E_b/N_o simulation has an additional early stopping condition using the minimum iterations and the minimum number of errors. The results are saved in a JSON file using :func:save_as_json.

Parameters

save_to : string
directory and filename to save JSON file to (excluding extension)
Eb_No_vec : ndarray<float>
the range of SNR values to simulate
design_SNR : float
the construction design SNR, E_b/N_o
max_iter : int
maximum number of iterations per SNR
min_iterations : int
the minimum number of iterations before early stopping is allowed per SNR
min_errors : int
the minimum number of frame errors before early stopping is allowed per SNR
sim_seed : int
pseudo-random generator seed, default is 1729 ('twister' on MATLAB)
manual_const_flag : bool
a flag that decides if construction should be done before simulating. Set to False if mothercode and/or puncturing constructions are manually set by the user.
Expand source code
def simulate(self, save_to, Eb_No_vec, design_SNR=None, max_iter=100000, min_iterations=1000, min_errors=30, sim_seed=1729, manual_const_flag=True):
    """
    Monte-carlo simulation of the performance of this polar code.
    The simulation has an early stopping condition of when the number of errors is below min_errors.
    Each E_b/N_o simulation has an additional early stopping condition using the minimum iterations
    and the minimum number of errors. The results are saved in a JSON file using :func:`save_as_json`.

    Parameters
    ----------
    save_to: string
        directory and filename to save JSON file to (excluding extension)
    Eb_No_vec: ndarray<float>
        the range of SNR values to simulate
    design_SNR: float
        the construction design SNR, E_b/N_o
    max_iter: int
        maximum number of iterations per SNR
    min_iterations: int
        the minimum number of iterations before early stopping is allowed per SNR
    min_errors: int
        the minimum number of frame errors before early stopping is allowed per SNR
    sim_seed: int
        pseudo-random generator seed, default is 1729 ('twister' on MATLAB)
    manual_const_flag: bool
        a flag that decides if construction should be done before simulating.
        Set to False if mothercode and/or puncturing constructions are manually set by the user.

    """

    # initialise simulation
    np.random.seed(sim_seed)
    frame_error_rates = np.zeros(len(Eb_No_vec))
    bit_error_rates = np.zeros(len(Eb_No_vec))

    # do construction if not done already
    if not manual_const_flag:
        if self.punct_flag and self.punct_type == 'shorten':
            Shorten(self, design_SNR)
        else:
            Construct(self, design_SNR)

    print(self)
    print('=' * 10, "Simulation", '=' * 10)
    for i in range(len(Eb_No_vec)):
        # run simulation for the current SNR
        frame_error_count, bit_error_count, num_blocks = self.run_simulation(Eb_No_vec[i], max_iter, min_errors, min_iterations)

        # calculate FER and BER
        frame_error_rate = frame_error_count / num_blocks
        bit_error_rate = bit_error_count / (self.K * num_blocks)
        frame_error_rates[i] = frame_error_rate
        bit_error_rates[i] = bit_error_rate
        print("Eb/No:", round(Eb_No_vec[i], 5), "  FER:", round(frame_error_rate, 3), "  BER:", round(bit_error_rate, 5))
        print('# Iterations:', num_blocks, '  # Frame Errors:', frame_error_count, ' # Bit Errors:', bit_error_count)
        print('='*20)

        # update GUI (if used)
        if self.status_bar != None:
            self.status_bar.set("Simulation progress: " + str(i + 1) + "/" + str(len(Eb_No_vec)))

        # early stopping condition
        if frame_error_count < min_errors:
            break

    # write data to JSON file
    self.simulated_snr = Eb_No_vec
    self.simulated_ber = bit_error_rates
    self.simulated_fer = frame_error_rates
    self.save_as_json(save_to)

    # update GUI construction fields (if used)
    if self.status_bar != None:
        self.gui_widgets[3].delete("1.0", tk.END)
        self.gui_widgets[6].delete("1.0", tk.END)
        self.gui_widgets[3].insert(tk.INSERT, ",".join(map(str, self.frozen)))
        self.gui_widgets[6].insert(tk.INSERT, ",".join(map(str, self.punct_set)))

    # update console and GUI
    print("Successfully completed simulation.\n")
    if self.status_bar != None:
        self.status_bar.set("Simulation progress: Done.")