Source code for setigen.voltage.quantization

import os

GPU_FLAG = os.getenv('SETIGEN_ENABLE_GPU', '0')
if GPU_FLAG == '1':
    try:
        import cupy as xp
    except ImportError:
        import numpy as xp
else:
    import numpy as xp
    
import numpy as np
import scipy.signal
import time

from . import data_stream


[docs]class RealQuantizer(object): """ Implement a quantizer for input voltages. """
[docs] def __init__(self, target_fwhm=32, num_bits=8, stats_calc_period=1, stats_calc_num_samples=10000): """ Initialize a quantizer, which maps real input voltages to integers between -2**(num_bits - 1) and 2**(num_bits - 1) - 1, inclusive. Specifically, it estimates the mean and standard deviation of the voltages, and maps to 0 mean and a target full width at half maximum (FWHM). Voltages that extend past the quantized voltage range are clipped accordingly. The mean and standard deviation calculations can be limited to save computation using the `stats_calc_period` and `stats_calc_num_samples` parameters. The former is an integer that specifies the period of computation; if 1, it computes the stats every time. If set to a non-positive integer, like -1, the computation will run once during the first call and never again. The latter specifies the maximum number of voltage samples to use in calculating the statistics; depending on the nature of the input voltages, a relatively small number of samples may be sufficient for capturing the general distribution of voltages. Parameters ---------- target_fwhm : float, optional Target FWHM num_bits : int, optional Number of bits to quantize to. Quantized voltages will span -2**(num_bits - 1) to 2**(num_bits - 1) - 1, inclusive. stats_calc_period : int, optional Sets the period for computing the mean and standard deviation of input voltages stats_calc_num_samples : int, optional Maximum number of samples for use in estimating noise statistics """ self.target_fwhm = target_fwhm self.target_sigma = self.target_fwhm / (2 * xp.sqrt(2 * xp.log(2))) self.num_bits = num_bits self.stats_cache = [[[None, None], [None, None]]] # shape (num_antennas, num_pols, 2) self.stats_calc_indices = [[0, 0]] # shape (num_antennas, num_pols) self.stats_calc_period = stats_calc_period self.stats_calc_num_samples = stats_calc_num_samples
[docs] def quantize(self, voltages, pol=0, antenna=0): """ Quantize input voltages. Cache voltage mean and standard deviation, per polarization and per antenna. Parameters ---------- voltages : array Array of real voltages pol : int, optional Index specifying the polarization to which the quantization is applied, for x and y polarizations. antenna : int, optional Index specifying the antenna to which the quantization is applied. Default is 0, which works for single Antenna cases. Returns ------- q_voltages : array Array of quantized voltages """ if self.stats_calc_indices[antenna][pol] == 0: self.stats_cache[antenna][pol] = data_stream.estimate_stats(voltages, self.stats_calc_num_samples) q_voltages = quantize_real(voltages, target_fwhm=self.target_fwhm, num_bits=self.num_bits, data_mean=self.stats_cache[antenna][pol][0], data_sigma=self.stats_cache[antenna][pol][1], stats_calc_num_samples=self.stats_calc_num_samples) self.stats_calc_indices[antenna][pol] += 1 if self.stats_calc_indices[antenna][pol] == self.stats_calc_period: self.stats_calc_indices[antenna][pol] = 0 return q_voltages
[docs] def digitize(self, voltages, pol=0, antenna=0): """ Quantize input voltages. Wrapper for :code:`quantize()`. """ return self.quantize(voltages, pol=pol, antenna=antenna)
[docs]class ComplexQuantizer(object): """ Implement a quantizer for complex voltages, using a pair of RealQuantizers. """
[docs] def __init__(self, target_fwhm=32, num_bits=8, stats_calc_period=1, stats_calc_num_samples=10000): """ Initialize a complex quantizer, which maps complex input voltage components to integers between -2**(num_bits - 1) and 2**(num_bits - 1) - 1, inclusive. Uses a pair of RealQuantizers to quantize real and imaginary components separately. Parameters ---------- target_fwhm : float, optional Target FWHM num_bits : int, optional Number of bits to quantize to. Quantized voltages will span -2**(num_bits - 1) to 2**(num_bits - 1) - 1, inclusive. stats_calc_period : int, optional Sets the period for computing the mean and standard deviation of input voltages stats_calc_num_samples : int, optional Maximum number of samples for use in estimating noise statistics """ self.target_fwhm = target_fwhm self.target_sigma = self.target_fwhm / (2 * xp.sqrt(2 * xp.log(2))) self.num_bits = num_bits self.stats_cache_r = [[[None, None], [None, None]]] # shape (num_antennas, num_pols, 2) self.stats_cache_i = [[[None, None], [None, None]]] # shape (num_antennas, num_pols, 2) self.stats_calc_period = stats_calc_period self.stats_calc_num_samples = stats_calc_num_samples self.quantizer_r = RealQuantizer(target_fwhm=target_fwhm, num_bits=num_bits, stats_calc_period=stats_calc_period, stats_calc_num_samples=stats_calc_num_samples) self.quantizer_i = RealQuantizer(target_fwhm=target_fwhm, num_bits=num_bits, stats_calc_period=stats_calc_period, stats_calc_num_samples=stats_calc_num_samples)
[docs] def quantize(self, voltages, pol=0, antenna=0): """ Quantize input complex voltages. Cache voltage means and standard deviations, per polarization and per antenna. Parameters ---------- voltages : array Array of complex voltages pol : int, optional Index specifying the polarization to which the quantization is applied, for x and y polarizations. antenna : int, optional Index specifying the antenna to which the quantization is applied. Default is 0, which works for single Antenna cases. Returns ------- q_voltages : array Array of complex quantized voltages """ q_r = self.quantizer_r.quantize(xp.real(voltages), pol=pol, antenna=antenna) q_i = self.quantizer_i.quantize(xp.imag(voltages), pol=pol, antenna=antenna) self.stats_cache_r = self.quantizer_r.stats_cache self.stats_cache_i = self.quantizer_i.stats_cache return q_r + q_i * 1j
[docs]def quantize_real(x, target_fwhm=32, num_bits=8, data_mean=None, data_sigma=None, stats_calc_num_samples=10000): """ Quantize real voltage data to integers with specified number of bits and target FWHM range. Parameters ---------- x : array Array of voltages target_fwhm : float, optional Target FWHM num_bits : int, optional Number of bits to quantize to. Quantized voltages will span -2**(num_bits - 1) to 2**(num_bits - 1) - 1, inclusive. data_mean : float, optional Mean of input voltages, if already known data_sigma : float, optional Standard deviation of input voltages, if already known. If None, estimates mean and standard deviation automatically. stats_calc_num_samples : int, optional Maximum number of samples for use in estimating noise statistics Returns ------- q_voltages : array Array of quantized voltages """ if data_sigma is None: data_mean, data_sigma = data_stream.estimate_stats(x, stats_calc_num_samples) data_fwhm = 2 * xp.sqrt(2 * xp.log(2)) * data_sigma factor = target_fwhm / data_fwhm q_voltages = xp.around(factor * (x - data_mean)) q_voltages = xp.clip(q_voltages, -2**(num_bits - 1), 2**(num_bits - 1) - 1) q_voltages = q_voltages.astype(int) return q_voltages
[docs]def quantize_complex(x, target_fwhm=32, num_bits=8, stats_calc_num_samples=10000): """ Quantize complex voltage data to integers with specified number of bits and target FWHM range. Parameters ---------- x : array Array of complex voltages target_fwhm : float, optional Target FWHM num_bits : int, optional Number of bits to quantize to. Quantized voltages will span -2**(num_bits - 1) to 2**(num_bits - 1) - 1, inclusive. stats_calc_num_samples : int, optional Maximum number of samples for use in estimating noise statistics Returns ------- q_c : array Array of complex quantized voltages """ r, i = xp.real(x), xp.imag(x) q_r = quantize_real(r, target_fwhm=target_fwhm, num_bits=num_bits, stats_calc_num_samples=stats_calc_num_samples) q_i = quantize_real(i, target_fwhm=target_fwhm, num_bits=num_bits, stats_calc_num_samples=stats_calc_num_samples) q_c = q_r + q_i * 1j return q_c