Source code for setigen.voltage.waterfall

from __future__ import annotations

import numpy as np

from ._array_backend import xp
from ._reduction.channelize import _channelize_block
from ._reduction.decoder import _decode_raw_block
from ._reduction.input import _resolve_raw_input


[docs] def get_pfb_waterfall(pfb_voltages_x: np.ndarray, pfb_voltages_y: np.ndarray | None = None, fftlength: int = 256, int_factor: int = 1) -> np.ndarray: """Fine-channelize already-channelized complex voltages. Args: pfb_voltages_x: Complex voltages for the first polarization with shape ``(time_samples, num_chans)``. pfb_voltages_y: Optional complex voltages for the second polarization with the same shape as ``pfb_voltages_x``. fftlength: Fine-channel FFT length. int_factor: Time integration factor after fine channelization. Returns: Integrated power waterfall with shape ``(tchans, fchans)``. """ XX_psd = xp.zeros((pfb_voltages_x.shape[1], pfb_voltages_x.shape[0] // fftlength, fftlength)) pfb_voltages_list = [pfb_voltages_x] if pfb_voltages_y is not None: pfb_voltages_list.append(pfb_voltages_y) for pfb_voltages in pfb_voltages_list: X_samples = pfb_voltages.T X_samples = X_samples[:, :(X_samples.shape[1] // fftlength) * fftlength] X_samples = X_samples.reshape((X_samples.shape[0], X_samples.shape[1] // fftlength, fftlength)) XX = xp.fft.fft(X_samples, fftlength, axis=2) / fftlength**0.5 XX = xp.fft.fftshift(XX, axes=2) XX_psd += xp.abs(XX)**2 XX_psd = xp.concatenate(XX_psd, axis=1) # Integrate over time, trimming if necessary XX_psd = XX_psd[:(XX_psd.shape[0] // int_factor) * int_factor] XX_psd = XX_psd.reshape(XX_psd.shape[0] // int_factor, int_factor, XX_psd.shape[1]) XX_psd = XX_psd.sum(axis=1) return XX_psd
[docs] def get_waterfall_from_raw(raw_filename: str, block_size: int, num_chans: int, int_factor: int = 1, fftlength: int = 256) -> np.ndarray: """Reduce the first RAW block into a Stokes-I waterfall array. This helper is intentionally lightweight and primarily intended for tests and quick inspection of a single RAW block. Args: raw_filename: GUPPI RAW file path or RAW stem. block_size: Expected number of bytes in a RAW data block. num_chans: Expected number of coarse channels in the RAW file. int_factor: Time integration factor after fine channelization. fftlength: Fine-channel FFT length. Returns: Stokes-I waterfall for the first RAW block. Raises: ValueError: If the provided block metadata does not match the RAW header values. """ input_spec = _resolve_raw_input(raw_filename) if input_spec.block_size != block_size: raise ValueError(f"Provided block_size={block_size} does not match RAW header BLOCSIZE={input_spec.block_size}.") if input_spec.num_chans != num_chans: raise ValueError(f"Provided num_chans={num_chans} does not match RAW header OBSNCHAN={input_spec.num_chans}.") with open(input_spec.files[0], "rb") as handle: handle.read(input_spec.header_size) chunk = handle.read(block_size) voltages = _decode_raw_block(chunk, num_bits=input_spec.num_bits, num_pols=input_spec.num_pols, num_chans=input_spec.num_chans, start_chan=0, num_selected_chans=input_spec.num_chans) reduced = _channelize_block(voltages, fftlength=fftlength, integration_factor=int_factor, pol_mode=1, backend="auto") return reduced[:, 0, :]