"""
Sample signal paths for signal injection.
For any given starting frequency,
these functions map out the path of a signal as a function of time in
time-frequency space.
"""
from __future__ import annotations
from enum import Enum
import numpy as np
from astropy import units as u
from setigen import unit_utils
from setigen._typing import FrequencyPath, SeedLike
[docs]
class SpreadType(str, Enum):
    """Distributions that can be used to scatter a simple RFI path.

    The ``str`` mix-in makes each member compare equal to its plain-string
    value (for example ``SpreadType.UNIFORM == "uniform"``).
    """

    # Offsets are drawn from a uniform distribution.
    UNIFORM = "uniform"
    # Offsets are drawn from a normal (Gaussian) distribution.
    NORMAL = "normal"
[docs]
class RfiType(str, Enum):
    """Families of RFI path that can be generated.

    The ``str`` mix-in makes each member compare equal to its plain-string
    value (for example ``RfiType.STATIONARY == "stationary"``).
    """

    # Per-sample offsets are used as drawn.
    STATIONARY = "stationary"
    # Per-sample offsets are accumulated from one sample to the next.
    RANDOM_WALK = "random_walk"
def _coerce_spread_type(spread_type: str | SpreadType) -> SpreadType:
    """Turn a spread-type selector into a ``SpreadType`` member.

    Args:
        spread_type: A ``SpreadType`` member, or a value that the
            ``SpreadType`` constructor accepts (e.g. ``"uniform"``).

    Returns:
        The corresponding ``SpreadType`` member. Existing members are passed
        through unchanged.

    Raises:
        ValueError: If the value does not match any ``SpreadType`` member.
    """
    if not isinstance(spread_type, SpreadType):
        try:
            spread_type = SpreadType(spread_type)
        except ValueError as exc:
            # Re-raise with a message that names the offending selector while
            # keeping the enum's own error as the cause.
            raise ValueError(f"'{spread_type}' is not a valid spread type!") from exc
    return spread_type
def _coerce_rfi_type(rfi_type: str | RfiType) -> RfiType:
    """Turn an RFI-type selector into an ``RfiType`` member.

    Unlike the spread-type coercion, this never raises for an unknown
    selector: anything other than the random-walk value is treated as
    stationary.

    Args:
        rfi_type: An ``RfiType`` member or a raw selector value.

    Returns:
        ``rfi_type`` itself when it is already a member;
        ``RfiType.RANDOM_WALK`` when it equals ``"random_walk"``;
        ``RfiType.STATIONARY`` for every other value.
    """
    if isinstance(rfi_type, RfiType):
        return rfi_type
    wants_random_walk = rfi_type == RfiType.RANDOM_WALK.value
    # Lenient fallback: unrecognized selectors resolve to STATIONARY.
    return RfiType.RANDOM_WALK if wants_random_walk else RfiType.STATIONARY
[docs]
def constant_path(f_start: float | u.Quantity, drift_rate: float | u.Quantity) -> FrequencyPath:
    """Build a path whose frequency changes linearly with time.

    Args:
        f_start: Starting center frequency; resolved with
            ``unit_utils.get_value`` against Hz.
        drift_rate: Doppler drift rate; resolved with
            ``unit_utils.get_value`` against Hz/s.

    Returns:
        Callable ``path(t)`` that evaluates ``f_start + drift_rate * t``.
    """
    start_hz = unit_utils.get_value(f_start, u.Hz)
    rate_hz_per_s = unit_utils.get_value(drift_rate, u.Hz / u.s)

    def path(t):
        # Straight line in time-frequency space.
        return start_hz + rate_hz_per_s * t

    return path
[docs]
def squared_path(f_start: float | u.Quantity, drift_rate: float | u.Quantity) -> FrequencyPath:
    """Build a path whose frequency changes quadratically with time.

    Args:
        f_start: Starting center frequency; resolved with
            ``unit_utils.get_value`` against Hz.
        drift_rate: Coefficient of the quadratic term; resolved with
            ``unit_utils.get_value`` against Hz/s.

    Returns:
        Callable ``path(t)`` that evaluates
        ``f_start + 0.5 * drift_rate * t**2``.
    """
    start_hz = unit_utils.get_value(f_start, u.Hz)
    # NOTE(review): the coefficient is converted as Hz/s yet multiplies t**2
    # below -- confirm the intended units with the caller.
    rate_coeff = unit_utils.get_value(drift_rate, u.Hz / u.s)

    def path(t):
        # Parabola in time-frequency space with its vertex at t = 0.
        return start_hz + 0.5 * rate_coeff * t**2

    return path
[docs]
def sine_path(
    f_start: float | u.Quantity,
    drift_rate: float | u.Quantity,
    period: float | u.Quantity,
    amplitude: float | u.Quantity,
) -> FrequencyPath:
    """Build a linear drift path with a sinusoidal modulation on top.

    Args:
        f_start: Starting center frequency; resolved with
            ``unit_utils.get_value`` against Hz.
        drift_rate: Doppler drift rate; resolved against Hz/s.
        period: Modulation period; resolved against seconds.
        amplitude: Modulation amplitude; resolved against Hz.

    Returns:
        Callable ``path(t)`` that evaluates
        ``f_start + amplitude * sin(2*pi*t/period) + drift_rate * t``.
    """
    start_hz = unit_utils.get_value(f_start, u.Hz)
    rate_hz_per_s = unit_utils.get_value(drift_rate, u.Hz / u.s)
    period_s = unit_utils.get_value(period, u.s)
    amplitude_hz = unit_utils.get_value(amplitude, u.Hz)

    def path(t):
        # Phase of the modulation at time t, in radians.
        phase = 2*np.pi*t/period_s
        modulation = amplitude_hz * np.sin(phase)
        return start_hz + modulation + rate_hz_per_s * t

    return path
[docs]
def simple_rfi_path(
    f_start: float | u.Quantity,
    drift_rate: float | u.Quantity,
    spread: float | u.Quantity,
    spread_type: str | SpreadType = 'uniform',
    rfi_type: str | RfiType = 'stationary',
    seed: SeedLike = None,
) -> FrequencyPath:
    """Return a simple randomized RFI path.

    The path is a linear drift plus a random frequency offset drawn
    independently for every time sample.

    Args:
        f_start: Starting center frequency; resolved with
            ``unit_utils.get_value`` against Hz.
        drift_rate: Doppler drift rate; resolved against Hz/s.
        spread: Width of the frequency variations; resolved against Hz. For
            the uniform distribution this is the full width of the interval
            ``[-spread/2, spread/2)``; for the normal distribution it is the
            full width at half maximum.
        spread_type: Distribution used for per-time-step offsets.
        rfi_type: Whether offsets are stationary or cumulative
            (``'random_walk'``). Unrecognized values are treated as
            stationary.
        seed: Random seed or generator, passed to ``np.random.default_rng``.

    Returns:
        Callable ``path(t)``. ``t`` may be a scalar or an array; the random
        offsets have the same shape as ``t``. All calls share one random
        generator, so repeated calls consume successive draws and generally
        return different offsets.

    Raises:
        ValueError: If ``spread_type`` is not a supported spread type.
    """
    rng = np.random.default_rng(seed)
    f_start = unit_utils.get_value(f_start, u.Hz)
    drift_rate = unit_utils.get_value(drift_rate, u.Hz / u.s)
    spread = unit_utils.get_value(spread, u.Hz)
    resolved_spread_type = _coerce_spread_type(spread_type)
    resolved_rfi_type = _coerce_rfi_type(rfi_type)

    # Gaussian FWHM = 2 * sqrt(2 * ln 2) * sigma; constant, so computed once
    # here instead of on every call of the path.
    fwhm_to_sigma = 2 * np.sqrt(2 * np.log(2))

    def path(t):
        # np.shape also handles plain scalars, which have no ``.shape``.
        shape = np.shape(t)
        if resolved_spread_type is SpreadType.UNIFORM:
            f_offset = rng.uniform(-spread / 2., spread / 2., size=shape)
        else:
            f_offset = rng.normal(0, spread / fwhm_to_sigma, size=shape)
        if resolved_rfi_type is RfiType.RANDOM_WALK:
            # cumsum without an axis accumulates over the flattened
            # (row-major) samples and returns a 1-D array; restore the shape
            # of ``t`` so the sum below lines up sample-for-sample.
            f_offset = np.cumsum(f_offset).reshape(shape)
        return f_start + drift_rate * t + f_offset

    return path