Source code for setigen.split_utils

import sys
import os
import errno
from pathlib import Path
import numpy as np
from blimpy import Waterfall


def split_waterfall_generator(waterfall_fn, fchans, tchans=None, f_shift=None):
    """
    Create a generator that returns smaller Waterfall objects by 'splitting'
    an input filterbank file according to the number of frequency samples.

    Since this function only loads in data in chunks according to fchans,
    it handles very large observations well. Specifically, it will not
    attempt to load all the data into memory before splitting, which won't
    work when the data is very large anyway.

    Parameters
    ----------
    waterfall_fn : str
        Filterbank filename with .fil extension
    fchans : int
        Number of frequency samples per new filterbank file
    tchans : int, optional
        Number of time samples to select, starting from the beginning of
        the observation. If None, uses the entire integration time
    f_shift : int, optional
        Number of samples to shift when splitting filterbank. If None,
        defaults to ``f_shift=fchans`` so that there is no overlap between
        new filterbank files

    Yields
    ------
    waterfall : Waterfall
        A blimpy Waterfall object containing a smaller section of the data
    """
    info_wf = Waterfall(waterfall_fn, load_data=False)
    fch1 = info_wf.header['fch1']
    nchans = info_wf.header['nchans']
    df = info_wf.header['foff']
    tchans_tot = info_wf.container.selection_shape[0]

    if f_shift is None:
        f_shift = fchans

    if tchans is None:
        tchans = tchans_tot
    elif tchans > tchans_tot:
        raise ValueError('tchans must not exceed the total number of time '
                         'samples in the observation')

    # Note that df is negative!
    f_start, f_stop = fch1, fch1 + fchans * df

    # Iterates down frequencies, starting from highest
    while np.abs(f_stop - fch1) <= np.abs(nchans * df):
        fmin, fmax = np.sort([f_start, f_stop])
        waterfall = Waterfall(waterfall_fn,
                              f_start=fmin,
                              f_stop=fmax,
                              t_start=0,
                              t_stop=tchans)
        yield waterfall

        f_start += f_shift * df
        f_stop += f_shift * df
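
# A minimal usage sketch (not part of the original module): iterate over the
# generator to process each sub-band in turn. The filename and parameter
# values below are illustrative assumptions.
#
#   >>> gen = split_waterfall_generator('observation.fil', fchans=1024, tchans=16)
#   >>> for waterfall in gen:
#   ...     print(waterfall.container.selection_shape)
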
def split_fil(waterfall_fn, output_dir, fchans, tchans=None, f_shift=None):
    """
    Create a set of new filterbank files by 'splitting' an input filterbank
    file according to the number of frequency samples.

    Parameters
    ----------
    waterfall_fn : str
        Filterbank filename with .fil extension
    output_dir : str
        Directory for new filterbank files
    fchans : int
        Number of frequency samples per new filterbank file
    tchans : int, optional
        Number of time samples to select, starting from the beginning of
        the observation. If None, uses the entire integration time
    f_shift : int, optional
        Number of samples to shift when splitting filterbank. If None,
        defaults to ``f_shift=fchans`` so that there is no overlap between
        new filterbank files

    Returns
    -------
    split_fns : list of str
        List of new filenames
    """
    output_dir = Path(output_dir)
    try:
        os.makedirs(output_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    split_generator = split_waterfall_generator(waterfall_fn,
                                                fchans,
                                                tchans=tchans,
                                                f_shift=f_shift)

    # Iterates down frequencies, starting from highest
    split_fns = []
    for i, waterfall in enumerate(split_generator):
        output_fn = str(output_dir / f"{fchans}_{i:04d}.fil")
        waterfall.write_to_fil(output_fn)
        split_fns.append(output_fn)
        print(f"Saved {output_fn}")
    return split_fns
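
# A minimal usage sketch (not part of the original module): split one .fil
# file into non-overlapping 1024-channel files. The input filename and
# output directory are illustrative assumptions.
#
#   >>> split_fns = split_fil('observation.fil', 'split/', fchans=1024)
#   >>> split_fns[0]  # one entry per file written
#   'split/1024_0000.fil'
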
def split_array(data, f_sample_num=None, t_sample_num=None,
                f_shift=None, t_shift=None, f_trim=False, t_trim=False):
    """
    Split a 2D NumPy array into a list of smaller arrays according to limits
    in frequency and time. This doesn't reduce/combine data, it simply cuts
    the data into smaller chunks.

    Parameters
    ----------
    data : ndarray
        Time-frequency data, with time along axis 0 and frequency along axis 1
    f_sample_num : int, optional
        Number of frequency samples per new frame. If None, uses the full width
    t_sample_num : int, optional
        Number of time samples per new frame. If None, uses the full height
    f_shift : int, optional
        Number of samples to shift in frequency between frames. If None,
        defaults to ``f_shift=f_sample_num`` so that frames do not overlap
    t_shift : int, optional
        Number of samples to shift in time between frames. If None,
        defaults to ``t_shift=t_sample_num`` so that frames do not overlap
    f_trim : bool, optional
        Whether to discard frames narrower than f_sample_num in frequency
    t_trim : bool, optional
        Whether to discard frames shorter than t_sample_num in time

    Returns
    -------
    split_data : ndarray
        Array of new time-frequency data frames
    """
    split_data = []

    if not isinstance(data, np.ndarray):
        raise ValueError("Input data must be a numpy array")

    height, width = data.shape

    if f_sample_num is None:
        f_sample_num = width
    if t_sample_num is None:
        t_sample_num = height

    if f_shift is None:
        f_shift = f_sample_num
    elif f_shift <= 0:
        raise ValueError(f"Invalid frequency-direction shift: {f_shift}")

    if t_shift is None:
        t_shift = t_sample_num
    elif t_shift <= 0:
        raise ValueError(f"Invalid time-direction shift: {t_shift}")

    # Save first frame, regardless of overstepping bounds
    y_start = 0
    y_stop = min(t_sample_num, height)
    x_start = 0
    x_stop = min(f_sample_num, width)
    split_data.append(data[y_start:y_stop, x_start:x_stop])

    y_in_bound = (y_stop < height)
    x_in_bound = (x_stop < width)

    # As long as either bound is valid, continue adding frames
    while y_in_bound or x_in_bound:
        # Shift frames in the x direction
        while x_in_bound:
            x_start = x_start + f_shift
            x_stop = min(x_stop + f_shift, width)
            split_data.append(data[y_start:y_stop, x_start:x_stop])
            x_in_bound = (x_stop < width)

        # Break when both y and x are out of bounds
        if not y_in_bound:
            break

        # Shift frames in the y direction and reset x indices
        y_start = y_start + t_shift
        y_stop = min(y_stop + t_shift, height)
        x_start = 0
        x_stop = min(f_sample_num, width)
        split_data.append(data[y_start:y_stop, x_start:x_stop])

        y_in_bound = (y_stop < height)
        x_in_bound = (x_stop < width)

    # Filter out frames that aren't the specified size
    if t_trim:
        split_data = list(filter(lambda A: A.shape[0] == t_sample_num, split_data))
    if f_trim:
        split_data = list(filter(lambda A: A.shape[1] == f_sample_num, split_data))

    return np.array(split_data)
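
# A minimal usage sketch (not part of the original module), using synthetic
# data to show the resulting frame count and shapes:
#
#   >>> data = np.arange(32 * 64).reshape(32, 64)
#   >>> frames = split_array(data, f_sample_num=16, t_sample_num=32)
#   >>> frames.shape  # 4 non-overlapping frames of shape (32, 16)
#   (4, 32, 16)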