Source code for setigen.voltage.backend

import sys
import os

GPU_FLAG = os.getenv('SETIGEN_ENABLE_GPU', '0')
if GPU_FLAG == '1':
    try:
        import cupy as xp
    except ImportError:
        import numpy as xp
else:
    import numpy as xp
import numpy as np

from tqdm import tqdm
import time
import copy
import glob
from setigen import unit_utils
from setigen.voltage import raw_utils
from setigen.voltage import polyphase_filterbank
from setigen.voltage import quantization
from setigen.voltage import antenna as v_antenna


[docs] class RawVoltageBackend(object): """ Central class that wraps around antenna sources and backend elements to facilitate the creation of GUPPI RAW voltage files from synthetic real voltages. """
[docs] def __init__(self, antenna_source, digitizer, filterbank, requantizer, start_chan=0, num_chans=64, block_size=134217728, blocks_per_file=128, num_subblocks=32): """ Initialize a RawVoltageBackend object, with an input antenna source (either Antenna or MultiAntennaArray), and backend elements (digitizer, filterbank, requantizer). Also, details behind the RAW file format and recording are specified on initialization, such as which coarse channels are saved and the size of recording blocks. Parameters ---------- antenna_source : Antenna or MultiAntennaArray Antenna or MultiAntennaArray, from which real voltage data is created digitizer : RealQuantizer or ComplexQuantizer, or list Quantizer used to digitize input voltages. Either a single object to be used as a template for each antenna and polarization, or a 2D list of quantizers of shape (num_antennas, num_pols). filterbank : PolyphaseFilterbank, or list Polyphase filterbank object used to channelize voltages. Either a single object to be used as a template for each antenna and polarization, or a 2D list of filterbank objects of shape (num_antennas, num_pols). requantizer : ComplexQuantizer, or list Quantizer used on complex channelized voltages. Either a single object to be used as a template for each antenna and polarization, or a 2D list of quantizers of shape (num_antennas, num_pols). start_chan : int, optional Index of first coarse channel to be recorded num_chans : int, optional Number of coarse channels to be recorded block_size : int, optional Recording block size, in bytes blocks_per_file : int, optional Number of blocks to be saved per RAW file num_subblocks : int, optional Number of partitions per block, used for computation. If ``num_subblocks=1``, one block's worth of data will be passed through the pipeline and recorded at once. Use this parameter to reduce memory load, especially when using GPU acceleration. """ self.antenna_source = antenna_source if isinstance(antenna_source, v_antenna.Antenna): self.num_antennas = 1 self.is_antenna_array = False elif isinstance(antenna_source, v_antenna.MultiAntennaArray): self.num_antennas = self.antenna_source.num_antennas self.is_antenna_array = True else: raise ValueError("Invalid type provided for 'antenna_source'.") self.sample_rate = self.antenna_source.sample_rate self.num_pols = self.antenna_source.num_pols self.fch1 = self.antenna_source.fch1 self.ascending = self.antenna_source.ascending self.start_chan = start_chan self.num_chans = num_chans self.block_size = block_size self.blocks_per_file = blocks_per_file self.num_subblocks = num_subblocks self.digitizer = digitizer if isinstance(self.digitizer, quantization.RealQuantizer) or isinstance(self.digitizer, quantization.ComplexQuantizer): self.digitizer = [[copy.deepcopy(self.digitizer) for pol in range(self.num_pols)] for antenna in range(self.num_antennas)] elif isinstance(self.digitizer, list): assert len(self.digitizer) == self.num_antennas assert len(self.digitizer[0]) == self.num_pols for antenna in range(self.num_antennas): for pol in range(self.num_pols): assert isinstance(self.digitizer[antenna][pol], (quantization.RealQuantizer, quantization.ComplexQuantizer)) else: raise TypeError('Digitizer is incorrect type!') self.filterbank = filterbank if isinstance(self.filterbank, polyphase_filterbank.PolyphaseFilterbank): self.filterbank = [[copy.deepcopy(self.filterbank) for pol in range(self.num_pols)] for antenna in range(self.num_antennas)] elif isinstance(self.filterbank, list): assert len(self.filterbank) == self.num_antennas assert len(self.filterbank[0]) == self.num_pols for antenna in range(self.num_antennas): for pol in range(self.num_pols): assert isinstance(self.filterbank[antenna][pol], polyphase_filterbank.PolyphaseFilterbank) else: raise TypeError('Filterbank is incorrect type!') self.num_taps = self.filterbank[0][0].num_taps self.num_branches = self.filterbank[0][0].num_branches assert self.start_chan + self.num_chans <= self.num_branches // 2 self.tbin = self.num_branches / self.sample_rate self.chan_bw = 1 / self.tbin if not self.ascending: self.chan_bw = -self.chan_bw self.requantizer = requantizer if isinstance(self.requantizer, quantization.ComplexQuantizer): self.requantizer = [[copy.deepcopy(self.requantizer) for pol in range(self.num_pols)] for antenna in range(self.num_antennas)] elif isinstance(self.requantizer, list): assert len(self.requantizer) == self.num_antennas assert len(self.requantizer[0]) == self.num_pols for antenna in range(self.num_antennas): for pol in range(self.num_pols): assert isinstance(self.requantizer[antenna][pol], quantization.ComplexQuantizer) else: raise TypeError('Requantizer is incorrect type!') self.num_bits = self.requantizer[0][0].num_bits self.num_bytes = self.num_bits // 8 self.bytes_per_sample = 2 * self.num_pols * self.num_bits // 8 self.total_obs_num_samples = None # Make sure that block_size is appropriate assert self.block_size % int(self.num_antennas * self.num_chans * self.num_taps * self.bytes_per_sample) == 0 self.samples_per_block = self.block_size // (self.num_antennas * self.num_chans * self.bytes_per_sample) self.time_per_block = self.samples_per_block * self.tbin self.sample_stage_t = 0 self.digitizer_stage_t = 0 self.filterbank_stage_t = 0 self.requantizer_stage_t = 0 self.input_file_stem = None # Filename stem for input RAW data self.input_header_dict = None # RAW header self.header_size = None # Size of header in file (bytes), including padding self.input_num_blocks = None # Total number of blocks in supplied input RAW data self.input_file_handler = None # Current file handler for input RAW data
[docs] @classmethod def from_data(cls, input_file_stem, antenna_source, digitizer, filterbank, start_chan=0, num_subblocks=32): """ Initialize a RawVoltageBackend object, using existing RAW data as a background for signal insertion and recording. Compared to normal initialization, some parameters are inferred from the input data. Parameters ---------- input_file_stem : str Filename or path stem to input RAW data antenna_source : Antenna or MultiAntennaArray Antenna or MultiAntennaArray, from which real voltage data is created digitizer : RealQuantizer or ComplexQuantizer, or list Quantizer used to digitize input voltages. Either a single object to be used as a template for each antenna and polarization, or a 2D list of quantizers of shape (num_antennas, num_pols). filterbank : PolyphaseFilterbank, or list Polyphase filterbank object used to channelize voltages. Either a single object to be used as a template for each antenna and polarization, or a 2D list of filterbank objects of shape (num_antennas, num_pols). start_chan : int, optional Index of first coarse channel to be recorded num_subblocks : int, optional Number of partitions per block, used for computation. If ``num_subblocks=1``, one block's worth of data will be passed through the pipeline and recorded at once. Use this parameter to reduce memory load, especially when using GPU acceleration. Returns ------- backend : RawVoltageBackend Created backend object """ requantizer = quantization.ComplexQuantizer() raw_params = raw_utils.get_raw_params(input_file_stem=input_file_stem, start_chan=start_chan) blocks_per_file = raw_utils.get_blocks_per_file(input_file_stem) requantizer.num_bits = raw_params['num_bits'] requantizer.quantizer_r.num_bits = raw_params['num_bits'] requantizer.quantizer_i.num_bits = raw_params['num_bits'] # if isinstance(requantizer, quantization.ComplexQuantizer): # requantizer.num_bits = raw_params['num_bits'] # requantizer.quantizer_r.num_bits = raw_params['num_bits'] # requantizer.quantizer_i.num_bits = raw_params['num_bits'] # elif isinstance(requantizer, list): # for r in requantizer: # r.num_bits = raw_params['num_bits'] # r.quantizer_r.num_bits = raw_params['num_bits'] # r.quantizer_i.num_bits = raw_params['num_bits'] backend = cls(antenna_source, digitizer=digitizer, filterbank=filterbank, requantizer=requantizer, start_chan=start_chan, num_chans=raw_params['num_chans'], block_size=raw_params['block_size'], blocks_per_file=blocks_per_file, num_subblocks=num_subblocks) backend.input_file_stem = input_file_stem backend.input_num_blocks = raw_utils.get_total_blocks(input_file_stem) backend.input_header_dict = raw_utils.read_header(f'{input_file_stem}.0000.raw') if int(backend.input_header_dict.get("DIRECTIO", 0)) == 0: backend.header_size = 80 * (len(backend.input_header_dict) + 1) else: backend.header_size = int(512 * np.ceil((80 * (len(backend.input_header_dict) + 1)) / 512)) return backend
def _header_populate_configuration(self, header_dict={}): """ Populate the given dictionary with entries showing the configuration values. Parameters ---------- header_dict : dict, optional Dictionary of header values to set. """ # Set header values determined by pipeline parameters if 'TELESCOP' not in header_dict: header_dict['TELESCOP'] = 'SETIGEN' elif self.input_header_dict is not None and 'SETIGEN' not in self.input_header_dict['TELESCOP']: header_dict['TELESCOP'] = f"{self.input_header_dict['TELESCOP'].strip()}_SETIGEN" if 'OBSERVER' not in header_dict: header_dict['OBSERVER'] = 'SETIGEN' elif self.input_header_dict is not None and 'SETIGEN' not in self.input_header_dict['OBSERVER']: header_dict['OBSERVER'] = f"{self.input_header_dict['OBSERVER'].strip()}_SETIGEN" if 'SRC_NAME' not in header_dict: header_dict['SRC_NAME'] = 'SYNTHETIC' elif self.input_header_dict is not None and 'SYNTHETIC' not in self.input_header_dict['SRC_NAME']: header_dict['SRC_NAME'] = f"{self.input_header_dict['SRC_NAME'].strip()}_SETIGEN" # Should not be able to manually change these header values header_dict['NBITS'] = self.num_bits header_dict['CHAN_BW'] = self.chan_bw * 1e-6 header_dict['NPOL'] = self.num_pols header_dict['BLOCSIZE'] = self.block_size header_dict['SCANLEN'] = self.obs_length header_dict['TBIN'] = self.tbin if self.is_antenna_array: header_dict['NANTS'] = self.num_antennas header_dict['OBSNCHAN'] = self.num_chans * self.num_antennas header_dict['OBSBW'] = self.chan_bw * self.num_chans * 1e-6 # Compute center frequency of recorded data # self.chan_bw was already adjusted for ascending or descending frequencies center_freq = (self.start_chan + (self.num_chans - 1) / 2) * self.chan_bw center_freq += self.fch1 header_dict['OBSFREQ'] = center_freq * 1e-6 if 'PKTIDX' not in header_dict: header_dict['PKTIDX'] = 0 header_dict['PKTIDX'] = int(header_dict['PKTIDX']) if 'PKTSTART' not in header_dict: header_dict['PKTSTART'] = header_dict['PKTIDX'] header_dict['PKTSTOP'] = int(header_dict['PKTSTART']) + self.num_blocks*self.samples_per_block return header_dict def _header_add_from_template(self, header_dict={}): """ Read all novel header lines into the given header dictionary. Parameters ---------- header_dict : dict, optional Dictionary of header values to set. """ my_path = os.path.abspath(os.path.dirname(__file__)) path = os.path.join(my_path, 'header_template.txt') with open(path, 'r') as t: for line in t.readlines(): key = line[:8].strip() if key != 'END' and key not in header_dict: header_dict[key] = line[9:].strip() return header_dict def _header_add_from_input_header(self, header_dict={}): """ Update all novel input header entries into the given header dictionary. Parameters ---------- header_dict : dict, optional Dictionary of header values to set. """ for key, value in self.input_header_dict.items(): if key not in header_dict: header_dict[key] = value.strip() return header_dict def _make_header(self, f, header_dict): """ Write all header lines out to file as bytes. Also increments the 'PKTIDX' key-value. Parameters ---------- f : file handle File handle of open RAW file header_dict : dict Dictionary of header values to set. """ directio = False # preprocess DIRECTIO if 'DIRECTIO' in header_dict: directio = header_dict['DIRECTIO'] try: if isinstance(directio, str): directio = int(directio.replace("'", "")) directio = directio != 0 except BaseException as err: tqdm(f'Could not parse DIRECTIO value `{header_dict["DIRECTIO"]}` ({repr(err)}). Replacing with `0`.') header_dict['DIRECTIO'] = 0 # Write each line with space and zero padding header_lines = 0 for key, value in header_dict.items(): value_is_encoded = (isinstance(value, str) and value[0] == "'") line = raw_utils.format_header_line(key, value, as_strings=value_is_encoded) f.write(f"{line:<80}".encode()) header_lines += 1 f.write(f"{'END':<80}".encode()) header_lines += 1 # Pad header if directio if directio: f.write(bytearray(512 - (80 * header_lines % 512))) header_dict['PKTIDX'] += self.samples_per_block def _read_next_block(self): """ Reads next block of data if input RAW files are provided, upon which synthetic data will be added. Also sets requantizer target statistics appropriately. """ _ = self.input_file_handler.read(self.header_size) data_chunk = self.input_file_handler.read(self.block_size) obsnchan = self.num_chans * self.num_antennas rawbuffer = np.frombuffer(data_chunk, dtype=np.int8).reshape((obsnchan, int(self.block_size / obsnchan))) input_voltages = np.zeros((obsnchan, int(rawbuffer.shape[1] / self.bytes_per_sample * self.num_pols)), dtype=complex) for antenna in range(self.num_antennas): for pol in range(self.num_pols): requantizer = self.requantizer[antenna][pol] c_idx = antenna * self.num_chans + np.arange(0, self.num_chans) if self.num_bits == 8: t_idx = 2 * pol + np.arange(0, rawbuffer.shape[1], 2 * self.num_pols) R = rawbuffer[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] I = rawbuffer[c_idx[:, np.newaxis], (t_idx+1)[np.newaxis, :]] elif self.num_bits == 4: t_idx = pol + np.arange(0, rawbuffer.shape[1], self.num_pols) Q = rawbuffer[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] R = Q // 16 I = Q - 16 * R I[I >= 8] -= 16 else: raise ValueError(f'{self.num_bits} bits not supported...') requantizer.quantizer_r._set_target_stats(np.mean(R), np.std(R)) requantizer.quantizer_i._set_target_stats(np.mean(I), np.std(I)) t_idx = pol + np.arange(0, input_voltages.shape[1], self.num_pols) input_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] = R + I * 1j return input_voltages
[docs] def collect_data_block(self, digitize=True, requantize=True, verbose=True): """ General function to actually collect data from the antenna source and return coarsely channelized complex voltages. Collects one block of data. Parameters ---------- digitize : bool, optional Whether to quantize input voltages before the PFB requantize : bool, optional Whether to quantize output complex voltages after the PFB verbose : bool, optional Control whether tqdm prints progress messages Returns ------- final_voltages : array Complex voltages formatted according to GUPPI RAW specifications; array of shape (num_chans * num_antennas, block_size / (num_chans * num_antennas)) """ obsnchan = self.num_chans * self.num_antennas final_voltages = np.empty((obsnchan, int(self.block_size / obsnchan))) if self.input_file_stem is not None: if not requantize: raise ValueError("Must set 'requantize=True' when using input RAW data!") input_voltages = self._read_next_block() # Make sure that block_size is appropriate assert self.block_size % int(obsnchan * self.num_taps * self.bytes_per_sample) == 0 T = int(self.block_size / (obsnchan * self.bytes_per_sample)) W = int(xp.ceil(T / self.num_taps / self.num_subblocks)) + 1 subblock_T = self.num_taps * (W - 1) # Change self.num_subblocks if necessary self.num_subblocks = int(xp.ceil(T / subblock_T)) subblock_t_len = int(subblock_T * self.bytes_per_sample) with tqdm(total=self.num_antennas*self.num_pols*self.num_subblocks, leave=False) as pbar: pbar.set_description('Subblocks') for subblock in range(self.num_subblocks): if verbose: tqdm.write(f'Creating subblock {subblock}...') # Change num windows at the end if self.num_subblocks doesn't go in evenly if T % subblock_T != 0 and subblock == self.num_subblocks - 1: W = int((T % subblock_T) / self.num_taps) + 1 if self.antenna_source.start_obs: num_samples = self.num_branches * self.num_taps * W else: num_samples = self.num_branches * self.num_taps * (W - 1) # Calculate the real voltage samples from each antenna t = time.time() antennas_v = self.antenna_source.get_samples(num_samples) self.sample_stage_t += time.time() - t for antenna in range(self.num_antennas): if verbose and self.is_antenna_array: tqdm.write(f'Creating antenna #{antenna}...') c_idx = antenna * self.num_chans + np.arange(0, self.num_chans) for pol in range(self.num_pols): # Store indices used for numpy data i/o per polarization if T % subblock_T != 0 and subblock == self.num_subblocks - 1: # Uses smaller num windows W subblock_t_range = self.num_taps * (W - 1) * self.bytes_per_sample else: subblock_t_range = subblock_t_len t_idx = subblock * subblock_t_len + self.num_bits // 4 * pol + np.arange(0, subblock_t_range, self.num_bits // 4 * self.num_pols) # Send voltage data through the backend v = antennas_v[antenna][pol] if digitize: t = time.time() v = self.digitizer[antenna][pol].quantize(v) self.digitizer_stage_t += time.time() - t t = time.time() v = self.filterbank[antenna][pol].channelize(v, cache=True) v = v[:, self.start_chan:self.start_chan+self.num_chans] self.filterbank_stage_t += time.time() - t if requantize: t = time.time() if self.input_file_stem is not None: temp_mean_r = self.requantizer[antenna][pol].quantizer_r.target_mean self.requantizer[antenna][pol].quantizer_r.target_mean = 0 temp_mean_i = self.requantizer[antenna][pol].quantizer_i.target_mean self.requantizer[antenna][pol].quantizer_i.target_mean = 0 # Start off assuming signals are embedded in Gaussian noise with std 1 if self.filterbank[antenna][pol].channelized_stds is None: self.filterbank[antenna][pol].estimate_channelized_stds() custom_stds = self.filterbank[antenna][pol].channelized_stds # If digitizing real voltages, scale up by the appropriate factor if digitize: custom_stds *= self.digitizer[antenna][pol].target_std v = self.requantizer[antenna][pol].quantize(v, custom_stds=custom_stds) self.requantizer[antenna][pol].quantizer_r.target_mean = temp_mean_r self.requantizer[antenna][pol].quantizer_i.target_mean = temp_mean_i if self.num_bits == 8: input_v = input_voltages[c_idx[:, np.newaxis], (t_idx//2)[np.newaxis, :]] elif self.num_bits == 4: input_v = input_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] input_v = xp.array(input_v) v += input_v.T v = self.requantizer[antenna][pol].quantize(v) self.requantizer_stage_t += time.time() - t # Convert to numpy array if using cupy try: R = xp.asnumpy(xp.real(v).T) I = xp.asnumpy(xp.imag(v).T) except AttributeError: R = xp.real(v).T I = xp.imag(v).T if self.num_bits == 8 or not requantize: final_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] = R final_voltages[c_idx[:, np.newaxis], (t_idx+1)[np.newaxis, :]] = I elif self.num_bits == 4: # Translate 4 bit complex voltages to an 8 bit equivalent representation I[I < 0] += 16 final_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] = R * 16 + I else: raise ValueError(f'{self.num_bits} bits not supported...') pbar.update(1) return final_voltages
[docs] def get_num_blocks(self, obs_length): """ Calculate the number of blocks required as a function of observation length, in seconds. Note that only an integer number of blocks will be recorded, so the actual observation length may be shorter than the ``obs_length`` provided. """ return int(obs_length * abs(self.chan_bw) * self.num_antennas * self.num_chans * self.bytes_per_sample / self.block_size)
[docs] def record(self, output_file_stem, obs_length=None, num_blocks=None, length_mode='obs_length', header_dict={}, digitize=True, load_template=True, verbose=True): """ General function to actually collect data from the antenna source and return coarsely channelized complex voltages. If input data is provided, only as much data as is in the input will be generated. Parameters ---------- output_file_stem : str Filename or path stem; the suffix will be automatically appended obs_length : float, optional Length of observation in seconds, if in 'obs_length' mode num_blocks : int, optional Number of data blocks to record, if in 'num_blocks' mode length_mode : str, optional Mode for specifying length of observation, either 'obs_length' in seconds or 'num_blocks' in data blocks header_dict : dict, optional Dictionary of header values to set. Use to overwrite non-essential header values or add custom ones. digitize : bool, optional Whether to quantize input voltages before the PFB verbose : bool, optional Control whether tqdm prints progress messages load_template : bool, optional Control whether the internal header template's keys are used. """ if length_mode == 'obs_length': if obs_length is None: if self.input_num_blocks is not None: self.num_blocks = self.input_num_blocks else: raise ValueError("Value not given for 'obs_length'.") else: self.num_blocks = self.get_num_blocks(obs_length) elif length_mode == 'num_blocks': if num_blocks is None: if self.input_num_blocks is not None: self.num_blocks = self.input_num_blocks else: raise ValueError("Value not given for 'num_blocks'.") else: self.num_blocks = num_blocks else: raise ValueError("Invalid option given for 'length_mode'.") # Ensure that we don't request more blocks than possible if self.input_num_blocks is not None: self.num_blocks = min(self.num_blocks, self.input_num_blocks) self.obs_length = self.num_blocks * self.time_per_block self.total_obs_num_samples = int(self.obs_length / self.tbin) * self.num_branches if load_template: header_dict = self._header_add_from_template(header_dict) if self.input_header_dict is not None: header_dict = self._header_add_from_input_header(header_dict) # Update header with config last to honor prior entries header_dict = self._header_populate_configuration(header_dict) # Mark each antenna and data stream as the start of the observation self.antenna_source.reset_start() # Reset filterbank cache as well for antenna in range(self.num_antennas): for pol in range(self.num_pols): self.digitizer[antenna][pol]._reset_cache() self.filterbank[antenna][pol]._reset_cache() self.requantizer[antenna][pol]._reset_cache() # Collect data and record to disk num_files = int(xp.ceil(self.num_blocks / self.blocks_per_file)) with tqdm(total=self.num_blocks) as pbar: pbar.set_description('Blocks') for i in range(num_files): save_fn = f'{output_file_stem}.{i:04}.raw' # Create input raw file handler for use in collecting data if self.input_file_stem is not None: input_fn = f'{self.input_file_stem}.{i:04}.raw' self.input_file_handler = open(input_fn, 'rb') with open(save_fn, 'wb') as f: # If blocks won't fill a whole file, adjust number of blocks to write at the end if i == num_files - 1 and self.num_blocks % self.blocks_per_file != 0: blocks_to_write = self.num_blocks % self.blocks_per_file else: blocks_to_write = self.blocks_per_file for j in range(blocks_to_write): if verbose: tqdm.write(f'Creating block {j}...') self._make_header(f, header_dict) v = self.collect_data_block(digitize=digitize, requantize=True, verbose=verbose) f.write(xp.array(v, dtype=xp.int8).tobytes()) if verbose: tqdm.write(f'File {i}, block {j} recorded!') pbar.update(1) if self.input_file_stem is not None: self.input_file_handler.close()
[docs] def get_block_size(num_antennas=1, tchans_per_block=128, num_bits=8, num_pols=2, num_branches=1024, num_chans=64, fftlength=1024, int_factor=4): """ Calculate block size, given a desired number of time bins per RAW data block ``tchans_per_block``. Takes in backend parameters, including fine channelization factors. Can be used to calculate reasonable block sizes for raw voltage recording. Parameters ---------- num_antennas : int Number of antennas tchans_per_block : int Final number of time bins in fine resolution product, per data block num_bits : int Number of bits in requantized data (for saving into file). Can be 8 or 4. num_pols : int Number of polarizations recorded num_branches : int Number of branches in polyphase filterbank num_chans : int Number of coarse channels written to file fftlength : int FFT length to be used in fine channelization int_factor : int, optional Integration factor to be used in fine channelization Returns ------- block_size : int Block size, in bytes """ obsnchan = num_chans * num_antennas bytes_per_sample = 2 * num_pols * num_bits // 8 T = tchans_per_block * fftlength * int_factor block_size = T * obsnchan * bytes_per_sample return block_size
[docs] def get_total_obs_num_samples(obs_length=None, num_blocks=None, length_mode='obs_length', num_antennas=1, sample_rate=3e9, block_size=134217728, num_bits=8, num_pols=2, num_branches=1024, num_chans=64): """ Calculate number of required real voltage time samples for as given ``obs_length`` or ``num_blocks``, without directly using a ``RawVoltageBackend`` object. Parameters ---------- obs_length : float, optional Length of observation in seconds, if in 'obs_length' mode num_blocks : int, optional Number of data blocks to record, if in 'num_blocks' mode length_mode : str, optional Mode for specifying length of observation, either 'obs_length' in seconds or 'num_blocks' in data blocks num_antennas : int Number of antennas sample_rate : float Sample rate in Hz block_size : int Block size used in recording GUPPI RAW files num_bits : int Number of bits in requantized data (for saving into file). Can be 8 or 4. num_pols : int Number of polarizations recorded num_branches : int Number of branches in polyphase filterbank num_chans : int Number of coarse channels written to file Returns ------- num_samples : int Number of samples """ tbin = num_branches / sample_rate chan_bw = 1 / tbin bytes_per_sample = 2 * num_pols * num_bits / 8 if length_mode == 'obs_length': if obs_length is None: raise ValueError("Value not given for 'obs_length'.") num_blocks = int(obs_length * chan_bw * num_antennas * num_chans * bytes_per_sample / block_size) elif length_mode == 'num_blocks': if num_blocks is None: raise ValueError("Value not given for 'num_blocks'.") pass else: raise ValueError("Invalid option given for 'length_mode'.") return num_blocks * int(block_size / (num_antennas * num_chans * bytes_per_sample)) * num_branches