Source code for sunraster.spectrogram_sequence

import numbers
import textwrap

import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.time import Time
from ndcube import NDCubeSequence

from sunraster.spectrogram import (
    SUPPORTED_LATITUDE_NAMES,
    SUPPORTED_LONGITUDE_NAMES,
    SUPPORTED_SPECTRAL_NAMES,
    SUPPORTED_TIME_NAMES,
    SpectrogramABC,
    _find_axis_name,
)

__all__ = ["SpectrogramSequence", "RasterSequence"]

RASTER_AXIS_NAME = "raster scan"
SNS_AXIS_NAME = "temporal"
SLIT_STEP_AXIS_NAME = "slit step"
SLIT_AXIS_NAME = "position along slit"
SPECTRAL_AXIS_NAME = "spectral"


[docs] class SpectrogramSequence(NDCubeSequence, SpectrogramABC): """ Class for holding, slicing and plotting a sequence of spectrogram cubes. Spectrogram cubes can be 2D or higher. Parameters ---------- data_list: `list` List of `SpectrogramCube` objects from the same spectral window and OBS ID. Must also contain the 'detector type' in its meta attribute. common_axis: `int` or `None` (optional) If the sequence axis is aligned with an axis of the component SpectrogramCube instances, e.g. Spectrogram cubes have a time dimension and are arranged within the sequence in chronological order, set this input to the axis number of the time axis within the cubes. Default=None implies there is no common axis. meta: `dict` or header object (optional) Metadata associated with the sequence. """ def __init__(self, data_list, common_axis=None, meta=None): # Initialize Sequence. super().__init__(data_list, common_axis=common_axis, meta=meta) @property def spectral_axis(self): return u.Quantity([raster.spectral_axis for raster in self.data]) @property def time(self): return Time(np.concatenate([raster.time for raster in self.data])) @property def exposure_time(self): exposure_type = type(self.data[0].exposure_time) exposure_time = np.concatenate([raster.exposure_time for raster in self.data]) try: return exposure_type(exposure_time) except Exception: return exposure_time @property def celestial(self): sc = SkyCoord([raster.celestial for raster in self.data]) sc_shape = list(sc.shape) sc_shape.insert(0, len(self.data)) sc_shape[1] = int(sc_shape[1] / sc_shape[0]) return sc.reshape(sc_shape)
[docs] def apply_exposure_time_correction(self, undo=False, copy=False, force=False): """ Applies or undoes exposure time correction to data and uncertainty and adjusts unit. Correction is only applied (undone) if the object's unit doesn't (does) already include inverse time. This can be overridden so that correction is applied (undone) regardless of unit by setting force=True. Parameters ---------- undo: `bool` If False, exposure time correction is applied. If True, exposure time correction is removed. Default=False copy: `bool` If True a new instance with the converted data values is returned. If False, the current instance is overwritten. Default=False force: `bool` If not True, applies (undoes) exposure time correction only if unit doesn't (does) already include inverse time. If True, correction is applied (undone) regardless of unit. Unit is still adjusted accordingly. Returns ------- `None` or `SpectrogramSequence` If copy=False, the original SpectrogramSequence is modified with the exposure time correction applied (undone). If copy=True, a new SpectrogramSequence is returned with the correction applied (undone). """ converted_data_list = [cube.apply_exposure_time_correction(undo=undo, force=force) for cube in self.data] if copy is True: return self.__class__(converted_data_list, meta=self.meta, common_axis=self._common_axis) else: self.data = converted_data_list
def __str__(self): data0 = self.data[0] if not (data0._time_name and data0._longitude_name and data0._latitude_name and data0._spectral_name): for i, cube in enumerate(self): self.data[i]._time_name, self.data[i]._time_loc = _find_axis_name( SUPPORTED_TIME_NAMES, cube.wcs.world_axis_physical_types, cube.extra_coords, cube.meta, ) ( self.data[i]._longitude_name, self.data[i]._longitude_loc, ) = _find_axis_name( SUPPORTED_LONGITUDE_NAMES, cube.wcs.world_axis_physical_types, cube.extra_coords, cube.meta, ) ( self.data[i]._latitude_name, self.data[i]._latitude_loc, ) = _find_axis_name( SUPPORTED_LATITUDE_NAMES, cube.wcs.world_axis_physical_types, cube.extra_coords, cube.meta, ) ( self.data[i]._spectral_name, self.data[i]._spectral_loc, ) = _find_axis_name( SUPPORTED_SPECTRAL_NAMES, cube.wcs.world_axis_physical_types, cube.extra_coords, cube.meta, ) data0 = self.data[0] if data0._time_name: start_time = data0.time if data0.time.isscalar else data0.time.squeeze()[0] data_1 = self.data[-1] stop_time = data_1.time if data_1.time.isscalar else data_1.time.squeeze()[-1] time_period = start_time if start_time == stop_time else Time([start_time.iso, stop_time.iso]) else: time_period = None if data0._longitude_name or data0._latitude_name: sc = self.celestial component_names = dict([(item, key) for key, item in sc.representation_component_names.items()]) lon = getattr(sc, component_names["lon"]) lat = getattr(sc, component_names["lat"]) if sc.isscalar: lon_range = lon lat_range = lat else: lon_range = u.Quantity([lon.min(), lon.max()]) lat_range = u.Quantity([lat.min(), lat.max()]) if lon_range[0] == lon_range[1]: lon_range = lon_range[0] if lat_range[0] == lat_range[1]: lat_range = lat_range[0] else: lon_range = None lat_range = None if data0._spectral_name: spectral_vals = self.spectral_axis spectral_min = spectral_vals.min() spectral_max = spectral_vals.max() spectral_range = spectral_min if spectral_min == spectral_max else u.Quantity([spectral_min, spectral_max]) else: spectral_range = None return textwrap.dedent( f"""\ {self.__class__.__name__} {"".join(["-"] * len(self.__class__.__name__))} Time Range: {time_period} Pixel Dimensions: {self.dimensions} Longitude range: {lon_range} Latitude range: {lat_range} Spectral range: {spectral_range} Data unit: {self.data[0].unit}""" ) def __repr__(self): return f"{object.__repr__(self)}\n{str(self)}"
[docs] class RasterSequence(SpectrogramSequence): """ Class for holding, slicing and plotting series of spectrograph raster scans. Parameters ---------- data_list: `list` List of `SpectrogramCube` objects from the same spectral window and OBS ID. Must also contain the 'detector type' in its meta attribute. common_axis: `int` The axis of the SpectrogramCube instances corresponding to the slit step axis. meta: `dict` or header object (optional) Metadata associated with the sequence. """ def __init__(self, data_list, common_axis, meta=None): # Initialize Sequence. super().__init__(data_list, common_axis=common_axis, meta=meta) # Determine axis indices of instrument axis types. self._raster_axis_name = RASTER_AXIS_NAME self._sns_axis_name = SNS_AXIS_NAME self._slit_step_axis_name = SLIT_STEP_AXIS_NAME self._slit_axis_name = SLIT_AXIS_NAME self._spectral_axis_name = SPECTRAL_AXIS_NAME self._set_single_scan_instrument_axes_types() raster_dimensions = SpectrogramSequence.dimensions sns_dimensions = SpectrogramSequence.cube_like_dimensions raster_array_axis_physical_types = SpectrogramSequence.array_axis_physical_types sns_array_axis_physical_types = SpectrogramSequence.cube_like_array_axis_physical_types raster_axis_coords = SpectrogramSequence.sequence_axis_coords sns_axis_coords = SpectrogramSequence.common_axis_coords plot_as_raster = SpectrogramSequence.plot plot_as_sns = SpectrogramSequence.plot_as_cube def _set_single_scan_instrument_axes_types(self): if len(self.data) < 1: self._single_scan_instrument_axes_types = np.empty((0,), dtype=object) else: self._single_scan_instrument_axes_types = np.empty(self.data[0].data.ndim, dtype=object) # Slit step axis name. if self._common_axis is not None: self._single_scan_instrument_axes_types[self._common_axis] = self._slit_step_axis_name # Spectral axis name. # If spectral name not present in raster cube, try finding it. if not self.data[0]._spectral_name: for i, cube in enumerate(self): ( self.data[i]._spectral_name, self.data[i]._spectral_name, ) = _find_axis_name( SUPPORTED_SPECTRAL_NAMES, cube.wcs.world_axis_physical_types, cube.extra_coords, cube.meta, ) spectral_name = self.data[0]._spectral_name array_axis_physical_types = self.data[0].array_axis_physical_types spectral_raster_index = [physical_type == (spectral_name,) for physical_type in array_axis_physical_types] spectral_raster_index = np.arange(self.data[0].data.ndim)[spectral_raster_index] if len(spectral_raster_index) == 1: self._single_scan_instrument_axes_types[spectral_raster_index] = self._spectral_axis_name # Slit axis name. w = self._single_scan_instrument_axes_types == None if w.sum() > 1: raise ValueError( "Unable to parse the WCS or common_axis to work out either or both the slit-step axis nor the spectral (aka the slit) axis." ) self._single_scan_instrument_axes_types[w] = self._slit_axis_name # Remove any instrument axes types whose axes are missing. self._single_scan_instrument_axes_types.astype(str) @property def slice_as_sns(self): """ Method to slice instance as though data were taken as a sit-and-stare, i.e. slit position and raster number are combined into a single axis. """ return _snsSlicer(self) @property def slice_as_raster(self): """ Method to slice instance as though data were 4D, i.e. raster number, slit step position, position along slit, wavelength. """ return _SequenceSlicer(self) def __getitem__(self, item): result = super().__getitem__(item) if isinstance(result, self.__class__): # If slit step axis sliced out, return SpectrogramSequence # as the spectrogram cubes no longer represent a raster. if len(item) > self._common_axis and isinstance(item[1:][self._common_axis], numbers.Integral): result = SpectrogramSequence(result.data, common_axis=None, meta=result.meta) else: # Else, slice the instrument axis types accordingly. result._set_single_scan_instrument_axes_types() # result._single_scan_instrument_axes_types = _slice_scan_axis_types( # self._single_scan_instrument_axes_types, item[1:]) return result @property def raster_instrument_axes_types(self): return tuple([self._raster_axis_name] + list(self._single_scan_instrument_axes_types)) @property def sns_instrument_axes_types(self): return tuple( [self._sns_axis_name] + list( self._single_scan_instrument_axes_types[ self._single_scan_instrument_axes_types != self._slit_step_axis_name ] ) )
class _snsSlicer: """ Helper class to make slicing in index_as_cube sliceable/indexable like a numpy array. Parameters ---------- seq : `ndcube.NDCubeSequence` Object of NDCubeSequence. """ def __init__(self, seq): self.seq = seq def __getitem__(self, item): result = self.seq.index_as_cube[item] if isinstance(item, tuple) and not isinstance(item[0], numbers.Integral): result._set_single_scan_instrument_axes_types() # result._single_scan_instrument_axes_types = _slice_scan_axis_types( # self.seq._single_scan_instrument_axes_types, item) return result class _SequenceSlicer: def __init__(self, seq): self.seq = seq def __getitem__(self, item): return self.seq[item] def _slice_scan_axis_types(single_scan_axes_types, item): """ Updates RasterSequence._single_scan_axes_types according to slicing. Parameters ---------- single_scan_axes_types: `numpy.ndarray` Value of RasterSequence._single_scan_axes_types, i.e. array of strings giving type of each axis. item: `int`, `slice` or `tuple` of `slice`s. The slicing item that get applied to the Raster instances within the RasterSequences. Returns ------- new_single_scan_axes_types: `numpy.ndarray` Update value of axis types with which to replace RasterSequence._single_scan_axes_types. """ # Get boolean axes indices of axis items that aren't int, # i.e. axes that are not sliced away. not_int_axis_items = [not isinstance(axis_item, numbers.Integral) for axis_item in item] # Add boolean indices for axes not included in item. not_int_axis_items += [True] * (len(single_scan_axes_types) - len(not_int_axis_items)) return single_scan_axes_types[np.array(not_int_axis_items)]