Source code for setigen.cadence

from __future__ import annotations

import collections
import numpy as np
import pickle
from typing import Any, Callable, Iterable

from ._constants import ORDER_LABEL_METADATA_KEY
from . import frame as _frame
from . import plots
from . import utils
from ._typing import PathLike

_CADENCE_COMPATIBILITY_ATTRS = ("df", "dt", "fchans", "fmin")


[docs] class Cadence(collections.abc.MutableSequence): """Organize a sequence of compatible `Frame` objects into a cadence."""
[docs] def __init__(self, frame_list: Iterable[_frame.Frame] | None = None, t_slew: float = 0, t_overwrite: bool = False) -> None: """Initialize a cadence. Args: frame_list: Optional iterable of frames to include. t_slew: Slew time between frames in seconds. t_overwrite: Whether to overwrite frame start times to enforce slew spacing. """ self.frames = list() # Insert all initialized items, performing type _checks if not frame_list is None: self.extend(frame_list) self.t_slew = t_slew self.t_overwrite = t_overwrite if t_overwrite: self.overwrite_times()
@property def t_start(self) -> float | None: """Return the cadence start time in Unix seconds.""" if len(self.frames) == 0: return None return self.frames[0].t_start @property def fch1(self) -> float | None: """Return the first-channel frequency shared by the cadence.""" if len(self.frames) == 0: return None return self.frames[0].fch1 @property def ascending(self) -> bool | None: """Return whether cadence frequencies are ascending.""" if len(self.frames) == 0: return None return self.frames[0].ascending @property def fmin(self) -> float | None: """Return the minimum cadence frequency.""" if len(self.frames) == 0: return None return self.frames[0].fmin @property def fmax(self) -> float | None: """Return the maximum cadence frequency.""" if len(self.frames) == 0: return None return self.frames[0].fmax @property def fmid(self) -> float | None: """Return the midpoint cadence frequency.""" if len(self.frames) == 0: return None return self.frames[0].fmid @property def df(self) -> float | None: """Return the shared cadence frequency resolution.""" if len(self.frames) == 0: return None return self.frames[0].df @property def dt(self) -> float | None: """Return the shared cadence time resolution.""" if len(self.frames) == 0: return None return self.frames[0].dt @property def fchans(self) -> int | None: """Return the number of frequency channels in each frame.""" if len(self.frames) == 0: return None return self.frames[0].fchans @property def tchans(self) -> int | None: """Return the total number of time channels across the cadence.""" if len(self.frames) == 0: return None return sum([frame.tchans for frame in self.frames]) @property def obs_range(self) -> float | None: """Return the total observed time span covered by the cadence.""" if len(self.frames) == 0: return None return self.frames[-1].t_stop - self.frames[0].t_start def _check(self, v: _frame.Frame) -> None: """Validate that a frame is cadence-compatible. Args: v: Frame to validate. Raises: TypeError: If the object is not a frame. AttributeError: If the frame is incompatible with existing cadence members. """ if not isinstance(v, _frame.Frame): raise TypeError(f"{v} is not a Frame object.") if len(self.frames) > 0: for attr in _CADENCE_COMPATIBILITY_ATTRS: if getattr(v, attr) != getattr(self.frames[0], attr): raise AttributeError(f"{attr}={getattr(v, attr)} does not match cadence ({getattr(self.frames[0], attr)})") def __len__(self) -> int: return len(self.frames) def __getitem__(self, i: Any) -> _frame.Frame | "Cadence": if isinstance(i, slice): return self.__class__(self.frames[i]) elif isinstance(i, (list, np.ndarray, tuple)): return self.__class__(np.array(self.frames)[i]) else: return self.frames[i] def __delitem__(self, i: int) -> None: del self.frames[i] def __setitem__(self, i: int, v: _frame.Frame) -> None: self._check(v) self.frames[i] = v
[docs] def insert(self, i: int, v: _frame.Frame) -> None: """Insert a compatible frame into the cadence. Args: i: Insertion index. v: Frame to insert. """ self._check(v) self.frames.insert(i, v)
def __str__(self) -> str: return str(self.frames)
[docs] def overwrite_times(self) -> None: """Overwrite frame start times using the configured slew spacing.""" for i, frame in enumerate(self.frames[1:]): frame.t_start = self.frames[i].t_stop + self.t_slew
@property def slew_times(self) -> np.ndarray: """Return slew times between consecutive frames.""" return np.array([self.frames[i].t_start - self.frames[i - 1].t_stop for i in range(1, len(self.frames))])
[docs] def add_signal(self, *args: Any, **kwargs: Any) -> None: """Add the same signal specification to every frame in the cadence. Args: *args: Positional arguments forwarded to `Frame.add_signal()`. **kwargs: Keyword arguments forwarded to `Frame.add_signal()`. """ for frame in self.frames: frame.ts += frame.t_start - self.t_start frame.add_signal(*args, **kwargs) frame.ts -= frame.t_start - self.t_start
[docs] def apply(self, func: Callable[[_frame.Frame], Any]) -> list[Any]: """Apply a function to each frame in the cadence. Args: func: Callable that accepts a frame. Returns: Results returned from each frame application. """ return [func(frame) for frame in self.frames]
[docs] @utils._copy_docstring(plots.plot_cadence) def plot(self, *args: Any, **kwargs: Any) -> Any: return plots.plot_cadence(self, *args, **kwargs)
[docs] def consolidate(self) -> _frame.Frame | None: """Concatenate the cadence into a single frame. Returns: Consolidated frame, or `None` when the cadence is empty. """ if len(self.frames) == 0: return None c_frame = _frame.Frame(fchans=self.fchans, tchans=self.tchans, df=self.df, dt=self.dt, fch1=self.fch1, ascending=self.ascending, t_start=self.t_start) c_frame.data = np.concatenate([frame.data for frame in self.frames], axis=0) c_frame.ts = np.concatenate([frame.ts + frame.t_start for frame in self.frames], axis=0) return c_frame
[docs] def save_pickle(self, filename: PathLike) -> None: """Serialize the full cadence with pickle. Args: filename: Output pickle path. """ with open(filename, 'wb') as f: pickle.dump(self, f)
[docs] @classmethod def load_pickle(cls, filename: PathLike) -> "Cadence": """Load a cadence from a pickled file. Args: filename: Input pickle path created by `save_pickle()`. Returns: Deserialized cadence object. """ with open(filename, 'rb') as f: cad = pickle.load(f) return cad
[docs] class OrderedCadence(Cadence): """Cadence variant that tracks per-frame order labels."""
[docs] def __init__(self, frame_list: Iterable[_frame.Frame] | None = None, order: str = "ABACAD", t_slew: float = 0, t_overwrite: bool = False) -> None: """Initialize an ordered cadence. Args: frame_list: Optional iterable of frames to include. order: Cadence order string such as `"ABACAD"`. t_slew: Slew time between frames in seconds. t_overwrite: Whether to overwrite frame start times to enforce slew spacing. """ self.order = order Cadence.__init__(self, frame_list=frame_list, t_slew=t_slew, t_overwrite=t_overwrite)
def __setitem__(self, i: int, v: _frame.Frame) -> None: self._check(v) if i < 0: i = len(self) + i if ORDER_LABEL_METADATA_KEY not in v.metadata: v.add_metadata({ORDER_LABEL_METADATA_KEY: self.order[i]}) self.frames[i] = v
[docs] def insert(self, i: int, v: _frame.Frame) -> None: """Insert a frame and assign its order label when needed. Args: i: Insertion index. v: Frame to insert. """ self._check(v) if i < 0: i = len(self) + i if ORDER_LABEL_METADATA_KEY not in v.metadata: v.add_metadata({ORDER_LABEL_METADATA_KEY: self.order[i]}) self.frames.insert(i, v)
[docs] def set_order(self, order: str) -> None: """Reassign cadence order labels. Args: order: New cadence order string. """ self.order = order for i, fr in enumerate(self.frames): fr.add_metadata({ORDER_LABEL_METADATA_KEY: self.order[i]})
[docs] def by_label(self, order_label: str = "A") -> Cadence: """Filter frames by cadence order label. Args: order_label: Cadence label to match. Returns: New cadence containing only matching frames. """ return Cadence(frame_list=[frame for frame in self if frame.metadata[ORDER_LABEL_METADATA_KEY] == order_label])