Source code for setigen.voltage.antenna

from __future__ import annotations

import numpy as np

from astropy import units as u
from typing import Any

from ._array_backend import xp
from ._antenna.array_ops import (
    _apply_background_to_antenna,
    _collect_array_samples,
    _coerce_delays,
    _get_single_antenna_samples,
    _populate_background_streams,
    _reset_array_time_state,
    _set_antenna_time,
)
from setigen._typing import SeedLike
from ._antenna.construction import (
    _build_antennas,
    _build_background_streams,
    _build_polarization_streams,
    _coerce_fch1,
    _coerce_sample_rate,
    _validate_num_pols,
)


[docs] class Antenna(object): """Model a radio antenna with one or two polarization streams."""
[docs] def __init__(self, sample_rate: float | u.Quantity = 3*u.GHz, fch1: float | u.Quantity = 0*u.GHz, ascending: bool = True, num_pols: int = 2, t_start: float = 0, seed: SeedLike = None, **kwargs: Any) -> None: """Initialize an antenna and its polarization data streams. Args: sample_rate: Real-voltage sample rate. fch1: Frequency of the first coarse channel. ascending: Whether the frequency axis is ascending. num_pols: Number of polarizations, either one or two. t_start: Start time in seconds. seed: Random seed or generator. **kwargs: Reserved keyword arguments. """ self.rng = xp.random.default_rng(seed) self.sample_rate = _coerce_sample_rate(sample_rate) self.dt = 1 / self.sample_rate self.fch1 = _coerce_fch1(fch1) self.ascending = ascending self.num_pols = _validate_num_pols(num_pols) self.t_start = t_start self.start_obs = True self.x, self.y, self.streams = _build_polarization_streams(sample_rate=self.sample_rate, fch1=self.fch1, ascending=self.ascending, t_start=self.t_start, num_pols=self.num_pols, rng=self.rng) self.delay = None self.bg_cache = [None, None]
[docs] def set_time(self, t: float) -> None: """Set the antenna start time for the next sample request. Args: t: New start time in seconds. """ _set_antenna_time(self, t)
[docs] def add_time(self, t: float) -> None: """Advance the antenna start time. Args: t: Time increment in seconds. """ self.set_time(self.t_start + t)
[docs] def reset_start(self) -> None: """Reset the observation-start state for the antenna.""" self.add_time(0)
[docs] def get_samples(self, num_samples: int) -> np.ndarray: """Retrieve voltage samples from each polarization. Args: num_samples: Number of samples to retrieve. Returns: Voltage samples of shape `(1, num_pols, num_samples)`. """ return _get_single_antenna_samples(self, num_samples, xp=xp)
[docs] class MultiAntennaArray(object): """Model an antenna array with per-antenna delays and shared background noise."""
[docs] def __init__(self, num_antennas: int, sample_rate: float | u.Quantity = 3*u.GHz, fch1: float | u.Quantity = 0*u.GHz, ascending: bool = True, num_pols: int = 2, delays: list[int] | tuple[int, ...] | None = None, t_start: float = 0, seed: SeedLike = None, **kwargs: Any) -> None: """Initialize a multi-antenna array. Args: num_antennas: Number of antennas in the array. sample_rate: Real-voltage sample rate. fch1: Frequency of the first coarse channel. ascending: Whether the frequency axis is ascending. num_pols: Number of polarizations, either one or two. delays: Optional per-antenna integer-sample delays. t_start: Start time in seconds. seed: Random seed or generator. **kwargs: Reserved keyword arguments. """ self.rng = xp.random.default_rng(seed) self.delays, self.max_delay = _coerce_delays(delays=delays, num_antennas=num_antennas, xp=xp) self.num_antennas = num_antennas self.sample_rate = _coerce_sample_rate(sample_rate) self.dt = 1 / self.sample_rate self.fch1 = _coerce_fch1(fch1) self.ascending = ascending self.num_pols = _validate_num_pols(num_pols) self.t_start = t_start self.start_obs = True self.antennas = _build_antennas(num_antennas=self.num_antennas, sample_rate=self.sample_rate, fch1=self.fch1, ascending=self.ascending, num_pols=self.num_pols, t_start=self.t_start, rng=self.rng, antenna_cls=Antenna, delays=self.delays) # Create background data streams and link relevant antenna data streams for tracking noise self.bg_x, self.bg_y, self.bg_streams = _build_background_streams(sample_rate=self.sample_rate, fch1=self.fch1, ascending=self.ascending, t_start=self.t_start, num_pols=self.num_pols, rng=self.rng, antennas=self.antennas)
[docs] def set_time(self, t: float) -> None: """Set the array start time for the next sample request. Args: t: New start time in seconds. """ _reset_array_time_state(self, t)
[docs] def add_time(self, t: float) -> None: """Advance the array start time. Args: t: Time increment in seconds. """ self.set_time(self.t_start + t)
[docs] def reset_start(self) -> None: """Reset the observation-start state for the array.""" self.add_time(0)
[docs] def get_samples(self, num_samples: int) -> xp.ndarray: """Retrieve voltage samples from each antenna and polarization. Args: num_samples: Number of samples to retrieve. Returns: Voltage samples of shape `(num_antennas, num_pols, num_samples)`. Raises: ValueError: If `num_samples` does not exceed the maximum antenna delay. """ if num_samples <= self.max_delay: raise ValueError("num_samples must be greater than the maximum antenna delay") bg_num_samples = _populate_background_streams(self, num_samples) # For each antenna, get samples from each pol data stream, adding background contributions # and caching voltages to account for varying antenna delays for antenna in self.antennas: _apply_background_to_antenna(self, antenna, bg_num_samples=bg_num_samples, num_samples=num_samples, xp=xp) self.t_start += num_samples * self.dt self.start_obs = False return _collect_array_samples(self, xp=xp)