from __future__ import annotations
import copy
import pickle
from typing import Any
import numpy as np
from astropy import units as u
from astropy.time import Time
from astropy.stats import sigma_clip
from . import unit_utils
from . import slice
from . import plots
from . import utils
from ._typing import BandpassProfileInput, FrequencyPathInput, FrequencyProfile, PathLike, SeedLike, TimeProfileInput
from ._frame.construction import (
_attach_loaded_waterfall,
_initialize_frame_from_spec,
_normalize_frame_init,
)
from ._frame.models import (
_ConstantSignalConfig,
_NoiseConfig,
_SampledNoiseConfig,
_build_constant_signal_kwargs,
_generate_noise,
_generate_sampled_noise,
)
from ._frame.io import (
_decode_bytestrings,
_encode_bytestrings,
_update_waterfall,
)
from ._frame.signal import (
_finalize_signal,
_get_restricted_fs,
_normalize_bp_profile,
_normalize_path,
_normalize_t_profile,
_render_signal,
_resolve_bounding_indices,
)
[docs]
class Frame(object):
"""Represent synthetic or waterfall-backed SETI spectrogram data."""
[docs]
def __init__(self,
waterfall: Any = None,
fchans: int | None = None,
tchans: int | None = None,
df: Any = 2.7939677238464355*u.Hz,
dt: Any = 18.253611008*u.s,
fch1: Any = 6*u.GHz,
ascending: bool = False,
data: np.ndarray | None = None,
seed: SeedLike = None,
**kwargs: Any) -> None:
"""Initialize a frame from synthetic dimensions or existing data.
Args:
waterfall: Waterfall object or path to a `.fil` or `.h5` file.
fchans: Number of frequency channels for synthetic initialization.
tchans: Number of time channels for synthetic initialization.
df: Frequency resolution.
dt: Time resolution.
fch1: Frequency of the first channel.
ascending: Whether the frequency axis should be ascending.
data: Optional preloaded frame data.
seed: Random seed or generator.
**kwargs: Additional construction keywords such as `shape`, `mjd`,
`t_start`, `source_name`, `f_start`, or `f_stop`.
"""
self.rng = np.random.default_rng(seed)
_initialize_frame_from_spec(
self,
_normalize_frame_init(waterfall=waterfall,
fchans=fchans,
tchans=tchans,
df=df,
dt=dt,
fch1=fch1,
ascending=ascending,
data=data,
kwargs=kwargs),
)
# Degrees of freedom for chi-squared radiometer noise
# 2 polarizations, real and imaginary components -> 4
self.chi2_df = 4 * round(self.df * self.dt)
# Calculate unit drift rate (pixel over pixel drift)
self.unit_drift_rate = self.df / self.dt
# Shared creation of ranges
self._update_fs()
self._update_ts()
# No matter what, self.data will be populated at this point.
self._update_noise_frame_stats()
# Placeholder dictionary for user metadata, just for bookkeeping purposes
self.metadata = self.get_params()
[docs]
@classmethod
def from_data(
cls,
df: Any,
dt: Any,
fch1: Any,
ascending: bool,
data: np.ndarray,
metadata: dict[str, Any] | None = None,
waterfall: Any = None,
seed: SeedLike = None,
) -> "Frame":
"""Build a frame directly from an in-memory data array.
Args:
df: Frequency resolution.
dt: Time resolution.
fch1: Frequency of the first channel.
ascending: Whether the frequency axis is ascending.
data: Preloaded frame data.
metadata: Optional metadata to attach to the frame.
waterfall: Optional associated waterfall object.
seed: Random seed or generator.
Returns:
Frame populated with the supplied data.
"""
tchans, fchans = data.shape
frame = cls(fchans=fchans,
tchans=tchans,
df=df,
dt=dt,
fch1=fch1,
ascending=ascending,
data=data,
seed=seed)
if metadata is not None:
frame.add_metadata(dict(metadata))
_attach_loaded_waterfall(frame, waterfall)
return frame
[docs]
@classmethod
def from_waterfall(cls, waterfall: Any, seed: SeedLike = None) -> "Frame":
"""Build a frame from a waterfall-backed observation.
Args:
waterfall: Waterfall object or path to a supported file.
seed: Random seed or generator.
Returns:
Frame loaded from the supplied waterfall.
"""
return cls(waterfall=waterfall, seed=seed)
[docs]
@classmethod
def from_backend_params(cls,
fchans: int | None = None,
obs_length: float = 300,
sample_rate: float = 3e9,
num_branches: int = 1024,
fftlength: int = 1048576,
int_factor: int = 51,
fch1: Any = 6*u.GHz,
ascending: bool = False,
data: np.ndarray | None = None,
seed: SeedLike = None) -> "Frame":
"""Build a frame from backend-like observing parameters.
Args:
fchans: Number of frequency channels. Required when `data` is not
supplied.
obs_length: Observation length in seconds.
sample_rate: Real-voltage sample rate in Hz.
num_branches: Number of PFB branches.
fftlength: Fine-channel FFT length.
int_factor: Fine-channel integration factor.
fch1: Frequency of the first channel.
ascending: Whether the frequency axis is ascending.
data: Optional preloaded frame data.
seed: Random seed or generator.
Returns:
Frame with dimensions implied by the backend parameters.
Raises:
ValueError: If neither `fchans` nor `data` is supplied, or if the
supplied data shape is inconsistent with the backend parameters.
"""
if data is not None:
tchans, fchans = data.shape
elif fchans is None:
raise ValueError("Value not given for fchans")
param_dict = params_from_backend(obs_length=obs_length,
sample_rate=sample_rate,
num_branches=num_branches,
fftlength=fftlength,
int_factor=int_factor)
if data is not None:
if param_dict['tchans'] != tchans:
raise ValueError(
f"Data has {tchans} time samples, but backend parameters imply {param_dict['tchans']}."
)
frame = cls(fchans=fchans,
**param_dict,
fch1=fch1,
ascending=ascending,
data=data,
seed=seed)
return frame
[docs]
def copy(self) -> "Frame":
"""Return a deep copy of the frame.
Returns:
Independent frame copy.
"""
c_frame = copy.deepcopy(self)
# Note that since the __getstate__ function is overwritten, we need to
# add back the waterfall object.
waterfall = self.get_waterfall()
if waterfall is not None:
c_frame.waterfall = copy.deepcopy(waterfall)
return c_frame
def __getstate__(self) -> dict[str, Any]:
# Exclude waterfall Waterfall object from pickle, since it uses open threads, which
# can't be pickled -- note that this affects copy!
state = self.__dict__.copy()
state['waterfall'] = None
return state
def _update_fs(self) -> None:
"""Update the frame frequency axis and derived bounds."""
# Normally, self.ascending will be False; filterbank convention is decreasing freqs
if self.ascending:
self.fmin = self.fch1
self.fs = np.linspace(self.fmin,
self.fmin + self.fchans * self.df,
self.fchans,
endpoint=False)
self.fmax = self.fs[-1]
else:
self.fmax = self.fch1
self.fs = np.linspace(self.fmax,
self.fmax - self.fchans * self.df,
self.fchans,
endpoint=False)
self.fmin = self.fs[-1]
self.fs = self.fs[::-1]
def _update_ts(self) -> None:
"""Update the frame time axis."""
self.ts = unit_utils.get_value(np.linspace(0,
self.tchans * self.dt,
self.tchans,
endpoint=False),
u.s)
@property
def fmid(self) -> float:
"""Return the midpoint frequency of the frame."""
return (self.fmin + self.fmax) / 2
@property
def mjd(self) -> float:
"""Return the frame start time in Modified Julian Date."""
return Time(self.t_start, format='unix').mjd
@property
def t_stop(self) -> float:
"""Return the observation stop time in Unix seconds."""
return self.t_start + self.tchans * self.dt
@property
def obs_length(self) -> float:
"""Return the total observation length in seconds."""
return self.tchans * self.dt
@property
def ts_ext(self) -> np.ndarray:
"""Return the time axis extended by one final endpoint sample."""
return np.append(self.ts, self.ts[-1] + self.dt)
@property
def mean(self) -> float:
"""Return the mean intensity of the frame."""
return np.mean(self.data)
@property
def std(self) -> float:
"""Return the standard deviation of the frame."""
return np.std(self.data)
[docs]
def get_total_stats(self) -> tuple[float, float]:
"""Return the mean and standard deviation of the full frame.
Returns:
Mean and standard deviation of the frame data.
"""
return self.mean, self.std
[docs]
def get_noise_stats(self) -> tuple[float, float]:
"""Return the sigma-clipped noise statistics for the frame.
Returns:
Sigma-clipped noise mean and standard deviation.
"""
return self.noise_mean, self.noise_std
def _update_noise_frame_stats(self) -> None:
"""Update sigma-clipped noise statistics for the frame."""
clipped_data = sigma_clip(self.data,
sigma=3,
maxiters=5,
masked=False)
self.noise_mean = np.mean(clipped_data)
self.noise_std = np.std(clipped_data)
[docs]
def zero_data(self) -> None:
"""Reset frame data and cached noise statistics to zero."""
self.data = np.zeros(self.shape)
self.noise_mean = self.noise_std = 0
[docs]
def add_noise(self,
x_mean: float,
x_std: float | None = None,
x_min: float | None = None,
noise_type: str = 'chi2') -> np.ndarray:
"""Add synthetic radiometer or Gaussian noise to the frame.
Args:
x_mean: Target mean intensity.
x_std: Target standard deviation for Gaussian noise.
x_min: Optional lower bound for truncated Gaussian noise.
noise_type: Noise distribution selector.
Returns:
Synthetic noise array added to the frame.
Raises:
ValueError: If Gaussian noise is requested without `x_std`.
"""
noise, x_mean, x_std = _generate_noise(
_NoiseConfig.from_values(x_mean=x_mean,
x_std=x_std,
x_min=x_min,
noise_type=noise_type),
chi2_df=self.chi2_df,
shape=self.shape,
rng=self.rng,
)
self.data += noise
set_to_param = (self.noise_mean == self.noise_std == 0)
if set_to_param:
self.noise_mean, self.noise_std = x_mean, x_std
else:
self._update_noise_frame_stats()
return noise
[docs]
def add_noise_from_obs(self,
x_mean_array: np.ndarray | None = None,
x_std_array: np.ndarray | None = None,
x_min_array: np.ndarray | None = None,
share_index: bool = True,
noise_type: str = 'chi2') -> np.ndarray:
"""Add synthetic noise by sampling empirical observation parameters.
Args:
x_mean_array: Candidate noise means.
x_std_array: Candidate noise standard deviations.
x_min_array: Candidate truncated-Gaussian minima.
share_index: Whether to sample correlated parameters by shared index.
noise_type: Noise distribution selector.
Returns:
Synthetic noise array added to the frame.
Raises:
IndexError: If shared-index sampling is requested for mismatched
parameter arrays.
"""
noise, x_mean, x_std = _generate_sampled_noise(
_SampledNoiseConfig.from_values(x_mean_array=x_mean_array,
x_std_array=x_std_array,
x_min_array=x_min_array,
share_index=share_index,
noise_type=noise_type),
dt=self.dt,
chi2_df=self.chi2_df,
shape=self.shape,
rng=self.rng,
)
self.data += noise
set_to_param = (self.noise_mean == self.noise_std == 0)
if set_to_param:
self.noise_mean, self.noise_std = x_mean, x_std
else:
self._update_noise_frame_stats()
return noise
[docs]
def add_signal(self,
path: FrequencyPathInput,
t_profile: TimeProfileInput,
f_profile: FrequencyProfile,
bp_profile: BandpassProfileInput | None = None,
bounding_f_range: tuple[Any, Any] | None = None,
integrate_path: bool = False,
integrate_t_profile: bool = False,
integrate_f_profile: bool = False,
doppler_smearing: bool = False,
t_subsamples: int = 10,
f_subsamples: int = 10,
smearing_subsamples: int = 10) -> np.ndarray:
"""Add a synthetic signal to the frame.
Args:
path: Signal path in time-frequency space.
t_profile: Time-intensity profile.
f_profile: Frequency profile callable.
bp_profile: Optional bandpass profile.
bounding_f_range: Optional bounding frequency range for rendering.
integrate_path: Whether to oversample and average the path in time.
integrate_t_profile: Whether to oversample and average the time
profile.
integrate_f_profile: Whether to oversample and average the frequency
profile.
doppler_smearing: Whether to numerically smear power across
frequency bins.
t_subsamples: Number of time subsamples per bin.
f_subsamples: Number of frequency subsamples per bin.
smearing_subsamples: Number of substeps used for Doppler smearing.
Returns:
Two-dimensional signal array that was added to the frame.
"""
bounding_min, bounding_max = _resolve_bounding_indices(self, bounding_f_range)
restricted_fs, restricted_fchans = _get_restricted_fs(
self,
bounding_min=bounding_min,
bounding_max=bounding_max,
integrate_f_profile=integrate_f_profile,
f_subsamples=f_subsamples,
)
ff, _ = np.meshgrid(restricted_fs, self.ts)
t_profile_tt = _normalize_t_profile(self,
restricted_fs,
t_profile,
integrate_t_profile=integrate_t_profile,
t_subsamples=t_subsamples)
resolved_path = _normalize_path(self,
restricted_fs,
path,
integrate_path=integrate_path,
doppler_smearing=doppler_smearing,
t_subsamples=t_subsamples,
smearing_subsamples=smearing_subsamples)
bp_profile_ff = _normalize_bp_profile(self, restricted_fs, bp_profile)
signal = _render_signal(ff=ff,
t_profile_tt=t_profile_tt,
f_profile=f_profile,
bp_profile_ff=bp_profile_ff,
path_tt=resolved_path.path_tt,
doppler_smearing=doppler_smearing,
dpath_tt=resolved_path.dpath_tt,
smearing_subsamples=smearing_subsamples)
return _finalize_signal(self,
signal=signal,
bounding_min=bounding_min,
bounding_max=bounding_max,
integrate_f_profile=integrate_f_profile,
restricted_fchans=restricted_fchans,
f_subsamples=f_subsamples)
[docs]
def add_constant_signal(self,
f_start: Any,
drift_rate: Any,
level: float,
width: Any,
f_profile_type: str = 'sinc2',
doppler_smearing: bool = False) -> np.ndarray:
"""Add a constant-intensity, constant-drift signal to the frame.
Args:
f_start: Starting signal frequency.
drift_rate: Signal drift rate.
level: Signal intensity.
width: Signal width.
f_profile_type: Spectral profile selector.
doppler_smearing: Whether to numerically smear power across bins.
Returns:
Two-dimensional signal array that was added to the frame.
"""
f_start = unit_utils.get_value(f_start, u.Hz)
drift_rate = unit_utils.get_value(drift_rate, u.Hz / u.s)
width = unit_utils.get_value(width, u.Hz)
return self.add_signal(**_build_constant_signal_kwargs(
self,
_ConstantSignalConfig.from_values(f_start=f_start,
drift_rate=drift_rate,
level=level,
width=width,
f_profile_type=f_profile_type,
doppler_smearing=doppler_smearing),
))
[docs]
def get_index(self, frequency: Any) -> np.ndarray:
"""Convert frequency to the closest channel index.
Args:
frequency: Frequency or array of frequencies to convert.
Returns:
Closest frame index or indices.
"""
return np.round((unit_utils.get_value(frequency, u.Hz) - self.fmin) / self.df).astype(int)
[docs]
def get_frequency(self, index: int | np.ndarray) -> float | np.ndarray:
"""Convert a frame index into frequency.
Args:
index: Frame index or indices.
Returns:
Frequency value or array in Hz.
"""
return self.fmin + self.df * index
[docs]
def get_intensity(self, snr: float) -> float:
"""Calculate signal intensity from SNR using the frame noise estimate.
Args:
snr: Desired signal-to-noise ratio.
Returns:
Signal intensity that corresponds to the requested SNR.
Raises:
ValueError: If the frame does not yet contain measurable noise.
"""
if self.noise_std == 0:
raise ValueError('You must add noise in the image to specify SNR!')
return snr * self.noise_std / np.sqrt(self.tchans)
[docs]
def get_snr(self, intensity: float) -> float:
"""Calculate SNR from signal intensity using the frame noise estimate.
Args:
intensity: Signal intensity.
Returns:
Signal-to-noise ratio for the supplied intensity.
Raises:
ValueError: If the frame does not yet contain measurable noise.
"""
if self.noise_std == 0:
raise ValueError('You must add noise in the image to return SNR!')
return intensity * np.sqrt(self.tchans) / self.noise_std
[docs]
def get_drift_rate(self, start_index: int, stop_index: int) -> float:
"""Calculate drift rate from pixel coordinates.
Args:
start_index: Starting frequency index.
stop_index: Ending frequency index.
Returns:
Drift rate in Hz/s.
"""
return (stop_index - start_index) * self.df / (self.tchans * self.dt)
[docs]
def get_info(self) -> dict[str, Any]:
"""Return the full frame attribute dictionary.
Returns:
Full frame attribute dictionary.
"""
return vars(self)
[docs]
def get_params(self) -> dict[str, Any]:
"""Return the core frame parameters.
Returns:
Dictionary of primary frame parameters.
"""
return {
'fchans': self.fchans,
'tchans': self.tchans,
'df': self.df,
'dt': self.dt,
'fch1': self.fch1,
'ascending': self.ascending
}
[docs]
def get_data(self, db: bool = False) -> np.ndarray:
"""Return frame data in linear or decibel units.
Args:
db: Whether to convert intensities to dB before returning them.
Returns:
Frame data array.
"""
if db:
return 10 * np.log10(self.data)
return self.data
[docs]
@utils._copy_docstring(plots.plot_frame)
def plot(self, *args: Any, **kwargs: Any) -> Any:
return plots.plot_frame(self, *args, **kwargs)
[docs]
@utils._copy_docstring(slice.get_slice)
def get_slice(self, *args: Any, **kwargs: Any) -> Any:
return slice.get_slice(self, *args, **kwargs)
# @utils._copy_docstring(frame_utils.integrate)
# def integrate(self, *args, **kwargs):
# return frame_utils.integrate(self, *args, **kwargs)
[docs]
def get_waterfall(self) -> Any:
"""Return the current frame as an updated waterfall object.
Returns:
Waterfall representation of the frame.
"""
_update_waterfall(self)
return self.waterfall
[docs]
def check_waterfall(self) -> Any:
"""Return the updated attached waterfall when one exists.
Returns:
Updated waterfall object or `None` when the frame has no attached
waterfall.
"""
if self.waterfall is None:
return None
return self.get_waterfall()
[docs]
def save_fil(self, filename: PathLike, max_load: int = 1) -> None:
"""Save frame data as a SIGPROC filterbank file.
Args:
filename: Output `.fil` path.
max_load: Maximum load parameter for a lazily created waterfall.
"""
_update_waterfall(self, filename=filename, max_load=max_load)
_encode_bytestrings(self)
self.waterfall.write_to_fil(filename)
_decode_bytestrings(self)
[docs]
def save_hdf5(self, filename: PathLike, max_load: int = 1) -> None:
"""Save frame data as an HDF5 waterfall file.
Args:
filename: Output `.h5` path.
max_load: Maximum load parameter for a lazily created waterfall.
"""
_update_waterfall(self, filename=filename, max_load=max_load)
_encode_bytestrings(self)
self.waterfall.write_to_hdf5(filename)
_decode_bytestrings(self)
[docs]
def save_h5(self, filename: PathLike, max_load: int = 1) -> None:
"""Save frame data as an HDF5 waterfall file.
Args:
filename: Output `.h5` path.
max_load: Maximum load parameter for a lazily created waterfall.
"""
self.save_hdf5(filename, max_load=max_load)
[docs]
def save_npy(self, filename: PathLike) -> None:
"""Save frame data as a NumPy binary file.
Args:
filename: Output `.npy` path.
"""
np.save(filename, self.data)
[docs]
def load_npy(self, filename: PathLike) -> None:
"""Load frame data from a NumPy binary file.
Args:
filename: Input `.npy` path.
"""
self.data = np.load(filename)
[docs]
def save_pickle(self, filename: PathLike) -> None:
"""Serialize the full frame with pickle.
Args:
filename: Output pickle path.
"""
with open(filename, "wb") as f:
pickle.dump(self, f)
[docs]
@classmethod
def load_pickle(cls, filename: PathLike) -> "Frame":
"""Load a frame from a pickled file.
Args:
filename: Input pickle path created by `save_pickle()`.
Returns:
Deserialized frame object.
"""
with open(filename, "rb") as f:
return pickle.load(f)
[docs]
def params_from_backend(obs_length: float = 300,
sample_rate: float = 3e9,
num_branches: int = 1024,
fftlength: int = 1048576,
int_factor: int = 51) -> dict[str, float]:
"""Return frame parameters implied by backend characteristics.
Args:
obs_length: Observation length in seconds.
sample_rate: Real-voltage sample rate in Hz.
num_branches: Number of PFB branches.
fftlength: Fine-channel FFT length.
int_factor: Fine-channel integration factor.
Returns:
Dictionary containing `tchans`, `df`, and `dt`.
"""
chan_bw = sample_rate / num_branches
df = chan_bw / fftlength
dt = int_factor / df
tchans = int(obs_length / dt)
return {
'tchans': tchans,
'df': df,
'dt': dt
}