Source code for sunraster.spectrogram

import abc
import numbers
import textwrap
from copy import deepcopy

import astropy.units as u
import ndcube.utils.wcs as nuw
import numpy as np
from astropy.time import Time
from ndcube.ndcube import NDCube

from sunraster.meta import Meta

__all__ = ["SpectrogramCube"]


# Define some custom error messages.
APPLY_EXPOSURE_TIME_ERROR = (
    "Exposure time correction has probably already "
    "been applied since the unit already includes "
    "inverse time. To apply exposure time correction "
    "anyway, set 'force' kwarg to True."
)
UNDO_EXPOSURE_TIME_ERROR = (
    "Exposure time correction has probably already "
    "been undone since the unit does not include "
    "inverse time. To undo exposure time correction "
    "anyway, set 'force' kwarg to True."
)
AXIS_NOT_FOUND_ERROR = "axis not found. If in extra_coords, axis name must be supported:"

# Define supported coordinate names for coordinate properties.
SUPPORTED_LONGITUDE_NAMES = [
    "custom:pos.helioprojective.lon",
    "pos.helioprojective.lon",
    "longitude",
    "lon",
]
SUPPORTED_LONGITUDE_NAMES += [name.upper() for name in SUPPORTED_LONGITUDE_NAMES] + [
    name.capitalize() for name in SUPPORTED_LONGITUDE_NAMES
]
SUPPORTED_LONGITUDE_NAMES = np.array(SUPPORTED_LONGITUDE_NAMES)

SUPPORTED_LATITUDE_NAMES = [
    "custom:pos.helioprojective.lat",
    "pos.helioprojective.lat",
    "latitude",
    "lat",
]
SUPPORTED_LATITUDE_NAMES += [name.upper() for name in SUPPORTED_LATITUDE_NAMES] + [
    name.capitalize() for name in SUPPORTED_LATITUDE_NAMES
]
SUPPORTED_LATITUDE_NAMES = np.array(SUPPORTED_LATITUDE_NAMES)

SUPPORTED_SPECTRAL_NAMES = [
    "em.wl",
    "em.energy",
    "em.freq",
    "wavelength",
    "energy",
    "frequency",
    "freq",
    "lambda",
    "spectral",
]
SUPPORTED_SPECTRAL_NAMES += [name.upper() for name in SUPPORTED_SPECTRAL_NAMES] + [
    name.capitalize() for name in SUPPORTED_SPECTRAL_NAMES
]
SUPPORTED_SPECTRAL_NAMES = np.array(SUPPORTED_SPECTRAL_NAMES)

SUPPORTED_TIME_NAMES = ["time"]
SUPPORTED_TIME_NAMES += [name.upper() for name in SUPPORTED_TIME_NAMES] + [
    name.capitalize() for name in SUPPORTED_TIME_NAMES
]
SUPPORTED_TIME_NAMES = np.array(SUPPORTED_TIME_NAMES)

SUPPORTED_EXPOSURE_NAMES = [
    "exposure time",
    "exposure_time",
    "exposure times",
    "exposure_times",
    "exp time",
    "exp_time",
    "exp times",
    "exp_times",
]
SUPPORTED_EXPOSURE_NAMES += [name.upper() for name in SUPPORTED_EXPOSURE_NAMES] + [
    name.capitalize() for name in SUPPORTED_EXPOSURE_NAMES
]
SUPPORTED_EXPOSURE_NAMES = np.array(SUPPORTED_EXPOSURE_NAMES)


class SpectrogramABC(abc.ABC):
    # Abstract Base Class to define the basic API of Spectrogram classes.

    @abc.abstractproperty
    def spectral_axis(self):
        """
        Return the spectral coordinates for each pixel.
        """

    @abc.abstractproperty
    def time(self):
        """
        Return the time coordinates for each pixel.
        """

    @abc.abstractproperty
    def exposure_time(self):
        """
        Return the exposure time for each exposure.
        """

    @abc.abstractproperty
    def celestial(self):
        """
        Return the celestial coordinates for each pixel.
        """

    @abc.abstractmethod
    def apply_exposure_time_correction(self, undo=False, force=False):
        """
        Applies or undoes exposure time correction to data and uncertainty and
        adjusts unit.

        Correction is only applied (undone) if the object's unit doesn't (does)
        already include inverse time. This can be overridden so that correction
        is applied (undone) regardless of unit by setting force=True.

        Parameters
        ----------
        undo: `bool`
            If False, exposure time correction is applied.
            If True, exposure time correction is undone.
            Default=False
        force: `bool`
            If not True, applies (undoes) exposure time correction only if unit
            doesn't (does) already include inverse time.
            If True, correction is applied (undone) regardless of unit. Unit is still
            adjusted accordingly.

        Returns
        -------
        result: `SpectrogramCube`
            New SpectrogramCube in new units.
        """


[docs] class SpectrogramCube(NDCube, SpectrogramABC): """ Class representing a sit-and-stare or single raster of slit spectrogram data. Must be described by a single WCS. Parameters ---------- data: `numpy.ndarray` The array holding the actual data in this object. wcs: `ndcube.wcs.wcs.WCS` The WCS object containing the axes' information unit : `astropy.unit.Unit` or `str`, optional Unit for the dataset. Strings that can be converted to a Unit are allowed. uncertainty : any type, optional Uncertainty in the dataset. Should have an attribute uncertainty_type that defines what kind of uncertainty is stored, for example "std" for standard deviation or "var" for variance. A metaclass defining such an interface is NDUncertainty - but isn't mandatory. If the uncertainty has no such attribute the uncertainty is stored as UnknownUncertainty. Defaults to None. meta : dict-like object, optional Additional meta information about the dataset. mask : any type, optional Mask for the dataset. Masks should follow the numpy convention that valid data points are marked by False and invalid ones with True. Defaults to None. instrument_axes : list, optional This is the relationship between the array axes and the instrument, i.e. repeat raster axis, slit position, position along slit, and spectral. These are needed because they cannot be inferred simply from the physical types. copy : `bool`, optional Indicates whether to save the arguments as copy. True copies every attribute before saving it while False tries to save every parameter as reference. Note however that it is not always possible to save the input as reference. Default is False. """ def __init__( self, data, wcs, unit=None, uncertainty=None, meta=None, mask=None, instrument_axes=None, copy=False, **kwargs, ): # Initialize SpectrogramCube. super().__init__( data, wcs=wcs, uncertainty=uncertainty, mask=mask, meta=meta, unit=unit, copy=copy, **kwargs, ) # Determine labels and location of each key real world coordinate. self_extra_coords = self.extra_coords world_axis_physical_types = np.array(self.wcs.world_axis_physical_types) self._longitude_name, self._longitude_loc = _find_axis_name( SUPPORTED_LONGITUDE_NAMES, world_axis_physical_types, self_extra_coords, self.meta, ) self._latitude_name, self._latitude_loc = _find_axis_name( SUPPORTED_LATITUDE_NAMES, world_axis_physical_types, self_extra_coords, self.meta, ) self._spectral_name, self._spectral_loc = _find_axis_name( SUPPORTED_SPECTRAL_NAMES, world_axis_physical_types, self_extra_coords, self.meta, ) self._time_name, self._time_loc = _find_axis_name( SUPPORTED_TIME_NAMES, world_axis_physical_types, self_extra_coords, self.meta, ) self._exposure_time_name, self._exposure_time_loc = _find_axis_name( SUPPORTED_EXPOSURE_NAMES, world_axis_physical_types, self_extra_coords, self.meta, ) # Set up instrument axes if set. if instrument_axes is None: self.instrument_axes = instrument_axes elif len(instrument_axes) != data.ndim: raise ValueError("Length of instrument_axes must match number of data axes.") else: self.instrument_axes = np.asarray(instrument_axes, dtype=str) # TODO: Remove after ndcube 2.1 if not isinstance(self.meta, Meta): self.meta = Meta(self.meta, data_shape=self.data.shape) def __str__(self): try: if self.time.isscalar: time_period = self.time else: times = self.time time_period = Time([times.min(), times.max()]).iso except ValueError as err: if AXIS_NOT_FOUND_ERROR in err.args[0]: time_period = None else: raise err try: sc = self.celestial component_names = dict([(item, key) for key, item in sc.representation_component_names.items()]) lon = getattr(sc, component_names["lon"]) lat = getattr(sc, component_names["lat"]) if sc.isscalar: lon_range = lon lat_range = lat elif sc.size == 0: lon_range = None lat_range = None else: lon_range = u.Quantity([lon.min(), lon.max()]) lat_range = u.Quantity([lat.min(), lat.max()]) except ValueError as err: if AXIS_NOT_FOUND_ERROR not in err.args[0]: raise err lon_range = None lat_range = None try: if self.spectral_axis.isscalar: spectral_range = self.spectral_axis else: spectral_range = u.Quantity([self.spectral_axis.min(), self.spectral_axis.max()]) except ValueError as err: if AXIS_NOT_FOUND_ERROR in err.args[0]: spectral_range = None else: raise err return textwrap.dedent( f"""\ {self.__class__.__name__} {"".join(["-"] * len(self.__class__.__name__))} Time Period: {time_period} Instrument axes: {self.instrument_axes} Pixel dimensions: {self.dimensions.astype(int)} Longitude range: {lon_range} Latitude range: {lat_range} Spectral range: {spectral_range} Data unit: {self.unit}""" ) def __repr__(self): return f"{object.__repr__(self)}\n{str(self)}" def __getitem__(self, item): # Slice SpectrogramCube using parent slicing. result = super().__getitem__(item) # Slice instrument_axes if it exists. # If item is a slice, cube dimensionality is not reduced # so instrument_axes need not be sliced. if self.instrument_axes is None or isinstance(item, slice): instrument_axes = self.instrument_axes else: # If item is int, instrument_axes needs slicing. if isinstance(item, numbers.Integral): instrument_axes = self.instrument_axes[1:] # If item is tuple, instrument axes will need to be sliced if tuple contains an int. elif isinstance(item, tuple): instr_item = [isinstance(i, numbers.Integral) for i in item] + [False] * ( len(self.instrument_axes) - len(item) ) instrument_axes = self.instrument_axes[np.invert(instr_item)] else: raise TypeError("Unrecognized slice item. Must be int, slice or tuple.") # If slicing causes cube to be a scalar, set instrument_axes to None. if len(instrument_axes) == 0: instrument_axes = None result.instrument_axes = instrument_axes # TODO: Remove for ndcube 2.1 # Slice metadata if possible. try: result.meta = self.meta[item] except TypeError as err: if "unhashable type" not in err.args[0]: raise err except KeyError: pass return result @property def spectral_axis(self): if not self._spectral_name: self._spectral_name, self._spectral_loc = _find_axis_name( SUPPORTED_SPECTRAL_NAMES, self.wcs.world_axis_physical_types, self.extra_coords, self.meta, ) if not self._spectral_name: raise ValueError(f"Spectral{AXIS_NOT_FOUND_ERROR}" + f"{SUPPORTED_SPECTRAL_NAMES}") return self._get_axis_coord(self._spectral_name, self._spectral_loc) @property def time(self): if not self._time_name: self._time_name, self._time_loc = _find_axis_name( SUPPORTED_TIME_NAMES, self.wcs.world_axis_physical_types, self.extra_coords, self.meta, ) if not self._time_name: raise ValueError(f"Time {AXIS_NOT_FOUND_ERROR} {SUPPORTED_TIME_NAMES}") return Time(self._get_axis_coord(self._time_name, self._time_loc)) @property def exposure_time(self): if not self._exposure_time_name or not hasattr(self, "_exposure_time_loc"): self._exposure_time_name, self._exposure_time_loc = _find_axis_name( SUPPORTED_EXPOSURE_NAMES, self.wcs.world_axis_physical_types, self.extra_coords, self.meta, ) if not self._exposure_time_name: raise ValueError(f"Exposure time {AXIS_NOT_FOUND_ERROR} {SUPPORTED_EXPOSURE_NAMES}") return self._get_axis_coord(self._exposure_time_name, self._exposure_time_loc) @property def celestial(self): if not self._longitude_name: self._longitude_name, self._longitude_loc = _find_axis_name( SUPPORTED_LONGITUDE_NAMES, self.wcs.world_axis_physical_types, self.extra_coords, self.meta, ) if not self._latitude_name: self._latitude_name, self._latitude_loc = _find_axis_name( SUPPORTED_LATITUDE_NAMES, self.wcs.world_axis_physical_types, self.extra_coords, self.meta, ) if self._longitude_name: celestial_name = self._longitude_name celestial_loc = self._longitude_loc elif self._latitude_name: celestial_name = self._latitude_name celestial_loc = self._latitude_loc else: raise ValueError( f"Celestial {AXIS_NOT_FOUND_ERROR} " f"{np.concatenate([SUPPORTED_LONGITUDE_NAMES, SUPPORTED_LATITUDE_NAMES])}" ) return self._get_axis_coord(celestial_name, celestial_loc)
[docs] def apply_exposure_time_correction(self, undo=False, force=False): # Get exposure time in seconds. exposure_time_s = self.exposure_time.to(u.s).value # If exposure time is not scalar, change array's shape so that # it can be broadcast with data and uncertainty arrays. if not np.isscalar(exposure_time_s): (exposure_axis,) = self._get_axis_coord_index(self._exposure_time_name, self._exposure_time_loc) # Change array shape for broadcasting item = [np.newaxis] * self.data.ndim item[exposure_axis] = slice(None) exposure_time_s = exposure_time_s[tuple(item)] # Based on value on undo kwarg, apply or remove exposure time correction. if undo is True: new_data, new_uncertainty, new_unit = _uncalculate_exposure_time_correction( self.data, self.uncertainty, self.unit, exposure_time_s, force=force ) else: new_data, new_uncertainty, new_unit = _calculate_exposure_time_correction( self.data, self.uncertainty, self.unit, exposure_time_s, force=force ) # Return new instance of SpectrogramCube with correction applied/undone. new_cube = deepcopy(self) new_cube._data = new_data new_cube._uncertainty = new_uncertainty new_cube._extra_coords = self.extra_coords new_cube._unit = new_unit return new_cube
def _get_axis_coord(self, axis_name, coord_loc): if coord_loc == "wcs": return self.axis_world_coords(axis_name)[0] elif coord_loc == "extra_coords": return self.axis_world_coords(wcs=self.extra_coords[axis_name])[0] elif coord_loc == "global_coords": return self.global_coords[axis_name] elif coord_loc == "meta": return self.meta[axis_name] else: raise ValueError(f"{coord_loc} is not a valid coordinate location.") def _get_axis_coord_index(self, axis_name, coord_loc): if coord_loc == "wcs": coord_pix_axes = nuw.physical_type_to_pixel_axes(axis_name, self.wcs) coord_array_axes = nuw.convert_between_array_and_pixel_axes(coord_pix_axes, len(self.dimensions)) return coord_array_axes.tolist()[0] elif coord_loc == "extra_coords": return self.extra_coords[axis_name].mapping[0] elif coord_loc == "meta": return self.meta.axes[axis_name] else: raise ValueError(f"{coord_loc} is not a valid coordinate location.") def _get_axis_coord_values(self, axis_name, coord_loc): if coord_loc == "wcs": return self.axis_world_coords_values(axis_name)[0] elif coord_loc == "extra_coords": return self.axis_world_coords_values(wcs=self.extra_coords[axis_name])[0] elif coord_loc == "global_coords": return self.global_coords[axis_name] else: raise ValueError(f"{coord_loc} is not a valid coordinate location.")
def _find_axis_name(supported_names, world_axis_physical_types, extra_coords, meta): """ Finds name of a SpectrogramCube axis type from WCS and extra coords. Parameters ---------- supported_names: 1D `numpy.ndarray` The names for the axis supported by `SpectrogramCube`. world_axis_physical_types: 1D `numpy.ndarray` Output of SpectrogramCube.world_axis_physical_types converted to an array. extra_coords: `ndcube.ExtraCoords` or `None` Output of SpectrogramCube.extra_coords meta: Meta or `None` Output of SpectrogramCube.meta Returns ------- axis_name: `str` The coordinate name of the axis. loc: `str` The location where the coordinate is stored: "wcs" or "extra_coords". """ axis_name = None loc = None # Check WCS for axis name. axis_name = _find_name_in_array(supported_names, world_axis_physical_types) if axis_name: loc = "wcs" elif extra_coords: # If axis name not in WCS, check extra_coords. axis_name = _find_name_in_array(supported_names, np.array(list(extra_coords.keys()))) if axis_name: loc = "extra_coords" if axis_name is None and meta: # If axis name not in WCS, check meta. axis_name = _find_name_in_array(supported_names, np.array(list(meta.keys()))) if axis_name: loc = "meta" return axis_name, loc def _find_name_in_array(supported_names, names_array): name_index = np.isin(names_array, supported_names) if name_index.sum() > 0: name_index = np.arange(len(names_array))[name_index][0] return names_array[name_index] return None def _calculate_exposure_time_correction(data, uncertainty, unit, exposure_time, force=False): """ Applies exposure time correction to data and uncertainty arrays. Parameters ---------- data: `numpy.ndarray` Data array to be converted. uncertainty: `astropy.nddata.nduncertainty.NDUncertainty` The uncertainty of each element in data. old_unit: `astropy.unit.Unit` Unit of data arrays. exposure_time: `numpy.ndarray` Exposure time in seconds for each exposure in data arrays. Returns ------- new_data: `numpy.ndarray` Data array with exposure time corrected for. new_uncertainty: `astropy.nddata.nduncertainty.NDUncertainty` The uncertainty of each element in new_data. new_unit: `astropy.unit.Unit` Unit of new_data array after exposure time correction. """ if force is not True and u.s in unit.decompose().bases: raise ValueError(APPLY_EXPOSURE_TIME_ERROR) # Else, either unit does not include inverse time and so # exposure does need to be applied, or # user has set force=True and wants the correction applied # regardless of the unit. new_data = data / exposure_time if uncertainty: uncertainty_unit = uncertainty.unit / u.s if uncertainty.unit else uncertainty.unit new_uncertainty = uncertainty.__class__(uncertainty.array / exposure_time, unit=uncertainty_unit) else: new_uncertainty = uncertainty new_unit = unit / u.s return new_data, new_uncertainty, new_unit def _uncalculate_exposure_time_correction(data, uncertainty, unit, exposure_time, force=False): """ Removes exposure time correction from data and uncertainty arrays. Parameters ---------- data: `numpy.ndarray` Data array to be converted. uncertainty: `astropy.nddata.nduncertainty.NDUncertainty` The uncertainty of each element in data. old_unit: `astropy.unit.Unit` Unit of data arrays. exposure_time: `numpy.ndarray` Exposure time in seconds for each exposure in data arrays. Returns ------- new_data: `numpy.ndarray` Data array with exposure time corrected for. new_uncertainty: `astropy.nddata.nduncertainty.NDUncertainty` The uncertainty of each element in new_data. new_unit: `astropy.unit.Unit` Unit of new_data array after exposure time correction. """ if force is not True and u.s in (unit * u.s).decompose().bases: raise ValueError(UNDO_EXPOSURE_TIME_ERROR) # Else, either unit does include inverse time and so # exposure does need to be removed, or # user has set force=True and wants the correction removed # regardless of the unit. new_data = data * exposure_time if uncertainty: uncertainty_unit = uncertainty.unit * u.s if uncertainty.unit else uncertainty.unit new_uncertainty = uncertainty.__class__(uncertainty.array * exposure_time, unit=uncertainty_unit) else: new_uncertainty = uncertainty new_unit = unit * u.s return new_data, new_uncertainty, new_unit