Source code for setigen.voltage.reduction

from __future__ import annotations

from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
from typing import Iterator, Literal

import numpy as np
from astropy import units as u

from ..frame import Frame
from ._reduction import (
    _build_reduction_metadata,
    _channelize_block,
    _create_writer,
    _decode_raw_block,
    _iter_raw_data_blocks,
    _resolve_raw_input,
)
from ._reduction.writers import _build_filterbank_header


[docs] class PolarizationMode(IntEnum): """Supported polarization products for native RAW reduction.""" TOTAL_POWER = 1 FULL_POL = 4 FULL_STOKES = -4
[docs] @dataclass(frozen=True) class RawReductionSpec: """Configuration for reducing a GUPPI RAW input into a spectrogram product.""" fftlength: int integration_factor: int pol_mode: int output_format: Literal["fil", "h5"] start_chan: int | None = None num_chans: int | None = None frequency_range: tuple[float, float] | None = None backend: Literal["auto", "numpy", "cupy"] = "auto" accuracy: Literal["exact", "approx_zoom"] = "exact" coarse_method: Literal["auto", "full", "selected"] = "auto" fine_method: Literal["auto", "full", "selected"] = "auto" def __post_init__(self) -> None: """Validate reducer configuration. Raises: ValueError: If any reducer parameter is unsupported or invalid. """ if self.fftlength <= 0: raise ValueError("fftlength must be positive.") if self.integration_factor <= 0: raise ValueError("integration_factor must be positive.") if self.pol_mode not in tuple(mode.value for mode in PolarizationMode): raise ValueError(f"Unsupported polarization mode '{self.pol_mode}'.") if self.output_format not in ("fil", "h5"): raise ValueError(f"Unsupported output format '{self.output_format}'.") if self.backend not in ("auto", "numpy", "cupy"): raise ValueError(f"Unsupported reduction backend '{self.backend}'.") if self.accuracy not in ("exact", "approx_zoom"): raise ValueError(f"Unsupported accuracy mode '{self.accuracy}'.") if self.accuracy != "exact": raise ValueError("RAW reduction currently supports only accuracy='exact'.") if self.coarse_method not in ("auto", "full", "selected"): raise ValueError(f"Unsupported coarse method '{self.coarse_method}'.") if self.fine_method not in ("auto", "full", "selected"): raise ValueError(f"Unsupported fine method '{self.fine_method}'.") if self.start_chan is not None and self.start_chan < 0: raise ValueError("start_chan must be non-negative.") if self.num_chans is not None and self.num_chans <= 0: raise ValueError("num_chans must be positive.") if self.frequency_range is not None: if self.start_chan is not None or self.num_chans is not None: raise ValueError("frequency_range is mutually exclusive with start_chan/num_chans.") if len(self.frequency_range) != 2: raise ValueError("frequency_range must contain exactly two frequency bounds.")
def _reduce_chunks( input_path: str | Path, spec: RawReductionSpec, *, max_blocks: int | None = None, ) -> Iterator[tuple[np.ndarray, object, object]]: """Yield reduced spectrogram chunks from a RAW stem or `.raw` file. Args: input_path: RAW stem or specific `.raw` file path. spec: Reduction configuration. max_blocks: Optional maximum number of RAW blocks to process. Yields: Tuples of reduced spectrogram chunk, parsed RAW input description, and derived reduction metadata. """ input_spec = _resolve_raw_input(input_path) metadata = _build_reduction_metadata(input_spec, spec) for data_chunk in _iter_raw_data_blocks(input_spec, max_blocks=max_blocks): channel_indices = None if metadata.channel_start != 0 or metadata.channel_stop != metadata.num_chans * spec.fftlength: channel_indices = np.arange(metadata.channel_start, metadata.channel_stop) voltages = _decode_raw_block( data_chunk, num_bits=input_spec.num_bits, num_pols=input_spec.num_pols, num_chans=input_spec.num_chans, start_chan=metadata.start_chan, num_selected_chans=metadata.num_chans, ) reduced = _channelize_block( voltages, fftlength=spec.fftlength, integration_factor=spec.integration_factor, pol_mode=spec.pol_mode, backend=spec.backend, channel_indices=channel_indices, fine_method=spec.fine_method, ) if reduced is not None and reduced.shape[0] > 0: yield reduced, input_spec, metadata
[docs] def reduce_raw( input_path: str | Path, output_path: str | Path, spec: RawReductionSpec, *, overwrite: bool = False, tmp_dir: str | Path | None = None, ) -> Path: """Reduce a RAW stem or `.raw` file into a `.fil` or `.h5` product. Args: input_path: RAW stem or specific `.raw` file path. output_path: Destination `.fil` or `.h5` path. spec: Reduction configuration. overwrite: Whether an existing output file may be replaced. tmp_dir: Optional directory for staged writes. Returns: Final output path. Raises: ValueError: If no spectra are produced for the supplied input and reduction settings. """ output_path = Path(output_path) chunk_iter = _reduce_chunks(input_path, spec) first = next(chunk_iter, None) if first is None: raise ValueError( "No spectra were produced from the supplied RAW input and reduction settings." ) first_chunk, input_spec, metadata = first header = _build_filterbank_header(input_spec, metadata, pol_mode=spec.pol_mode) with _create_writer( output_path, output_format=spec.output_format, overwrite=overwrite, tmp_dir=tmp_dir, header=header, total_fchans=metadata.total_fchans, nifs=metadata.nifs, ) as writer: writer.append(first_chunk) for chunk, _, _ in chunk_iter: writer.append(chunk) return output_path
[docs] def reduce_raw_to_frame( input_path: str | Path, spec: RawReductionSpec, *, max_blocks: int | None = None, ) -> Frame: """Reduce a total-power RAW input directly into a `Frame` for inspection. Args: input_path: RAW stem or specific `.raw` file path. spec: Reduction configuration. Must request total power. max_blocks: Optional maximum number of RAW blocks to process. Returns: Reduced `Frame` object. Raises: ValueError: If the reduction is not total power or no frame rows are produced. """ if spec.pol_mode != PolarizationMode.TOTAL_POWER: raise ValueError( "reduce_raw_to_frame only supports total-power / Stokes-I reduction." ) chunks = [] input_spec = None metadata = None for chunk, input_spec, metadata in _reduce_chunks( input_path, spec, max_blocks=max_blocks, ): chunks.append(chunk[:, 0, :]) if not chunks or input_spec is None or metadata is None: raise ValueError( "No frame data were produced from the supplied RAW input and reduction settings." ) data = np.concatenate(chunks, axis=0) return Frame.from_data( df=metadata.df_hz * u.Hz, dt=metadata.dt_s * u.s, fch1=metadata.fch1_hz * u.Hz, ascending=metadata.ascending, data=data, metadata={ "raw_input_stem": str(input_spec.stem), "pol_mode": int(spec.pol_mode), "fftlength": spec.fftlength, "integration_factor": spec.integration_factor, }, )