Source code for ndcube.extra_coords.extra_coords

import abc
from typing import Any, Tuple, Union, Iterable
from numbers import Integral
from functools import reduce, partial

import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.time import Time
from astropy.wcs import WCS
from astropy.wcs.wcsapi import BaseHighLevelWCS
from astropy.wcs.wcsapi.high_level_wcs_wrapper import HighLevelWCSWrapper
from astropy.wcs.wcsapi.wrappers.sliced_wcs import SlicedLowLevelWCS, sanitize_slices

from ndcube.utils.wcs import convert_between_array_and_pixel_axes
from ndcube.wcs.wrappers import CompoundLowLevelWCS, ResampledLowLevelWCS

from .table_coord import (BaseTableCoordinate, MultipleTableCoordinate, QuantityTableCoordinate,
                          SkyCoordTableCoordinate, TimeTableCoordinate)

__all__ = ['ExtraCoordsABC', 'ExtraCoords']


[docs] class ExtraCoordsABC(abc.ABC): """ A representation of additional world coordinates associated with pixel axes. ExtraCoords can be initialised by either specifying a `~astropy.wcs.wcsapi.BaseLowLevelWCS` object and a ``mapping``, or it can be built up by specifying one or more lookup tables. Parameters ---------- wcs The WCS specifying the extra coordinates. mapping The mapping between the array dimensions and pixel dimensions in the extra coords object. This is an iterable of ``(array_dimension, pixel_dimension)`` pairs of length equal to the number of pixel dimensions in the extra coords. """
[docs] @abc.abstractmethod def add(self, name: Union[str, Iterable[str]], array_dimension: Union[int, Iterable[int]], lookup_table: Any, physical_types: Union[str, Iterable[str]] = None, **kwargs): """ Add a coordinate to this `~ndcube.ExtraCoords` based on a lookup table. Parameters ---------- name : `str` or sequence of `str` The name(s) for these world coordinate(s). array_dimension : `int` or `tuple` of `int` The array dimension(s), to which this lookup table corresponds. lookup_table : `object` or sequence of `object` The lookup table. A `~ndcube.extra_coords.BaseTableCoordinate` subclass or anything that can instantiate one, i.e. currently a `~astropy.time.Time`, `~astropy.coordinates.SkyCoord`, or a (sequence of) `~astropy.units.Quantity`. physical_types: `str` or iterable of `str`, optional Descriptor(s) of the :ref:`<dimensions and physical types <dimensions>` associated with each axis; length must match the number of dimensions in ``lookup_table``. """
[docs] @abc.abstractmethod def keys(self) -> Iterable[str]: """ The world axis names for all the coordinates in the extra coords. """
@property @abc.abstractmethod def mapping(self) -> Iterable[Tuple[int, int]]: """ The mapping between the array dimensions and pixel dimensions. This is an iterable of ``(array_dimension, pixel_dimension)`` pairs of length equal to the number of pixel dimensions in the extra coords. """ @property @abc.abstractmethod def wcs(self) -> BaseHighLevelWCS: """ A WCS object representing the world coordinates described by this ``ExtraCoords``. .. note:: This WCS object does not map to the pixel dimensions of the data array in the `~ndcube.NDCube` object. It only includes pixel dimensions associated with the extra coordinates. For example, if there is only one extra coordinate associated with a single pixel dimension, this WCS will only have 1 pixel dimension, even if the `~ndcube.NDCube` object has a data array of 2-D or greater. Therefore using this WCS directly might lead to some confusing results. """ @property @abc.abstractproperty def is_empty(self): """Return True if no extra coords present, else return False.""" @abc.abstractmethod def __getitem__(self, item: Union[str, int, slice, Iterable[Union[str, int, slice]]]) -> "ExtraCoordsABC": """ ExtraCoords can be sliced with either a string, or a numpy like slice. When sliced with a string it should return a new ExtraCoords object with only those coordinates with the given names. When sliced with a numpy array like slice it should return a new ExtraCoords with the slice applied. Supporting step is not required and "fancy indexing" is not supported. """
[docs] class ExtraCoords(ExtraCoordsABC): """ A representation of additional world coordinates associated with pixel axes. ExtraCoords can be initialised by either specifying a `~astropy.wcs.wcsapi.BaseLowLevelWCS` object and a ``mapping``, or it can be built up by specifying one or more lookup tables. Parameters ---------- wcs The WCS specifying the extra coordinates. mapping The mapping between the array dimensions and pixel dimensions in the extra coords object. This is an iterable of ``(array_dimension, pixel_dimension)`` pairs of length equal to the number of pixel dimensions in the extra coords. """ def __init__(self, ndcube=None): super().__init__() # Setup private attributes self._wcs = None self._mapping = None # Lookup tables is a list of (pixel_dim, LookupTableCoord) to allow for # one pixel dimension having more than one lookup coord. self._lookup_tables = list() self._dropped_tables = list() # We need a reference to the parent NDCube self._ndcube = ndcube
[docs] @classmethod def from_lookup_tables(cls, names, pixel_dimensions, lookup_tables, physical_types=None): """ Construct a new ExtraCoords instance from lookup tables. This is a convenience wrapper around `ndcube.ExtraCoords.add` which does not expose all the options available in that method. Parameters ---------- names : `tuple` of `str` The names of the world coordinates. pixel_dimensions : `tuple` of `int` The pixel dimensions (in the array) to which the ``lookup_tables`` apply. Must be the same length as ``lookup_tables``. lookup_tables : iterable of `object` The lookup tables which specify the world coordinates for the ``pixel_dimensions``. Must be `~ndcube.extra_coords.BaseTableCoordinate` subclass instances or objects from which to instantiate them (see `ndcube.ExtraCoords.add`). physical_types: sequence of `str` or of sequences of `str`, optional Descriptors of the :ref:`dimensions` associated with each axis in the tables. Must be the same length as ``lookup_tables``; and length of each element must match the number of dimensions in corresponding ``lookup_tables[i]``. Defaults to `None`. Returns ------- `ndcube.ExtraCoords` """ if len(pixel_dimensions) != len(lookup_tables): raise ValueError( "The length of pixel_dimensions and lookup_tables must match." ) if physical_types is None: physical_types = len(lookup_tables) * [physical_types] elif len(physical_types) != len(lookup_tables): raise ValueError("The number of physical types and lookup_tables must match.") extra_coords = cls() for name, pixel_dim, lookup_table, physical_type in zip(names, pixel_dimensions, lookup_tables, physical_types): extra_coords.add(name, pixel_dim, lookup_table, physical_types=physical_type) return extra_coords
[docs] def add(self, name, array_dimension, lookup_table, physical_types=None, **kwargs): # docstring in ABC if self._wcs is not None: raise ValueError( "Can not add a lookup_table to an ExtraCoords which was instantiated with a WCS object." ) kwargs['names'] = [name] if not isinstance(name, (list, tuple)) else name if isinstance(lookup_table, BaseTableCoordinate): coord = lookup_table elif isinstance(lookup_table, Time): coord = TimeTableCoordinate(lookup_table, physical_types=physical_types, **kwargs) elif isinstance(lookup_table, SkyCoord): coord = SkyCoordTableCoordinate(lookup_table, physical_types=physical_types, **kwargs) elif isinstance(lookup_table, (list, tuple)): coord = QuantityTableCoordinate(*lookup_table, physical_types=physical_types, **kwargs) elif isinstance(lookup_table, u.Quantity): coord = QuantityTableCoordinate(lookup_table, physical_types=physical_types, **kwargs) else: raise TypeError(f"The input type {type(lookup_table)} isn't supported") self._lookup_tables.append((array_dimension, coord)) # Sort the LUTs so that the mapping and the wcs are ordered in pixel dim order self._lookup_tables = list(sorted(self._lookup_tables, key=lambda x: x[0] if isinstance(x[0], Integral) else x[0][0]))
@property def _name_lut_map(self): """ Map of world names to the corresponding `.LookupTableCoord` """ return {lut[1].wcs.world_axis_names: lut for lut in self._lookup_tables}
[docs] def keys(self): # docstring in ABC if not self.wcs: return tuple() return tuple(self.wcs.world_axis_names) if self.wcs.world_axis_names else None
@property def mapping(self): # docstring in ABC if self._mapping: return self._mapping # If mapping is not set but lookup_tables is empty then the extra # coords is empty, so there is no mapping. if not self._lookup_tables: return tuple() # The mapping is from the array index (position in the list) to the # pixel dimensions (numbers in the list) lts = [list([lt[0]] if isinstance(lt[0], Integral) else lt[0]) for lt in self._lookup_tables] converter = partial(convert_between_array_and_pixel_axes, naxes=len(self._ndcube.dimensions)) pixel_indicies = [list(converter(np.array(ids))) for ids in lts] return tuple(reduce(list.__add__, pixel_indicies)) @mapping.setter def mapping(self, mapping): if self._mapping is not None: raise AttributeError("Can't set mapping if a mapping has already been specified.") if self._lookup_tables: raise AttributeError( "Can't set mapping manually when ExtraCoords is built from lookup tables." ) if self._wcs is not None: if not max(mapping) <= self._wcs.pixel_n_dim - 1: raise ValueError( "Values in the mapping can not be larger than the number of pixel dimensions in the WCS." ) self._mapping = mapping @property def wcs(self): # docstring in ABC if self._wcs is not None: return self._wcs if not self._lookup_tables: return None tcoords = set(lt[1] for lt in self._lookup_tables) # created a sorted list of unique items _tmp = set() # a temporary set tcoords = [x[1] for x in self._lookup_tables if x[1] not in _tmp and _tmp.add(x[1]) is None] return MultipleTableCoordinate(*tcoords).wcs @wcs.setter def wcs(self, wcs): if self._wcs is not None: raise AttributeError( "Can't set wcs if a WCS has already been specified." ) if self._lookup_tables: raise AttributeError( "Can't set wcs manually when ExtraCoords is built from lookup tables." ) if self._mapping is not None: if not max(self._mapping) <= wcs.pixel_n_dim - 1: raise ValueError( "Values in the mapping can not be larger than the number of pixel dimensions in the WCS." ) self._wcs = wcs @property def is_empty(self): # docstring in ABC if not self._wcs and not self._lookup_tables: return True else: return False def _getitem_string(self, item): """ Slice the Extracoords based on axis names. """ for names, lut in self._name_lut_map.items(): if item in names: new_ec = ExtraCoords(ndcube=self._ndcube) new_ec._lookup_tables = [lut] return new_ec raise KeyError(f"Can't find the world axis named {item} in this ExtraCoords object.") def _getitem_lookup_tables(self, item): """ Apply an array slice to the lookup tables. Returns a new ExtraCoords object with modified lookup tables. """ dropped_tables = set() new_lookup_tables = set() ndims = max([lut[0] if isinstance(lut[0], Integral) else max(lut[0]) for lut in self._lookup_tables]) + 1 # Determine how many dimensions will be dropped by slicing below each dimension. if isinstance(item, Integral): n_dropped_dims = np.ones(ndims, dtype=int) item = tuple([item] + [slice(None)] * (ndims - 1)) elif isinstance(item, slice): n_dropped_dims = np.zeros(ndims, dtype=int) item = tuple([item] + [slice(None)] * (ndims - 1)) else: item = list(item) + [slice(None)] * (ndims - len(item)) n_dropped_dims = np.cumsum([isinstance(i, Integral) for i in item]) for lut_axis, lut in self._lookup_tables: lut_axes = (lut_axis,) if not isinstance(lut_axis, tuple) else lut_axis new_lut_axes = tuple(ax - n_dropped_dims[ax] for ax in lut_axes) lut_slice = tuple(item[i] for i in lut_axes) if isinstance(lut_slice, tuple) and len(lut_slice) == 1: lut_slice = lut_slice[0] sliced_lut = lut[lut_slice] if sliced_lut.is_scalar(): dropped_tables.add(sliced_lut) else: new_lookup_tables.add((new_lut_axes, sliced_lut)) new_extra_coords = type(self)() new_extra_coords._lookup_tables = list(new_lookup_tables) new_extra_coords._dropped_tables = list(dropped_tables) return new_extra_coords def _getitem_wcs(self, item): item = sanitize_slices(item, self.wcs.pixel_n_dim) # It's valid to slice down the EC such that there is nothing left, # which is not a valid way to slice the WCS if len(item) == self.wcs.pixel_n_dim and all(isinstance(i, Integral) for i in item): return type(self)() subwcs = self.wcs[item] new_mapping = [self.mapping[i] for i, subitem in enumerate(item) if not isinstance(subitem, Integral)] new_ec = type(self)() new_ec.wcs = subwcs new_ec.mapping = new_mapping return new_ec def __getitem__(self, item): # docstring in ABC if isinstance(item, str): return self._getitem_string(item) if self._wcs: return self._getitem_wcs(item) elif self._lookup_tables: return self._getitem_lookup_tables(item) # If we get here this object is empty, so just return an empty extra coords # This is done to simplify the slicing in NDCube return self @property def dropped_world_dimensions(self): """ Return an APE-14 like representation of any sliced out world dimensions. """ if self._wcs: if isinstance(self._wcs, SlicedLowLevelWCS): return self._wcs.dropped_world_dimensions if self._lookup_tables or self._dropped_tables: mtc = MultipleTableCoordinate(*[lt[1] for lt in self._lookup_tables]) mtc._dropped_coords = self._dropped_tables return mtc.dropped_world_dimensions return dict()
[docs] def resample(self, factor, offset=0, ndcube=None, **kwargs): """ Resample all extra coords by given factors in array-index-space. One resample factor must be supplied for each array axis in array-axis order. Parameters ---------- factor: `int`, `float`, or iterable thereof. The factor by which each array axis is resampled. If scalar, same factor is applied to all axes. Otherwise a factor for each axis must be provided. offset: `int`, `float`, or iterable therefore. The location on the underlying grid which corresponds to the zeroth element after resampling. If iterable, must have an entry for each dimension. If a scalar, the grid will be shifted by the same amount in all dimensions. ndcube: `~ndcube.NDCube` The NDCube instance with which the output ExtraCoords object is associated. kwargs All remaining kwargs are passed to `numpy.interp`. Returns ------- new_ec: `~ndcube.ExtraCoords` A new ExtraCoords object holding the interpolated coords. """ new_ec = type(self)(ndcube) if self.is_empty: return new_ec if self._ndcube is not None: cube_shape = self._ndcube.data.shape ndim = len(cube_shape) elif self._wcs is not None: ndim = self._wcs.pixel_n_dim else: raise NotImplementedError( "Resampling a lookup-table-based ExtraCoords not yet implemented. " "Please raise an issue at https://github.com/sunpy/ndcube/issues " "if you need this functionality") if np.isscalar(factor): factor = [factor] * ndim if len(factor) != ndim: raise ValueError( "factor must be scalar or an iterable with length equal to number of cube " f"dimensions: len(factor) = {len(factor)}; No. cube dimensions = {ndim}.") if np.isscalar(offset): offset = [offset] * ndim if len(offset) != ndim: raise ValueError( "offset must be scalar or an iterable with length equal to number of cube " f"dimensions: len(offset) = {len(offset)}; No. cube dimensions = {ndim}.") # If ExtraCoords object built on WCS, resample using WCS insfrastructure if self._wcs is not None: new_ec.wcs = HighLevelWCSWrapper(ResampledLowLevelWCS(self._wcs.low_level_wcs, factor, offset)) return new_ec # Else interpolate the lookup table coordinates. factor = np.asarray(factor) new_grids = [] for c, d, f in zip(offset, cube_shape, factor): x = np.arange(c, d+f, f) x = x[x <= d-1] new_grids.append(x) new_grids = np.array(new_grids, dtype=object) for array_axes, coord in self._lookup_tables: if np.isscalar(array_axes): new_coord = coord.interpolate(new_grids[array_axes], **kwargs) else: new_coord = coord.interpolate(*new_grids[np.asarray(array_axes)], **kwargs) new_ec.add(coord.names, array_axes, new_coord, physical_types=coord.physical_types) return new_ec
@property def cube_wcs(self): """Produce a WCS that describes the associated NDCube with just the extra coords. For NDCube pixel axes without any extra coord, dummy axes are inserted. """ wcses = [self.wcs] mapping = list(self.mapping) dummy_axes = self._cube_array_axes_without_extra_coords n_dummy_axes = len(dummy_axes) if n_dummy_axes > 0: dummy_wcs = WCS(naxis=n_dummy_axes) dummy_wcs.wcs.crpix = [1] * n_dummy_axes dummy_wcs.wcs.cdelt = [1] * n_dummy_axes dummy_wcs.wcs.crval = [0] * n_dummy_axes dummy_wcs.wcs.ctype = ["PIXEL"] * n_dummy_axes dummy_wcs.wcs.cunit = ["pix"] * n_dummy_axes wcses.append(dummy_wcs) mapping += list(dummy_axes) return CompoundLowLevelWCS(*wcses, mapping=mapping) @property def _cube_array_axes_without_extra_coords(self): """Return the array axes not associated with any extra coord.""" return set(range(len(self._ndcube.dimensions))) - set(self.mapping) def __str__(self): classname = self.__class__.__name__ elements = [f"{', '.join(table.names)} ({axes}) {table.physical_types}: {table}" for axes, table in self._lookup_tables] length = len(classname) + 2 * len(elements) + sum(len(e) for e in elements) if length > np.get_printoptions()['linewidth']: joiner = ',\n ' + len(classname) * ' ' else: joiner = ', ' return f"{classname}({joiner.join(elements)})" def __repr__(self): return f"{object.__repr__(self)}\n{self}"