import sys
import os
GPU_FLAG = os.getenv('SETIGEN_ENABLE_GPU', '0')
if GPU_FLAG == '1':
try:
import cupy as xp
except ImportError:
import numpy as xp
else:
import numpy as xp
import numpy as np
from tqdm import tqdm
import time
import copy
import glob
from setigen import unit_utils
from . import raw_utils
from . import polyphase_filterbank
from . import quantization
from . import antenna as v_antenna
[docs]class RawVoltageBackend(object):
"""
Central class that wraps around antenna sources and backend elements to facilitate the
creation of GUPPI RAW voltage files from synthetic real voltages.
"""
[docs] def __init__(self,
antenna_source,
digitizer=quantization.RealQuantizer(),
filterbank=polyphase_filterbank.PolyphaseFilterbank(),
requantizer=quantization.ComplexQuantizer(),
start_chan=0,
num_chans=64,
block_size=134217728,
blocks_per_file=128,
num_subblocks=32):
"""
Initialize a RawVoltageBackend object, with an input antenna source (either Antenna or
MultiAntennaArray), and backend elements (digitizer, filterbank, requantizer). Also, details
behind the RAW file format and recording are specified on initialization, such as which
coarse channels are saved and the size of recording blocks.
Parameters
----------
antenna_source : Antenna or MultiAntennaArray
Antenna or MultiAntennaArray, from which real voltage data is created
digitizer : RealQuantizer or ComplexQuantizer, or list, optional
Quantizer used to digitize input voltages. Either a single object to be used as a template
for each antenna and polarization, or a 2D list of quantizers of shape (num_antennas, num_pols).
filterbank : PolyphaseFilterbank, or list, optional
Polyphase filterbank object used to channelize voltages. Either a single object to be used as a
template for each antenna and polarization, or a 2D list of filterbank objects of shape
(num_antennas, num_pols).
requantizer : ComplexQuantizer, or list, optional
Quantizer used on complex channelized voltages. Either a single object to be used as a template
for each antenna and polarization, or a 2D list of quantizers of shape (num_antennas, num_pols).
start_chan : int, optional
Index of first coarse channel to be recorded
num_chans : int, optional
Number of coarse channels to be recorded
block_size : int, optional
Recording block size, in bytes
blocks_per_file : int, optional
Number of blocks to be saved per RAW file
num_subblocks : int, optional
Number of partitions per block, used for computation. If `num_subblocks`=1, one block's worth
of data will be passed through the pipeline and recorded at once. Use this parameter to reduce
memory load, especially when using GPU acceleration.
"""
self.antenna_source = antenna_source
if isinstance(antenna_source, v_antenna.Antenna):
self.num_antennas = 1
self.is_antenna_array = False
elif isinstance(antenna_source, v_antenna.MultiAntennaArray):
self.num_antennas = self.antenna_source.num_antennas
self.is_antenna_array = True
else:
raise ValueError("Invalid type provided for 'antenna_source'.")
self.sample_rate = self.antenna_source.sample_rate
self.num_pols = self.antenna_source.num_pols
self.fch1 = self.antenna_source.fch1
self.ascending = self.antenna_source.ascending
self.start_chan = start_chan
self.num_chans = num_chans
self.block_size = block_size
self.blocks_per_file = blocks_per_file
self.num_subblocks = num_subblocks
self.digitizer = digitizer
if isinstance(self.digitizer, quantization.RealQuantizer) or isinstance(self.digitizer, quantization.ComplexQuantizer):
self.digitizer = [[copy.deepcopy(self.digitizer)
for pol in range(self.num_pols)]
for antenna in range(self.num_antennas)]
elif isinstance(self.digitizer, list):
assert len(self.digitizer) == self.num_antennas
assert len(self.digitizer[0]) == self.num_pols
for antenna in range(self.num_antennas):
for pol in range(self.num_pols):
digitizer = self.digitizer[antenna][pol]
assert isinstance(digitizer, quantization.RealQuantizer) or isinstance(digitizer, quantization.ComplexQuantizer)
else:
raise TypeError('Digitizer is incorrect type!')
self.filterbank = filterbank
if isinstance(self.filterbank, polyphase_filterbank.PolyphaseFilterbank):
self.filterbank = [[copy.deepcopy(self.filterbank)
for pol in range(self.num_pols)]
for antenna in range(self.num_antennas)]
elif isinstance(self.filterbank, list):
assert len(self.filterbank) == self.num_antennas
assert len(self.filterbank[0]) == self.num_pols
for antenna in range(self.num_antennas):
for pol in range(self.num_pols):
filterbank = self.filterbank[antenna][pol]
assert isinstance(self.filterbank, polyphase_filterbank.PolyphaseFilterbank)
else:
raise TypeError('Filterbank is incorrect type!')
self.num_taps = self.filterbank[0][0].num_taps
self.num_branches = self.filterbank[0][0].num_branches
assert self.start_chan + self.num_chans <= self.num_branches // 2
self.tbin = self.num_branches / self.sample_rate
self.chan_bw = 1 / self.tbin
if not self.ascending:
self.chan_bw = -self.chan_bw
self.requantizer = requantizer
if isinstance(self.requantizer, quantization.ComplexQuantizer):
self.requantizer = [[copy.deepcopy(self.requantizer)
for pol in range(self.num_pols)]
for antenna in range(self.num_antennas)]
elif isinstance(self.requantizer, list):
assert len(self.requantizer) == self.num_antennas
assert len(self.requantizer[0]) == self.num_pols
for antenna in range(self.num_antennas):
for pol in range(self.num_pols):
requantizer = self.requantizer[antenna][pol]
assert isinstance(self.requantizer, quantization.ComplexQuantizer)
else:
raise TypeError('Requantizer is incorrect type!')
self.num_bits = self.requantizer[0][0].num_bits
self.num_bytes = self.num_bits // 8
self.bytes_per_sample = 2 * self.num_pols * self.num_bits // 8
self.total_obs_num_samples = None
self.time_per_block = self.block_size / (self.num_antennas * self.num_chans * self.bytes_per_sample) * self.tbin
# Make sure that block_size is appropriate
assert self.block_size % int(self.num_antennas * self.num_chans * self.num_taps * self.bytes_per_sample) == 0
self.sample_stage_t = 0
self.digitizer_stage_t = 0
self.filterbank_stage_t = 0
self.requantizer_stage_t = 0
self.input_file_stem = None # Filename stem for input RAW data
self.input_header_dict = None # RAW header
self.header_size = None # Size of header in file (bytes)
self.input_num_blocks = None # Total number of blocks in supplied input RAW data
self.input_file_handler = None # Current file handler for input RAW data
[docs] @classmethod
def from_data(cls,
input_file_stem,
antenna_source,
digitizer=quantization.RealQuantizer(),
filterbank=polyphase_filterbank.PolyphaseFilterbank(),
# requantizer=quantization.ComplexQuantizer(),
start_chan=0,
num_subblocks=32):
"""
Initialize a RawVoltageBackend object, using existing RAW data as a background for
signal insertion and recording. Compared to normal initialization, some parameters are inferred
from the input data.
Parameters
----------
input_file_stem : str
Filename or path stem to input RAW data
antenna_source : Antenna or MultiAntennaArray
Antenna or MultiAntennaArray, from which real voltage data is created
digitizer : RealQuantizer or ComplexQuantizer, or list, optional
Quantizer used to digitize input voltages. Either a single object to be used as a template
for each antenna and polarization, or a 2D list of quantizers of shape (num_antennas, num_pols).
filterbank : PolyphaseFilterbank, or list, optional
Polyphase filterbank object used to channelize voltages. Either a single object to be used as a
template for each antenna and polarization, or a 2D list of filterbank objects of shape
(num_antennas, num_pols).
start_chan : int, optional
Index of first coarse channel to be recorded
num_subblocks : int, optional
Number of partitions per block, used for computation. If `num_subblocks`=1, one block's worth
of data will be passed through the pipeline and recorded at once. Use this parameter to reduce
memory load, especially when using GPU acceleration.
Returns
-------
backend : RawVoltageBackend
Created backend object
"""
requantizer=quantization.ComplexQuantizer()
raw_params = raw_utils.get_raw_params(input_file_stem=input_file_stem,
start_chan=start_chan)
blocks_per_file = raw_utils.get_blocks_per_file(input_file_stem)
if isinstance(requantizer, quantization.ComplexQuantizer):
requantizer.num_bits = raw_params['num_bits']
requantizer.quantizer_r.num_bits = raw_params['num_bits']
requantizer.quantizer_i.num_bits = raw_params['num_bits']
elif isinstance(requantizer, list):
for r in requantizer:
r.num_bits = raw_params['num_bits']
r.quantizer_r.num_bits = raw_params['num_bits']
r.quantizer_i.num_bits = raw_params['num_bits']
backend = cls(antenna_source,
digitizer=digitizer,
filterbank=filterbank,
requantizer=requantizer,
start_chan=start_chan,
num_chans=raw_params['num_chans'],
block_size=raw_params['block_size'],
blocks_per_file=blocks_per_file,
num_subblocks=num_subblocks)
backend.input_file_stem = input_file_stem
backend.input_num_blocks = raw_utils.get_total_blocks(input_file_stem)
backend.input_header_dict = raw_utils.read_header(f'{input_file_stem}.0000.raw')
backend.header_size = int(512 * np.ceil((80 * (len(backend.input_header_dict) + 1)) / 512))
return backend
def _make_header(self, f, header_dict={}):
"""
Write all header lines out to file as bytes.
Parameters
----------
f : file handle
File handle of open RAW file
header_dict : dict, optional
Dictionary of header values to set. Use to overwrite non-essential header values or add custom ones.
"""
my_path = os.path.abspath(os.path.dirname(__file__))
path = os.path.join(my_path, 'header_template.txt')
with open(path, 'r') as t:
# Exclude END line
template_lines = t.readlines()[:-1]
# Set header values determined by pipeline parameters
if 'TELESCOP' not in header_dict:
header_dict['TELESCOP'] = 'SETIGEN'
if self.input_header_dict is not None:
header_dict['TELESCOP'] = f"{self.input_header_dict['TELESCOP']}_SETIGEN"
if 'OBSERVER' not in header_dict:
header_dict['OBSERVER'] = 'SETIGEN'
if self.input_header_dict is not None:
header_dict['OBSERVER'] = f"{self.input_header_dict['OBSERVER']}_SETIGEN"
if 'SRC_NAME' not in header_dict:
header_dict['SRC_NAME'] = 'SYNTHETIC'
if self.input_header_dict is not None:
header_dict['SRC_NAME'] = f"{self.input_header_dict['SRC_NAME']}_SETIGEN"
# Should not be able to manually change these header values
header_dict['NBITS'] = self.num_bits
header_dict['CHAN_BW'] = self.chan_bw * 1e-6
if self.num_pols == 2:
header_dict['NPOL'] = 4
else:
header_dict['NPOL'] = self.num_pols
header_dict['BLOCSIZE'] = self.block_size
header_dict['SCANLEN'] = self.obs_length
header_dict['TBIN'] = self.tbin
if self.is_antenna_array:
header_dict['NANTS'] = self.num_antennas
header_dict['OBSNCHAN'] = self.num_chans * self.num_antennas
header_dict['OBSBW'] = self.chan_bw * self.num_chans * 1e-6
# Compute center frequency of recorded data
# self.chan_bw was already adjusted for ascending or descending frequencies
center_freq = (self.start_chan + (self.num_chans - 1) / 2) * self.chan_bw
center_freq += self.fch1
header_dict['OBSFREQ'] = center_freq * 1e-6
header_lines = []
used_keys = set()
for i in range(len(template_lines)):
key = template_lines[i][:8].strip()
used_keys.add(key)
if key in header_dict:
header_lines.append(raw_utils.format_header_line(key,
header_dict[key],
as_strings=False))
elif self.input_header_dict is not None:
# If initialized with raw files, use their header values
header_lines.append(raw_utils.format_header_line(key,
self.input_header_dict[key],
as_strings=True))
else:
# Otherwise use template header key, value pairs; cut newline character
header_lines.append(template_lines[i][:-1])
# Add new key value pairs
for key in header_dict.keys() - used_keys:
header_lines.append(raw_utils.format_header_line(key, header_dict[key]))
header_lines.append(f"{'END':<80}")
# Write each line with space and zero padding
for line in header_lines:
f.write(f"{line:<80}".encode())
f.write(bytearray(512 - (80 * len(header_lines) % 512)))
def _read_next_block(self):
"""
Reads next block of data if input RAW files are provided, upon which synthetic data will
be added. Also sets requantizer target statistics appropriately.
"""
_ = self.input_file_handler.read(self.header_size)
data_chunk = self.input_file_handler.read(self.block_size)
obsnchan = self.num_chans * self.num_antennas
rawbuffer = np.frombuffer(data_chunk, dtype=np.int8).reshape((obsnchan, int(self.block_size / obsnchan)))
input_voltages = np.zeros((obsnchan, int(rawbuffer.shape[1] / self.bytes_per_sample * self.num_pols)),
dtype=complex)
for antenna in range(self.num_antennas):
for pol in range(self.num_pols):
requantizer = self.requantizer[antenna][pol]
c_idx = antenna * self.num_chans + np.arange(0, self.num_chans)
if self.num_bits == 8:
t_idx = 2 * pol + np.arange(0, rawbuffer.shape[1], 2 * self.num_pols)
R = rawbuffer[c_idx[:, np.newaxis], t_idx[np.newaxis, :]]
I = rawbuffer[c_idx[:, np.newaxis], (t_idx+1)[np.newaxis, :]]
elif self.num_bits == 4:
t_idx = pol + np.arange(0, rawbuffer.shape[1], self.num_pols)
Q = rawbuffer[c_idx[:, np.newaxis], t_idx[np.newaxis, :]]
R = Q // 16
I = Q - 16 * R
I[I >= 8] -= 16
else:
raise ValueError(f'{self.num_bits} bits not supported...')
requantizer.quantizer_r._set_target_stats(np.mean(R), np.std(R))
requantizer.quantizer_i._set_target_stats(np.mean(I), np.std(I))
t_idx = pol + np.arange(0, input_voltages.shape[1], self.num_pols)
input_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] = R + I * 1j
return input_voltages
[docs] def collect_data_block(self,
digitize=True,
requantize=True,
verbose=True):
"""
General function to actually collect data from the antenna source and return coarsely channelized
complex voltages. Collects one block of data.
Parameters
----------
digitize : bool, optional
Whether to quantize input voltages before the PFB
requantize : bool, optional
Whether to quantize output complex voltages after the PFB
verbose : bool, optional
Control whether tqdm prints progress messages
Returns
-------
final_voltages : array
Complex voltages formatted according to GUPPI RAW specifications; array of shape
(num_chans * num_antennas, block_size / (num_chans * num_antennas))
"""
obsnchan = self.num_chans * self.num_antennas
final_voltages = np.empty((obsnchan, int(self.block_size / obsnchan)))
if self.input_file_stem is not None:
if not requantize:
raise ValueError("Must set 'requantize=True' when using input RAW data!")
input_voltages = self._read_next_block()
# Make sure that block_size is appropriate
assert self.block_size % int(obsnchan * self.num_taps * self.bytes_per_sample) == 0
T = int(self.block_size / (obsnchan * self.bytes_per_sample))
W = int(xp.ceil(T / self.num_taps / self.num_subblocks)) + 1
subblock_T = self.num_taps * (W - 1)
# Change self.num_subblocks if necessary
self.num_subblocks = int(xp.ceil(T / subblock_T))
subblock_t_len = int(subblock_T * self.bytes_per_sample)
with tqdm(total=self.num_antennas*self.num_pols*self.num_subblocks, leave=False) as pbar:
pbar.set_description('Subblocks')
for subblock in range(self.num_subblocks):
if verbose:
tqdm.write(f'Creating subblock {subblock}...')
# Change num windows at the end if self.num_subblocks doesn't go in evenly
if T % subblock_T != 0 and subblock == self.num_subblocks - 1:
W = int((T % subblock_T) / self.num_taps) + 1
if self.antenna_source.start_obs:
num_samples = self.num_branches * self.num_taps * W
else:
num_samples = self.num_branches * self.num_taps * (W - 1)
# Calculate the real voltage samples from each antenna
t = time.time()
antennas_v = self.antenna_source.get_samples(num_samples)
self.sample_stage_t += time.time() - t
for antenna in range(self.num_antennas):
if verbose and self.is_antenna_array:
tqdm.write(f'Creating antenna #{antenna}...')
c_idx = antenna * self.num_chans + np.arange(0, self.num_chans)
for pol in range(self.num_pols):
# Store indices used for numpy data i/o per polarization
if T % subblock_T != 0 and subblock == self.num_subblocks - 1:
# Uses smaller num windows W
subblock_t_range = self.num_taps * (W - 1) * self.bytes_per_sample
else:
subblock_t_range = subblock_t_len
t_idx = subblock * subblock_t_len + self.num_bits // 4 * pol + np.arange(0,
subblock_t_range,
self.num_bits // 4 * self.num_pols)
# Send voltage data through the backend
v = antennas_v[antenna][pol]
if digitize:
t = time.time()
v = self.digitizer[antenna][pol].quantize(v)
self.digitizer_stage_t += time.time() - t
t = time.time()
v = self.filterbank[antenna][pol].channelize(v)
v = v[:, self.start_chan:self.start_chan+self.num_chans]
self.filterbank_stage_t += time.time() - t
if requantize:
t = time.time()
if self.input_file_stem is not None:
temp_mean_r = self.requantizer[antenna][pol].quantizer_r.target_mean
self.requantizer[antenna][pol].quantizer_r.target_mean = 0
temp_mean_i = self.requantizer[antenna][pol].quantizer_i.target_mean
self.requantizer[antenna][pol].quantizer_i.target_mean = 0
# Start off assuming signals are embedded in Gaussian noise with std 1
custom_stds = self.filterbank[antenna][pol].channelized_stds
# If digitizing real voltages, scale up by the appropriate factor
if digitize:
custom_stds *= self.digitizer[antenna][pol].target_std
v = self.requantizer[antenna][pol].quantize(v, custom_stds=custom_stds)
self.requantizer[antenna][pol].quantizer_r.target_mean = temp_mean_r
self.requantizer[antenna][pol].quantizer_i.target_mean = temp_mean_i
if self.num_bits == 8:
input_v = input_voltages[c_idx[:, np.newaxis], (t_idx//2)[np.newaxis, :]]
elif self.num_bits == 4:
input_v = input_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]]
input_v = xp.array(input_v)
v += input_v.T
v = self.requantizer[antenna][pol].quantize(v)
self.requantizer_stage_t += time.time() - t
# Convert to numpy array if using cupy
try:
R = xp.asnumpy(xp.real(v).T)
I = xp.asnumpy(xp.imag(v).T)
except AttributeError:
R = xp.real(v).T
I = xp.imag(v).T
if self.num_bits == 8 or not requantize:
final_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] = R
final_voltages[c_idx[:, np.newaxis], (t_idx+1)[np.newaxis, :]] = I
elif self.num_bits == 4:
# Translate 4 bit complex voltages to an 8 bit equivalent representation
I[I < 0] += 16
final_voltages[c_idx[:, np.newaxis], t_idx[np.newaxis, :]] = R * 16 + I
else:
raise ValueError(f'{self.num_bits} bits not supported...')
pbar.update(1)
return final_voltages
[docs] def get_num_blocks(self, obs_length):
"""
Calculate the number of blocks required as a function of observation length, in seconds. Note that only
an integer number of blocks will be recorded, so the actual observation length may be shorter than the
`obs_length` provided.
"""
return int(obs_length * abs(self.chan_bw) * self.num_antennas * self.num_chans * self.bytes_per_sample / self.block_size)
[docs] def record(self,
output_file_stem,
obs_length=None,
num_blocks=None,
length_mode='obs_length',
header_dict={},
digitize=True,
verbose=True):
"""
General function to actually collect data from the antenna source and return coarsely channelized complex
voltages. If input data is provided, only as much data as is in the input will be generated.
Parameters
----------
output_file_stem : str
Filename or path stem; the suffix will be automatically appended
obs_length : float, optional
Length of observation in seconds, if in `obs_length` mode
num_blocks : int, optional
Number of data blocks to record, if in `num_blocks` mode
length_mode : str, optional
Mode for specifying length of observation, either `obs_length` in seconds or `num_blocks` in data blocks
header_dict : dict, optional
Dictionary of header values to set. Use to overwrite non-essential header values or add custom ones.
digitize : bool, optional
Whether to quantize input voltages before the PFB
verbose : bool, optional
Control whether tqdm prints progress messages
"""
if length_mode == 'obs_length':
if obs_length is None:
if self.input_num_blocks is not None:
self.num_blocks = self.input_num_blocks
else:
raise ValueError("Value not given for 'obs_length'.")
else:
self.num_blocks = self.get_num_blocks(obs_length)
elif length_mode == 'num_blocks':
if num_blocks is None:
if self.input_num_blocks is not None:
self.num_blocks = self.input_num_blocks
else:
raise ValueError("Value not given for 'num_blocks'.")
else:
self.num_blocks = num_blocks
else:
raise ValueError("Invalid option given for 'length_mode'.")
# Ensure that we don't request more blocks than possible
if self.input_num_blocks is not None:
self.num_blocks = min(self.num_blocks, self.input_num_blocks)
self.obs_length = self.num_blocks * self.time_per_block
self.total_obs_num_samples = int(self.obs_length / self.tbin) * self.num_branches
# Mark each antenna and data stream as the start of the observation
self.antenna_source.reset_start()
# Reset filterbank cache as well
for antenna in range(self.num_antennas):
for pol in range(self.num_pols):
self.digitizer[antenna][pol]._reset_cache()
self.filterbank[antenna][pol]._reset_cache()
self.requantizer[antenna][pol]._reset_cache()
# Collect data and record to disk
num_files = int(xp.ceil(self.num_blocks / self.blocks_per_file))
with tqdm(total=self.num_blocks) as pbar:
pbar.set_description('Blocks')
for i in range(num_files):
save_fn = f'{output_file_stem}.{i:04}.raw'
# Create input raw file handler for use in collecting data
if self.input_file_stem is not None:
input_fn = f'{self.input_file_stem}.{i:04}.raw'
self.input_file_handler = open(input_fn, 'rb')
with open(save_fn, 'wb') as f:
# If blocks won't fill a whole file, adjust number of blocks to write at the end
if i == num_files - 1 and self.num_blocks % self.blocks_per_file != 0:
blocks_to_write = self.num_blocks % self.blocks_per_file
else:
blocks_to_write = self.blocks_per_file
for j in range(blocks_to_write):
if verbose:
tqdm.write(f'Creating block {j}...')
self._make_header(f, header_dict)
v = self.collect_data_block(digitize=digitize,
requantize=True,
verbose=verbose)
f.write(xp.array(v, dtype=xp.int8).tobytes())
if verbose:
tqdm.write(f'File {i}, block {j} recorded!')
pbar.update(1)
if self.input_file_stem is not None:
self.input_file_handler.close()