Source code for setigen.voltage.data_stream

import os

GPU_FLAG = os.getenv('SETIGEN_ENABLE_GPU', '0')
if GPU_FLAG == '1':
    try:
        import cupy as xp
    except ImportError:
        import numpy as xp
else:
    import numpy as xp

from astropy import units as u
from astropy.stats import sigma_clip

import time

from setigen import unit_utils


[docs] class DataStream(object): """ Facilitate noise and signal injection in a real voltage time series data stream, for a single polarization. Noise and signal sources are functions, saved as properties of the DataStream, so that individual samples can be queried using get_samples(). """
[docs] def __init__(self, sample_rate=3*u.GHz, fch1=0*u.GHz, ascending=True, t_start=0, seed=None): """ Initialize a DataStream object with a sampling rate and frequency range. By default, ``setigen.voltage`` does not employ heterodyne mixing and filtering to focus on a frequency bandwidth. Instead, the sensitive range is determined by these parameters; starting at the frequency ``fch1`` and spanning the Nyquist range ``sample_rate / 2`` in the increasing or decreasing frequency direction, as specified by ``ascending``. Note that accordingly, the spectral response will be susceptible to aliasing, so take care that the desired frequency range is correct and that signals are injected at appropriate frequencies. Parameters ---------- sample_rate : float, optional Physical sample rate, in Hz, for collecting real voltage data fch1 : astropy.Quantity, optional Central frequency of the first coarse channel, in Hz. If ``ascending=True``, ``fch1`` is the minimum frequency; if ``ascending=False`` (default), ``fch1`` is the maximum frequency. ascending : bool, optional Specify whether frequencies should be in ascending or descending order. Default is True, for which ``fch1`` is the minimum frequency. t_start : float, optional Start time, in seconds seed : None, int, Generator, optional Random seed or seed generator """ #: Random number generator self.rng = xp.random.default_rng(seed) self.sample_rate = unit_utils.get_value(sample_rate, u.Hz) self.dt = 1 / self.sample_rate # For adjusting signal frequencies self.fch1 = unit_utils.get_value(fch1, u.Hz) self.ascending = ascending # For estimating SNR for signals self.noise_std = 0 self.bg_noise_std = 0 # Tracks start time of next sequence of data self.t_start = t_start self.start_obs = True self.ts = None self.v = None # Hold functions that generate voltage values self.noise_sources = [] self.signal_sources = []
def _update_t(self, num_samples): """ Set array of times for voltage calculation, and reset voltage array. """ self.ts = self.t_start + xp.linspace(0., num_samples * self.dt, num_samples, endpoint=False) self.t_start += num_samples * self.dt self.v = xp.zeros(num_samples)
[docs] def set_time(self, t): """ Set start time before next set of samples. """ self.start_obs = True self.t_start = t
[docs] def add_time(self, t): """ Add time before next set of samples. """ self.set_time(self.t_start + t)
[docs] def update_noise(self, stats_calc_num_samples=10000): """ Replace ``self.noise_std`` by calculating out a few samples and estimating the standard deviation of the voltages. Parameters ---------- stats_calc_num_samples : int, optional Maximum number of samples for use in estimating noise standard deviation """ start_obs = self.start_obs t_start = self.t_start v = self.get_samples(num_samples=stats_calc_num_samples) _, self.noise_std = estimate_stats(v, stats_calc_num_samples=stats_calc_num_samples) self.start_obs = start_obs self.t_start = t_start
[docs] def get_total_noise_std(self): """ Get the standard deviation of the noise. If this DataStream is part of an array of Antennas, this will account for the background noise in the corresponding polarization. Note that if this DataStream has custom signals or noise, it might not 'know' what the noise standard deviation is. In this case, one should run :func:`~setigen.voltage.data_stream.DataStream.update_noise` to update the DataStream's estimate for the noise. Note that this actually runs :func:`~setigen.voltage.data_stream.DataStream.get_samples` for the calculation, so if your custom signal functions have mutable properties, make sure to reset these (if necessary) before saving out data. """ return xp.sqrt(self.noise_std**2 + self.bg_noise_std**2)
[docs] def add_noise(self, v_mean, v_std): """ Add Gaussian noise source to data stream. This essentially adds a lambda function that gets the appropriate number of noise samples to add to the voltage array when :func:`~setigen.voltage.data_stream.DataStream.get_samples` is called. Updates noise property to reflect added noise. Parameters ---------- v_mean : float Noise mean v_std : float Noise standard deviation """ noise_func = lambda ts: v_mean + v_std * self.rng.standard_normal(size=len(ts)) # Variances add, not standard deviations self.noise_std = xp.sqrt(self.noise_std**2 + v_std**2) self.noise_sources.append(noise_func)
[docs] def add_constant_signal(self, f_start, drift_rate, level, phase=0): """ Adds a drifting cosine signal (linear chirp) as a signal source function. Parameters ---------- f_start : float Starting signal frequency drift_rate : float Drift rate in Hz / s level : float Signal level or amplitude phase : float Phase, in radiations """ f_start = unit_utils.get_value(f_start, u.Hz) drift_rate = unit_utils.get_value(drift_rate, u.Hz / u.s) def signal_func(ts): # Calculate adjusted center frequencies, according to chirp chirp_phase = 2 * xp.pi * ((f_start - self.fch1) * ts + 0.5 * drift_rate * ts**2) if not self.ascending: chirp_phase = -chirp_phase return level * xp.cos(chirp_phase + phase) self.signal_sources.append(signal_func)
[docs] def add_signal(self, signal_func): """ Wrapper function to add a custom signal source function. """ self.signal_sources.append(signal_func)
[docs] def get_samples(self, num_samples): """ Retrieve voltage samples, based on noise and signal source functions. If custom signals add complex voltages, the voltage array will be cast to complex type. Parameters ---------- num_samples : int Number of samples to get Returns ------- v : array Array of voltage samples """ self._update_t(num_samples) for noise_func in self.noise_sources: self.v += noise_func(self.ts) for signal_func in self.signal_sources: # Ensure that the array is of the correct type signal_v = xp.array(signal_func(self.ts)) # If there are complex voltages, make sure to cast self.v to complex if not xp.iscomplexobj(self.v) and xp.iscomplexobj(signal_v): self.v = self.v.astype(complex) self.v += signal_v self.start_obs = False return self.v
[docs] class BackgroundDataStream(DataStream): """ Extends DataStream for background data in Antenna arrays. """
[docs] def __init__(self, sample_rate=3*u.GHz, fch1=0*u.GHz, ascending=True, t_start=0, seed=None, antenna_streams=[]): """ Initialize a BackgroundDataStream object with a sampling rate and frequency range. The main extension is that we also pass in a list of DataStreams, belonging to all the Antennas within a MultiAntennaArray, for the same corresponding polarization. When noise is added to a BackgroundDataStream, the noise standard deviation gets propagated to each Antenna DataStream via the ``DataStream.bg_noise_std`` property. Parameters ---------- sample_rate : float, optional Physical sample rate, in Hz, for collecting real voltage data fch1 : astropy.Quantity, optional Central frequency of the first coarse channel, in Hz. If ``ascending=True``, ``fch1`` is the minimum frequency; if ``ascending=False`` (default), ``fch1`` is the maximum frequency. ascending : bool, optional Specify whether frequencies should be in ascending or descending order. Default is True, for which ``fch1`` is the minimum frequency. t_start : float, optional Start time, in seconds seed : int, optional Integer seed between 0 and 2**32. If None, the random number generator will use a random seed. antenna_streams : list of DataStream objects List of DataStreams, which belong to the Antennas in a MultiAntennaArray, all corresponding to the same polarization """ super().__init__(sample_rate=sample_rate, fch1=fch1, ascending=ascending, t_start=t_start, seed=seed) self.antenna_streams = antenna_streams
def _set_all_bg_noise(self): """ Helper function to set background noise standard deviation of each antenna for the corresponding polarization equal to that of this stream. """ for stream in self.antenna_streams: stream.bg_noise_std = self.noise_std
[docs] def update_noise(self, stats_calc_num_samples=10000): """ Replace self.noise_std by calculating out a few samples and estimating the standard deviation of the voltages. Further, set all child antenna background noise values. Parameters ---------- stats_calc_num_samples : int, optional Maximum number of samples for use in estimating noise standard deviation """ DataStream.update_noise(self, stats_calc_num_samples=stats_calc_num_samples) self._set_all_bg_noise()
[docs] def add_noise(self, v_mean, v_std): """ Add Gaussian noise source to data stream. This essentially adds a lambda function that gets the appropriate number of noise samples to add to the voltage array when :func:`~setigen.voltage.data_stream.DataStream.get_samples` is called. Updates noise property to reflect added noise. Further, set all child antenna background noise values. Parameters ---------- v_mean : float Noise mean v_std : float Noise standard deviation """ DataStream.add_noise(self, v_mean, v_std) self._set_all_bg_noise()
[docs] def estimate_stats(voltages, stats_calc_num_samples=10000): """ Estimate mean and standard deviation, truncating to at most ``stats_calc_num_samples`` samples to reduce computation. Parameters ---------- voltages : array Array of voltages stats_calc_num_samples : int, optional Maximum number of samples for use in estimating noise statistics Returns ------- data_mean : float Mean of voltages data_sigma : float Standard deviation of voltages """ calc_len = xp.amin(xp.array([stats_calc_num_samples, len(voltages)])) data_sigma = xp.std(voltages[:calc_len]) data_mean = xp.mean(voltages[:calc_len]) return data_mean, data_sigma