import sys
import os.path
import copy
import time
import pathlib
import numpy as np
import matplotlib.pyplot as plt
try:
import cPickle as pickle
except:
import pickle
from astropy import units as u
from astropy.time import Time
from astropy.stats import sigma_clip
from blimpy import Waterfall
from blimpy.io import sigproc
from . import waterfall_utils
from . import distributions
from . import sample_from_obs
from . import unit_utils
from . import frame_utils
from . import plots
from .funcs import paths
from .funcs import t_profiles
from .funcs import f_profiles
from .funcs import bp_profiles
[docs]
class Frame(object):
"""
A class to facilitate the creation of entirely synthetic radio data
(narrowband signals + background noise) as well as signal injection into
existing observations.
"""
[docs]
def __init__(self,
waterfall=None,
fchans=None,
tchans=None,
df=2.7939677238464355*u.Hz,
dt=18.253611008*u.s,
fch1=6*u.GHz,
ascending=False,
data=None,
seed=None,
**kwargs):
"""
Initialize a Frame object either from an existing .fil/.h5 file or
from frame resolution / size.
If you are initializing based on a .fil or .h5, pass in either the
filename or the Waterfall object into the waterfall keyword.
Otherwise, you can initialize a frame by specifying the parameters
``fchans``, ``tchans``, ``df``, ``dt``, and even ``fch1``, if it's important to
specify frequencies (8 GHz is an arbitrary but reasonable choice
otherwise). Note that the frame resolutions ``df`` and ``dt`` are given
defaults based on the Breakthrough Listen high frequency resolution
data product -- be sure to change these if you are working with
different kinds of data.
The `data` keyword is only necessary if you are also
preloading data that matches your specified frame dimensions and
resolutions.
Parameters
----------
waterfall : str or Waterfall, optional
Name of filterbank file or Waterfall object for preloading data
fchans : int, optional
Number of frequency samples
tchans: int, optional
Number of time samples
df : astropy.Quantity, optional
Frequency resolution (e.g. in u.Hz)
dt : astropy.Quantity, optional
Time resolution (e.g. in u.s)
fch1 : astropy.Quantity, optional
Central frequency of first channel. If ``ascending=True``,
``fch1`` is the minimum frequency; if ``ascending=False``
(default), ``fch1`` is the maximum frequency.
ascending : bool, optional
Specify whether frequencies should be in ascending order, so that
``fch1`` is the minimum frequency. Default is False, for which ``fch1``
is the maximum frequency. This is overwritten if a waterfall
object is provided, where ``ascending`` will be automatically
determined by observational parameters.
data : ndarray, optional
2D array of intensities to preload into frame
seed : None, int, Generator, optional
Random seed or seed generator
**kwargs
For convenience, the ``shape`` keyword can be used in place of individually
setting ``fchans`` and ``tchans``, so that ``shape=(tchans, fchans)``.
"""
self.rng = np.random.default_rng(seed)
if None not in [fchans, tchans] or 'shape' in kwargs or data is not None:
self.waterfall = None
# Need to address this and come up with a meaningful header
self.header = None
self.df = unit_utils.get_value(abs(df), u.Hz)
self.dt = unit_utils.get_value(dt, u.s)
self.fch1 = unit_utils.get_value(fch1, u.Hz)
self.ascending = ascending
mjd = kwargs.get('mjd')
if mjd is not None:
self.t_start = Time(mjd, format='mjd').unix
else:
self.t_start = kwargs.get('t_start', time.time())
self.source_name = kwargs.get('source_name', 'Synthetic')
if 'shape' in kwargs:
(self.tchans, self.fchans) = self.shape = kwargs['shape']
elif data is not None:
(self.tchans, self.fchans) = self.shape = data.shape
else:
self.fchans = int(unit_utils.get_value(fchans, u.pixel))
self.tchans = int(unit_utils.get_value(tchans, u.pixel))
self.shape = (self.tchans, self.fchans)
if data is not None:
assert data.shape == self.shape
self.data = np.copy(data)
else:
self.data = np.zeros(self.shape)
elif waterfall:
# Load waterfall via filename or Waterfall object
if isinstance(waterfall, pathlib.PurePath):
waterfall = str(waterfall)
if isinstance(waterfall, str):
f_start = kwargs.get('f_start')
f_stop = kwargs.get('f_stop')
self.waterfall = Waterfall(waterfall, f_start=f_start, f_stop=f_stop)
elif isinstance(waterfall, Waterfall):
self.waterfall = waterfall
else:
raise FileNotFoundError(f'Unsupported data type: {type(waterfall)}')
self.header = self.waterfall.header
self.tchans, _, self.fchans = self.waterfall.container.selection_shape
self.shape = (self.tchans, self.fchans)
# Frequency values are saved in MHz in waterfall files
self.df = unit_utils.cast_value(abs(self.waterfall.header['foff']),
u.MHz).to(u.Hz).value
self.dt = unit_utils.get_value(self.waterfall.header['tsamp'], u.s)
self.ascending = (self.waterfall.header['foff'] > 0)
if self.ascending:
self.fch1 = self.waterfall.container.f_start
else:
self.fch1 = self.waterfall.container.f_stop
self.fch1 = unit_utils.cast_value(self.fch1,
u.MHz).to(u.Hz).value
self.t_start = Time(self.waterfall.header['tstart'], format='mjd').unix
self.source_name = self.waterfall.header['source_name']
# When multiple Stokes parameters are supported, this will have to
# be expanded.
self.data = waterfall_utils.get_data(self.waterfall)
if not self.ascending:
self.data = self.data[:, ::-1]
else:
raise ValueError(f'Frame must be provided dimensions or an '
f'existing filterbank file.')
# Degrees of freedom for chi-squared radiometer noise
# 2 polarizations, real and imaginary components -> 4
self.chi2_df = 4 * round(self.df * self.dt)
# Calculate unit drift rate (pixel over pixel drift)
self.unit_drift_rate = self.df / self.dt
# Shared creation of ranges
self._update_fs()
self._update_ts()
# No matter what, self.data will be populated at this point.
self._update_noise_frame_stats()
# Placeholder dictionary for user metadata, just for bookkeeping purposes
self.metadata = self.get_params()
[docs]
@classmethod
def from_data(cls, df, dt, fch1, ascending, data, metadata={}, waterfall=None):
"""
Initialize Frame more directly from 2D numpy array of data.
Parameters
----------
df : astropy.Quantity
Frequency resolution (e.g. in u.Hz)
dt : astropy.Quantity
Time resolution (e.g. in u.s)
fch1 : astropy.Quantity
Central frequency of first channel. If ``ascending=True``,
``fch1`` is the minimum frequency; if ``ascending=False``
(default), ``fch1`` is the maximum frequency.
ascending : bool
Specify whether frequencies should be in ascending order, so that
``fch1`` is the minimum frequency. Default is False, for which ``fch1``
is the maximum frequency. This is overwritten if a waterfall
object is provided, where ``ascending`` will be automatically
determined by observational parameters.
data : ndarray
2D array of intensities to preload into frame
metadata : dict, optional
Dictionary of features associated with the frame
waterfall : Waterfall, optional
Associated Waterfall object if data is derived from another frame object
(accessed via ``frame.get_waterfall()``) or a blimpy waterfall object
Returns
-------
frame : Frame
Frame object with preloaded data
"""
tchans, fchans = data.shape
frame = cls(fchans=fchans,
tchans=tchans,
df=df,
dt=dt,
fch1=fch1,
ascending=ascending,
data=data)
frame.add_metadata(metadata)
# Remove h5 object, which can't be pickled
try:
del waterfall.container.h5
except AttributeError:
pass
frame.waterfall = copy.deepcopy(waterfall)
return frame
[docs]
@classmethod
def from_waterfall(cls, waterfall):
"""
Instantiate Frame using a filterbank file or blimpy Waterfall object.
"""
return cls(waterfall=waterfall)
[docs]
@classmethod
def from_backend_params(cls,
fchans=None,
obs_length=300,
sample_rate=3e9,
num_branches=1024,
fftlength=1048576,
int_factor=51,
fch1=6*u.GHz,
ascending=False,
data=None):
"""
Create frame based on backend / software related parameters.
Either ``fchans`` or ``data`` must be provided to get number of frequency
channels to create. If a 2D numpy array for ``data`` is provided, ``fchans``
will be inferred. The parameter ``int_factor`` must still be provided
to determine ``tchans``; there is a check that the data dimensions also match.
Since multiple ``int_factor`` values may correspond to the same ``tchans``,
for clarity we do not infer ``int_factor`` just from the dimensions of the data.
Parameters
----------
fchans : int, optional
Number of frequency samples. Should be provided if ``data`` is None.
obs_length : float, optional
Length of observation in seconds
sample_rate : float, optional
Physical sample rate, in Hz, for collecting real voltage data
num_branches : int, optional
Number of PFB branches. Note that this corresponds to ``num_branches / 2`` coarse channels.
fftlength : int, optional
FFT length to be used in fine channelization
int_factor : int, optional
Integration factor used in fine channelization. Determines ``tchans``.
fch1 : astropy.Quantity, optional
Central frequency of first channel. If ``ascending=True``,
``fch1`` is the minimum frequency; if ``ascending=False``
(default), ``fch1`` is the maximum frequency.
ascending : bool, optional
Specify whether frequencies should be in ascending order, so that
``fch1`` is the minimum frequency. Default is False, for which ``fch1``
is the maximum frequency.
data : ndarray, optional
2D array of intensities to preload into frame. If provided, ``fchans``
will be inferred from this.
Returns
-------
frame : Frame
Frame object with appropriate dimensions.
"""
chan_bw = sample_rate / num_branches
df = chan_bw / fftlength
if data is not None:
tchans, fchans = data.shape
elif fchans is None:
raise ValueError("Value not given for fchans")
param_dict = params_from_backend(obs_length=obs_length,
sample_rate=sample_rate,
num_branches=num_branches,
fftlength=fftlength,
int_factor=int_factor)
if data is not None:
print(param_dict['tchans'], tchans)
assert param_dict['tchans'] == tchans
frame = cls(fchans=fchans,
**param_dict,
fch1=fch1,
ascending=ascending,
data=data)
return frame
[docs]
def copy(self):
"""
Return identical copy of frame.
"""
c_frame = copy.deepcopy(self)
# Note that since the __getstate__ function is overwritten, we need to
# add back the waterfall object.
waterfall = self.get_waterfall()
if waterfall is not None:
c_frame.waterfall = copy.deepcopy(waterfall)
return c_frame
def __getstate__(self):
# Exclude waterfall Waterfall object from pickle, since it uses open threads, which
# can't be pickled -- note that this affects copy!
state = self.__dict__.copy()
state['waterfall'] = None
return state
def _update_fs(self):
"""
Calculate and update an array of frequencies represented in the
frame.
"""
# Normally, self.ascending will be False; filterbank convention is decreasing freqs
if self.ascending:
self.fmin = self.fch1
self.fs = np.linspace(self.fmin,
self.fmin + self.fchans * self.df,
self.fchans,
endpoint=False)
self.fmax = self.fs[-1]
else:
self.fmax = self.fch1
self.fs = np.linspace(self.fmax,
self.fmax - self.fchans * self.df,
self.fchans,
endpoint=False)
self.fmin = self.fs[-1]
self.fs = self.fs[::-1]
def _update_ts(self):
"""
Calculate and update an array of times represented in the frame.
"""
self.ts = unit_utils.get_value(np.linspace(0,
self.tchans * self.dt,
self.tchans,
endpoint=False),
u.s)
@property
def fmid(self):
return (self.fmin + self.fmax) / 2
@property
def mjd(self):
return Time(self.t_start, format='unix').mjd
@property
def t_stop(self):
return self.t_start + self.tchans * self.dt
@property
def obs_length(self):
return self.tchans * self.dt
@property
def ts_ext(self):
"""
Extended time array of length ``tchans + 1``, including the ending
timestamp.
"""
return np.append(self.ts, self.ts[-1] + self.dt)
@property
def mean(self):
return np.mean(self.data)
@property
def std(self):
return np.std(self.data)
[docs]
def get_total_stats(self):
return self.mean, self.std
[docs]
def get_noise_stats(self):
return self.noise_mean, self.noise_std
def _update_noise_frame_stats(self):
"""
Calculate and update basic noise statistics (mean and standard
deviation) of the frame, using sigma clipping to strip outliers.
"""
clipped_data = sigma_clip(self.data,
sigma=3,
maxiters=5,
masked=False)
self.noise_mean, self.noise_std = np.mean(clipped_data), np.std(clipped_data)
[docs]
def zero_data(self):
"""
Reset data to a numpy array of zeros.
"""
self.data = np.zeros(self.shape)
self.noise_mean = self.noise_std = 0
[docs]
def add_noise(self,
x_mean,
x_std=None,
x_min=None,
noise_type='chi2'):
"""
By default, synthesize radiometer noise based on a chi-squared
distribution. Alternately, can generate pure Gaussian noise.
Specifying ``noise_type='chi2'`` will only use ``x_mean``,
and ignore other parameters. Specifying ``noise_type='gaussian'``
will use all arguments (if provided).
When adding Gaussian noise to the frame, the minimum is simply a
lower bound for intensities in the data (e.g. it may make sense to
cap intensities at 0), but this is optional.
Parameters
----------
x_mean : float
Target mean
x_std : float, optional
Target standard deviation
x_min : float, optional
Lower bound for Gaussian noise
noise_type : {"chi2", "gaussian", "normal"}, default: "chi2"
Distribution to use for synthetic noise
Return
------
noise : ndarray
Array of synthetic noise
"""
if noise_type == 'chi2':
noise = distributions.chi2(x_mean,
self.chi2_df,
self.shape,
seed=self.rng)
# Based on variance of ideal chi-squared distribution
x_std = np.sqrt(2 * self.chi2_df) * x_mean / self.chi2_df
elif noise_type in ['normal', 'gaussian']:
if x_std is not None:
if x_min is not None:
noise = distributions.truncated_gaussian(x_mean,
x_std,
x_min,
self.shape,
seed=self.rng)
else:
noise = distributions.gaussian(x_mean,
x_std,
self.shape,
seed=self.rng)
else:
raise ValueError("x_std must be given")
else:
raise ValueError(f"'{noise_type}' is not a valid noise type")
self.data += noise
set_to_param = (self.noise_mean == self.noise_std == 0)
if set_to_param:
self.noise_mean, self.noise_std = x_mean, x_std
else:
self._update_noise_frame_stats()
return noise
[docs]
def add_noise_from_obs(self,
x_mean_array=None,
x_std_array=None,
x_min_array=None,
share_index=True,
noise_type='chi2'):
"""
By default, synthesize radiometer noise based on a chi-squared
distribution. Alternately, can generate pure Gaussian noise.
If no arrays are specified from which to sample, noise
samples will be drawn from saved GBT C-Band observations at
(dt, df) = (1.4 s, 1.4 Hz) resolution, from frames of shape
(tchans, fchans) = (32, 1024). These sample noise parameters consist
of 126500 samples for mean, std, and min of each observation.
Specifying ``noise_type='chi2'`` will only use ``x_mean_array`` (if provided),
and ignore other parameters. Specifying noise_type='gaussian' will use
all arrays (if provided).
Note: this method will attempt to scale the noise parameters to match
self.dt and self.df. This assumes that the observation data products
are *not* normalized by the FFT length used to construct them.
Parameters
----------
x_mean_array : ndarray, optional
Array of potential means
x_std_array : ndarray, optional
Array of potential standard deviations
x_min_array : ndarray, optional
Array of potential minimum values
share_index : bool, optional, default: True
Whether to select noise parameters from the same index across each
provided array. If True, then each array must be the same length.
noise_type : {"chi2", "gaussian", "normal"}, default: "chi2"
Distribution to use for synthetic noise
Return
------
noise : ndarray
Array of synthetic noise
"""
if (x_mean_array is None
and x_std_array is None
and x_min_array is None):
my_path = os.path.abspath(os.path.dirname(__file__))
path = os.path.join(my_path, 'assets/sample_noise_params.npy')
sample_noise_params = np.load(path)
# Accounts for scaling from FFT length and time/freq resolutions
# Turns out that fft_length * df is constant,
# e.g. 1500 / 512 / fft_length = df
obs_dt = 1.4316557653333333
scale_factor = self.dt / obs_dt
x_mean_array = sample_noise_params[:, 0] * scale_factor
x_std_array = sample_noise_params[:, 1] * scale_factor
x_min_array = sample_noise_params[:, 2] * scale_factor
if noise_type == 'chi2':
x_mean = self.rng.choice(x_mean_array)
noise = distributions.chi2(x_mean,
self.chi2_df,
self.shape,
seed=self.rng)
# Based on variance of ideal chi-squared distribution
x_std = np.sqrt(2 * self.chi2_df) * x_mean / self.chi2_df
elif noise_type in ['normal', 'gaussian']:
if x_min_array is not None:
if share_index:
if (len(x_mean_array) != len(x_std_array)
or len(x_mean_array) != len(x_min_array)):
raise IndexError('To share a random index, all parameter \
arrays must be the same length!')
i = self.rng.integers(len(x_mean_array))
x_mean, x_std, x_min = (x_mean_array[i],
x_std_array[i],
x_min_array[i])
else:
x_mean, x_std, x_min = sample_from_obs \
.sample_gaussian_params(x_mean_array,
x_std_array,
x_min_array,
seed=self.rng)
noise = distributions.truncated_gaussian(x_mean,
x_std,
x_min,
self.shape,
seed=self.rng)
else:
if share_index:
if len(x_mean_array) != len(x_std_array):
raise IndexError('To share a random index, all parameter \
arrays must be the same length!')
i = self.rng.integers(len(x_mean_array))
x_mean, x_std = x_mean_array[i], x_std_array[i]
else:
x_mean, x_std = sample_from_obs \
.sample_gaussian_params(x_mean_array,
x_std_array,
seed=self.rng)
noise = distributions.gaussian(x_mean,
x_std,
self.shape,
seed=self.rng)
else:
raise ValueError(f"'{noise_type}' is not a valid noise type")
self.data += noise
set_to_param = (self.noise_mean == self.noise_std == 0)
if set_to_param:
self.noise_mean, self.noise_std = x_mean, x_std
else:
self._update_noise_frame_stats()
return noise
[docs]
def add_signal(self,
path,
t_profile,
f_profile,
bp_profile=None,
bounding_f_range=None,
integrate_path=False,
integrate_t_profile=False,
integrate_f_profile=False,
doppler_smearing=False,
t_subsamples=10,
f_subsamples=10,
smearing_subsamples=10):
"""
Generate synthetic signal.
Add a synethic signal using given path in time-frequency domain and
brightness profiles in time and frequency directions.
Parameters
----------
path : function, np.ndarray, list, float
Function in time that returns frequencies, or provided array or
single value of frequencies for the center of the signal at each
time sample
t_profile : function, np.ndarray, list, float
Time profile: function in time that returns an intensity (scalar),
or provided array or single value of intensities at each time
sample
f_profile : function
Frequency profile: function in frequency that returns an intensity
(scalar), relative to the signal frequency within a time sample.
Note that unlike the other parameters, this must be a function
bp_profile : function, np.ndarray, list, float, optional
Bandpass profile: function in frequency that returns a relative
intensity (scalar, between 0 and 1), or provided array or single
value of relative intensities at each frequency sample
bounding_f_range : tuple
Tuple (bounding_min, bounding_max) that constrains the computation
of the signal to only a range in frequencies
integrate_path : bool, optional
Option to average path along time to get a more accurate frequency
position in t-f space. Note that this option only makes sense if
the provided path can be evaluated at the sub frequency sample
level (e.g. as opposed to returning a pre-computed array of
frequencies of length ``tchans``). Makes ``t_subsamples`` calculations
per time sample.
integrate_t_profile : bool, optional
Option to integrate ``t_profile`` in the time direction. Note that
this option only makes sense if the provided ``t_profile`` can be
evaluated at the sub time sample level (e.g. as opposed to
returning an array of intensities of length ``tchans``). Makes
``t_subsamples`` calculations per time sample.
integrate_f_profile : bool, optional
Option to integrate ``f_profile`` in the frequency direction. Makes
``f_subsamples`` calculations per time sample.
doppler_smearing : bool, optional
Option to numerically "Doppler smear" spectral power over
frequency bins. At time t, averages ``smearing_subsamples`` copies of
the signal centered at evenly spaced center frequencies between
times t and t+1. This causes the effective drop in power when
the signal crosses multiple bins.
t_subsamples : int, optional
Number of bins for integration in the time direction, using
Riemann sums. Default is 10.
f_subsamples : int, optional
Number of bins for integration in the frequency direction, using
Riemann sums. Default is 10.
smearing_subsamples : int, optional
Number of steps for averaging evenly spaced copies of the signal
between center frequencies at times t and t+1. Default is 10.
Returns
-------
signal : ndarray
Two-dimensional NumPy array containing synthetic signal data
Examples
--------
Here's an example that creates a linear Doppler-drifted signal with
chi-squared noise with sampled parameters:
>>> from astropy import units as u
>>> import setigen as stg
>>> fchans = 1024
>>> tchans = 32
>>> df = 2.7939677238464355*u.Hz
>>> dt = tsamp = 18.253611008*u.s
>>> fch1 = 6095.214842353016*u.MHz
>>> frame = stg.Frame(fchans=fchans,
tchans=tchans,
df=df,
dt=dt,
fch1=fch1)
>>> noise = frame.add_noise(x_mean=10)
>>> signal = frame.add_signal(stg.constant_path(f_start=frame.get_frequency(200),
drift_rate=2*u.Hz/u.s),
stg.constant_t_profile(level=frame.get_intensity(snr=30)),
stg.gaussian_f_profile(width=40*u.Hz),
stg.constant_bp_profile(level=1))
Saving the noise and signals individually may be useful depending on
the application, but the combined data can be accessed via
frame.get_data(). The synthetic signal can then be visualized and
saved within a Jupyter notebook using:
>>> %matplotlib inline
>>> import matplotlib.pyplot as plt
>>> fig = plt.figure(figsize=(10, 6))
>>> frame.plot()
>>> plt.savefig('image.png', bbox_inches='tight')
>>> plt.show()
To run within a script, simply exclude the first line:
``%matplotlib inline``.
"""
if bounding_f_range is None:
bounding_min, bounding_max = 0, self.fchans
else:
bounding_min = max(self.get_index(bounding_f_range[0]), 0)
bounding_max = min(self.get_index(bounding_f_range[1]), self.fchans)
restricted_fs = self.fs[bounding_min:bounding_max]
if integrate_f_profile:
f0 = restricted_fs[0]
restricted_fchans = len(restricted_fs)
restricted_fs = np.linspace(f0,
f0 + restricted_fchans * self.df,
restricted_fchans * f_subsamples,
endpoint=False)
ff, _ = np.meshgrid(restricted_fs, self.ts)
# Handle t_profile
if callable(t_profile):
# Integrate in time direction to capture temporal variations more
# accurately
if integrate_t_profile:
new_ts = np.linspace(0,
self.tchans * self.dt,
self.tchans * t_subsamples,
endpoint=False)
y = t_profile(new_ts)
if not isinstance(y, np.ndarray):
y = np.repeat(y, self.tchans * t_subsamples)
integrated_y = np.mean(np.reshape(y, (self.tchans,
t_subsamples)),
axis=1)
t_profile = integrated_y
else:
t_profile = t_profile(self.ts)
elif isinstance(t_profile, (list, np.ndarray)):
t_profile = np.array(t_profile)
if t_profile.shape != self.ts.shape:
raise ValueError('Shape of t_profile array is {0} != {1}.'
.format(t_profile.shape, self.ts.shape))
elif isinstance(t_profile, (int, float)):
t_profile = np.full(self.tchans, t_profile)
else:
raise TypeError('t_profile is not a function, array, or float.')
_, t_profile_tt = np.meshgrid(restricted_fs, t_profile)
# Handle path. Generate one extra time sample for freq smearing
# calculations
tchans_eff = self.tchans
if doppler_smearing:
tchans_eff += 1
if callable(path):
# Average using integration to get a better position in frequency
# direction
if integrate_path:
new_ts = np.linspace(0,
tchans_eff * self.dt,
tchans_eff * t_subsamples,
endpoint=False)
f = path(new_ts)
if not isinstance(f, np.ndarray):
f = np.repeat(f, tchans_eff * t_subsamples)
integrated_f = np.mean(np.reshape(f, (tchans_eff,
t_subsamples)),
axis=1)
path = integrated_f
else:
ts = self.ts
if doppler_smearing:
ts = self.ts_ext
path = path(ts)
elif isinstance(path, (list, np.ndarray)):
path = np.array(path)
if path.shape != self.ts.shape:
raise ValueError(f'Shape of path array is {path.shape} '
f'!= {self.ts.shape}.')
elif doppler_smearing and len(path) != self.tchans + 1:
raise ValueError(f'To Doppler smear power, must provide'
f'path array with {self.tchans + 1} values')
elif isinstance(path, (int, float)):
path = np.full(tchans_eff, path)
else:
raise TypeError('path is not a function, array, or float.')
# Ensure that path f_centers are the right length
_, path_tt = np.meshgrid(restricted_fs, path[:self.tchans])
if doppler_smearing:
dpath = np.diff(path) / smearing_subsamples
_, dpath_tt = np.meshgrid(restricted_fs, dpath)
# Handle bandpass profile
if bp_profile is None:
bp_profile = 1
if callable(bp_profile):
bp_profile = bp_profile(restricted_fs)
elif isinstance(bp_profile, (list, np.ndarray)):
bp_profile = np.array(bp_profile)
if bp_profile.shape != restricted_fs.shape:
raise ValueError('Shape of bp_profile array is {0} != {1}.'
.format(bp_profile.shape,
restricted_fs.shape))
elif isinstance(bp_profile, (int, float)):
bp_profile = np.full(restricted_fs.shape, bp_profile)
else:
raise TypeError('bp_profile is not a function, array, or float.')
bp_profile_ff, _ = np.meshgrid(bp_profile, self.ts)
# Create signal, adding multiple copies for Doppler smearing case
if doppler_smearing:
signal = np.zeros(ff.shape)
for _ in range(smearing_subsamples):
signal += (t_profile_tt * f_profile(ff, path_tt)
/ smearing_subsamples * bp_profile_ff)
path_tt += dpath_tt
else:
signal = t_profile_tt * f_profile(ff, path_tt) * bp_profile_ff
if integrate_f_profile:
signal = np.mean(np.reshape(signal, (self.tchans,
restricted_fchans,
f_subsamples)),
axis=2)
self.data[:, bounding_min:bounding_max] += signal
signal_frame = np.zeros(self.shape)
signal_frame[:, bounding_min:bounding_max] = signal
return signal_frame
[docs]
def add_constant_signal(self,
f_start,
drift_rate,
level,
width,
f_profile_type='sinc2',
doppler_smearing=False):
"""
A wrapper around add_signal() that injects a constant intensity,
constant drift_rate signal into the frame.
Parameters
----------
f_start : astropy.Quantity
Starting signal frequency
drift_rate : astropy.Quantity
Signal drift rate, in units of frequency per time
level : float
Signal intensity
width : astropy.Quantity
Signal width in frequency units
f_profile_type : {"sinc2", "box", "gaussian", "lorentzian", "voigt}, default: "sinc2"
Signal spectral profile
doppler_smearing : bool, optional, default: False
Option to numerically "Doppler smear" spectral power over
frequency bins. At time t, averages ``drift_rate / frame.unit_drift_rate``
copies of the signal centered at evenly spaced center frequencies between
times t and t+1. This causes the effective drop in power when
the signal crosses multiple bins.
Returns
-------
signal : ndarray
Two-dimensional NumPy array containing synthetic signal data
"""
f_start = unit_utils.get_value(f_start, u.Hz)
drift_rate = unit_utils.get_value(drift_rate, u.Hz / u.s)
width = unit_utils.get_value(width, u.Hz)
start_index = self.get_index(f_start)
# Calculate the bounding box, to optimize signal insertion calculation
px_width_offset = 2 * width / self.df
if drift_rate < 0:
px_width_offset = -px_width_offset
px_drift_offset = self.dt * (self.tchans - 1) * drift_rate / self.df
if doppler_smearing:
px_drift_offset += drift_rate * self.dt / self.df
bounding_start_index = start_index + int(-px_width_offset)
bounding_stop_index = start_index + int(px_drift_offset + px_width_offset)
bounding_min_index = max(min(bounding_start_index, bounding_stop_index), 0)
bounding_max_index = min(max(bounding_start_index, bounding_stop_index), self.fchans)
# Select common frequency profile types
if f_profile_type == 'gaussian':
f_profile = f_profiles.gaussian_f_profile(width)
elif f_profile_type == 'lorentzian':
f_profile = f_profiles.lorentzian_f_profile(width)
elif f_profile_type == 'voigt':
f_profile = f_profiles.voigt_f_profile(width, width)
elif f_profile_type == 'sinc2':
f_profile = f_profiles.sinc2_f_profile(width)
elif f_profile_type == 'box':
f_profile = f_profiles.box_f_profile(width)
else:
raise ValueError('Unsupported f_profile for constant signal!')
return self.add_signal(path=paths.constant_path(f_start, drift_rate),
t_profile=t_profiles.constant_t_profile(level),
f_profile=f_profile,
bp_profile=bp_profiles.constant_bp_profile(level=1),
bounding_f_range=(self.get_frequency(bounding_min_index),
self.get_frequency(bounding_max_index)),
doppler_smearing=doppler_smearing,
smearing_subsamples=int(np.ceil(drift_rate / self.unit_drift_rate)))
[docs]
def get_index(self, frequency):
"""
Convert frequency to closest index in frame.
"""
return np.round((unit_utils.get_value(frequency, u.Hz) - self.fmin) / self.df).astype(int)
[docs]
def get_frequency(self, index):
"""
Convert index to frequency.
"""
return self.fmin + self.df * index
[docs]
def get_intensity(self, snr):
"""
Calculate intensity from SNR, based on estimates of the noise in the
frame.
Note that there must be noise present in the frame for this to make
sense.
"""
if self.noise_std == 0:
raise ValueError('You must add noise in the image to specify SNR!')
return snr * self.noise_std / np.sqrt(self.tchans)
[docs]
def get_snr(self, intensity):
"""
Calculate SNR from intensity.
Note that there must be noise present in the frame for this to make
sense.
"""
if self.noise_std == 0:
raise ValueError('You must add noise in the image to return SNR!')
return intensity * np.sqrt(self.tchans) / self.noise_std
[docs]
def get_drift_rate(self, start_index, stop_index):
return (stop_index - start_index) * self.df / (self.tchans * self.dt)
[docs]
def get_info(self):
return vars(self)
[docs]
def get_params(self):
return {
'fchans': self.fchans,
'tchans': self.tchans,
'df': self.df,
'dt': self.dt,
'fch1': self.fch1,
'ascending': self.ascending
}
[docs]
def get_data(self, db=False):
if db:
return 10 * np.log10(self.data)
return self.data
[docs]
def plot(self, *args, **kwargs):
"""
Plot frame spectrogram data.
Parameters
----------
frame : Frame
Frame to plot
xtype : {"fmid", "fmin", "f", "px"}, default: "fmid"
Types of axis labels, particularly the x-axis. "px" puts axes in units
of pixels. The others are all in frequency: "fmid" shows frequencies
relative to the central frequency, "fmin" is relative to the minimum
frequency, and "f" is absolute frequency.
db : bool, default: True
Option to convert intensities to dB
colorbar : bool, default: True
Whether to display colorbar
label : bool, default: False
Option to place target name as a label in plot
minor_ticks : bool, default: False
Option to include minor ticks on both axes
grid : bool, default: False
Option to overplot grid from major ticks
Return
------
p : matplotlib.image.AxesImage
Spectrogram axes object
"""
plots.plot_frame(self, *args, **kwargs)
[docs]
def get_slice(self, l, r):
"""
Slice frame data with left and right index bounds.
Parameters
----------
l : int
Left bound
r : int
Right bound
Returns
-------
s_fr : Frame
Sliced frame
"""
return frame_utils.get_slice(self, l, r)
[docs]
def integrate(self, axis='t', mode='mean', normalize=False):
"""
Integrate along either time ('t', 0) or frequency ('f', 1) axes, to create
spectra or time series data. Mode is either 'mean' or 'sum'.
Parameters
----------
data : Frame, or 2D ndarray
Input frame or Numpy array
axis : int or str
Axis over which to integrate; time ('t', 0) or frequency ('f', 1)
mode : {"mean", "sum"}, default: "mean"
Integration mode
normalize : bool
Whether to normalize integrated array to mean 0, std 1
Returns
-------
output : ndarray
Integrated product
"""
return frame_utils.integrate(self, axis=axis, mode=mode, normalize=normalize)
def _update_waterfall(self, filename=None, max_load=1):
# If entirely synthetic, base filterbank structure on existing sample data
if self.waterfall is None:
my_path = os.path.abspath(os.path.dirname(__file__))
path = os.path.join(my_path, 'assets/sample.fil')
self.waterfall = Waterfall(path, max_load=max_load)
self.waterfall.header['source_name'] = self.source_name
self.waterfall.header['rawdatafile'] = 'Synthetic'
container_attr = {
't_begin': 0,
't_end': self.tchans,
'file_size_bytes': self.tchans * self.fchans * self.waterfall.header['nbits'] / 8,
'n_channels_in_file': self.fchans,
'n_ints_in_file': self.tchans,
'file_shape': (self.tchans, 1, self.fchans),
'f_end': self.fmax * 1e-6,
'f_begin': self.fmin * 1e-6,
'f_stop': self.fmax * 1e-6,
'f_start': self.fmin * 1e-6,
't_start': 0,
't_stop': self.tchans,
'selection_shape': (self.tchans, 1, self.fchans),
'chan_start_idx': 0,
'chan_stop_idx': self.fchans,
}
for key, value in container_attr.items():
setattr(self.waterfall.container,
key,
value)
wat_attr = {
'n_channels_in_file': self.fchans,
'n_ints_in_file': self.tchans,
'file_shape': (self.tchans, 1, self.fchans),
'file_size_bytes': self.tchans * self.fchans * self.waterfall.header['nbits'] / 8,
'selection_shape': (self.tchans, 1, self.fchans),
}
for key, value in wat_attr.items():
setattr(self.waterfall,
key,
value)
# Format data correctly for saving into filterbank format
self.waterfall.data = self.data[:, np.newaxis, :]
if not self.ascending:
# Have to manually flip in the frequency direction
self.waterfall.data = self.waterfall.data[:, :, ::-1]
# Edit header info for Waterfall in case these have been changed from Frame manipulations
header_attr = {
'tsamp': self.dt,
'tstart': self.mjd,
'nchans': self.fchans,
'fch1': self.fch1 * 1e-6,
}
if self.ascending:
header_attr['foff'] = self.df * 1e-6
else:
header_attr['foff'] = self.df * -1e-6
self.waterfall.header.update(header_attr)
self.waterfall.file_header.update(header_attr)
if filename is not None:
if not os.path.isabs(filename):
filename = os.path.abspath(filename)
self.waterfall.container.filename = filename
self.waterfall.container.idx_data = len(sigproc.generate_sigproc_header(self.waterfall))
def _encode_bytestrings(self):
for key in ['source_name', 'rawdatafile']:
# Some data don't have these keys to begin with
if key in self.waterfall.header:
if not isinstance(self.waterfall.header[key], bytes):
self.waterfall.header[key] = self.waterfall.header[key].encode()
def _decode_bytestrings(self):
for key in ['source_name', 'rawdatafile']:
if key in self.waterfall.header:
if isinstance(self.waterfall.header[key], bytes):
self.waterfall.header[key] = self.waterfall.header[key].decode()
[docs]
def get_waterfall(self):
"""
Return current frame as a Waterfall object. Note: some filterbank
metadata may not be accurate anymore, depending on prior frame
manipulations.
"""
self._update_waterfall()
return self.waterfall
[docs]
def check_waterfall(self):
"""
If an associated Waterfall object exists, update and return it. Otherwise,
return None. Useful to chain with ``setigen.Frame.from_data()`` if manipulating
completely synthetic data.
"""
if self.waterfall is None:
return None
else:
return self.get_waterfall()
[docs]
def save_fil(self, filename, max_load=1):
"""
Save frame data as a filterbank file (.fil).
"""
self._update_waterfall(filename=filename, max_load=max_load)
self._encode_bytestrings()
self.waterfall.write_to_fil(filename)
self._decode_bytestrings()
[docs]
def save_hdf5(self, filename, max_load=1):
"""
Save frame data as an HDF5 file.
"""
self._update_waterfall(filename=filename, max_load=max_load)
self._encode_bytestrings()
self.waterfall.write_to_hdf5(filename)
self._decode_bytestrings()
[docs]
def save_h5(self, filename, max_load=1):
"""
Save frame data as an HDF5 file.
"""
self.save_hdf5(filename, max_load=max_load)
[docs]
def save_npy(self, filename):
"""
Save frame data as an .npy file.
"""
np.save(filename, self.data)
[docs]
def load_npy(self, filename):
"""
Load frame data from a .npy file.
"""
self.data = np.load(filename)
[docs]
def save_pickle(self, filename):
"""
Save entire frame as a pickled file (.pickle).
"""
with open(filename, 'wb') as f:
pickle.dump(self, f)
[docs]
@classmethod
def load_pickle(cls, filename):
"""
Load Frame object from a pickled file (.pickle), created with
:func:`~setigen.frame.Frame.save_pickle`.
"""
with open(filename, 'rb') as f:
frame = pickle.load(f)
return frame
[docs]
def params_from_backend(obs_length=300,
sample_rate=3e9,
num_branches=1024,
fftlength=1048576,
int_factor=51):
"""
Return frame parameters calculated from data backend characteristics.
Parameters
----------
obs_length : float, optional
Length of observation in seconds
sample_rate : float, optional
Physical sample rate, in Hz, for collecting real voltage data
num_branches : int, optional
Number of PFB branches. Note that this corresponds to
``num_branches / 2`` coarse channels.
fftlength : int, optional
FFT length to be used in fine channelization
int_factor : int, optional
Integration factor used in fine channelization. Determines ``tchans``.
Returns
-------
param_dict : dict
Dictionary of parameters
"""
chan_bw = sample_rate / num_branches
df = chan_bw / fftlength
dt = int_factor / df
tchans = int(obs_length / dt)
param_dict = {
'tchans': tchans,
'df': df,
'dt': dt
}
return param_dict