Source code for sunpy.map.map_factory

import os
import pathlib
from collections import OrderedDict
from urllib.request import Request

import numpy as np

import astropy.io.fits
from astropy.utils.decorators import deprecated_renamed_argument
from astropy.utils.introspection import minversion
from astropy.wcs import WCS

from sunpy import log
from sunpy.data import cache
from sunpy.io._file_tools import detect_filetype, read_file
from sunpy.io._header import FileHeader
from sunpy.map.compositemap import CompositeMap
from sunpy.map.mapbase import GenericMap, MapMetaValidationError
from sunpy.map.mapsequence import MapSequence
from sunpy.util import expand_list
from sunpy.util.datatype_factory_base import (
    BasicRegistrationFactory,
    MultipleMatchError,
    NoMatchError,
    ValidationFunctionError,
)
from sunpy.util.exceptions import NoMapsInFileError, SunpyDeprecationWarning, warn_user
from sunpy.util.functools import seconddispatch
from sunpy.util.io import is_url, parse_path, possibly_a_path
from sunpy.util.metadata import MetaDict

# Array classes that the factory treats as map data. NumPy arrays are always
# accepted; dask arrays are accepted only when dask can be imported.
try:
    import dask.array
except ImportError:
    SUPPORTED_ARRAY_TYPES = (np.ndarray,)
else:
    SUPPORTED_ARRAY_TYPES = (np.ndarray, dask.array.Array)

# Public names exported by this module: the ready-to-use ``Map`` factory
# instance and the ``MapFactory`` class it is built from.
__all__ = ['Map', 'MapFactory']


class MapFactory(BasicRegistrationFactory):
    """
    A factory for generating coordinate aware 2D images.

    This factory takes a variety of inputs, such as file paths, wildcard
    patterns or (data, header) pairs.

    Depending on the input different return types are possible.

    Parameters
    ----------
    \\*inputs
        Inputs to parse for map objects. See the examples section for a
        detailed list of accepted inputs.
    sequence : `bool`, optional
        Return a `sunpy.map.MapSequence` object comprised of all the parsed maps.
    composite : `bool`, optional
        Return a `sunpy.map.CompositeMap` object comprised of all the parsed maps.

    Returns
    -------
    `sunpy.map.GenericMap`
        If the input results in a singular map object, then that is returned.
    `list` of `~sunpy.map.GenericMap`
        If multiple inputs are given and ``sequence=False`` and ``composite=False``
        (the default) then a list of `~sunpy.map.GenericMap` objects will be
        returned.
    `sunpy.map.MapSequence`
        If the input corresponds to multiple maps and ``sequence=True`` is set,
        then a `~sunpy.map.MapSequence` object is returned.
    `sunpy.map.CompositeMap`
        If the input corresponds to multiple maps and ``composite=True`` is set,
        then a `~sunpy.map.CompositeMap` object is returned.

    Examples
    --------
    >>> import sunpy.map
    >>> from astropy.io import fits
    >>> import sunpy.data.sample  # doctest: +REMOTE_DATA
    >>> mymap = sunpy.map.Map(sunpy.data.sample.AIA_171_IMAGE)  # doctest: +REMOTE_DATA
    """

    def _read_file(self, fname, **kwargs):
        """
        Read a file and return the list of (data, meta) pairs it contains.

        Parameters
        ----------
        fname : `str` or `pathlib.Path`
            The file to read.
        **kwargs
            Passed through to ``read_file`` for non-ASDF files. The
            ``silence_errors`` / ``allow_errors`` entries are also inspected
            here to decide whether a read failure warns or raises.

        Returns
        -------
        `list`
            For ASDF files, the `~sunpy.map.GenericMap` instances found among
            the top-level values of the ASDF tree. For every other file type,
            ``(data, MetaDict)`` tuples for each entry whose data has more
            than one dimension. An empty list is returned if reading failed
            and errors are allowed.

        Raises
        ------
        OSError
            If detecting or reading the file fails and errors are not allowed.
        NoMapsInFileError
            If a non-ASDF file was read but none of its entries has data with
            more than one dimension.
        """
        # File gets read here. This needs to be generic enough to seamlessly
        # call a fits file or a jpeg2k file, etc
        # NOTE: use os.fspath so that fname can be either a str or pathlib.Path
        # This can be removed once read_file supports pathlib.Path
        log.debug(f'Reading {fname}')
        try:
            filetype = detect_filetype(fname)
            if filetype == "asdf":
                import asdf

                # The keyword that disables memory mapping depends on the
                # installed asdf version.
                if minversion(asdf, "3.1.0"):
                    no_memmap_kwargs = {"memmap": False, "lazy_load": False}
                else:
                    no_memmap_kwargs = {"copy_arrays": True, "lazy_load": False}
                with asdf.open(fname, **no_memmap_kwargs) as af:
                    # ASDF trees already hold fully constructed maps, so they
                    # are returned as-is instead of as (data, meta) pairs.
                    return [value for value in af.tree.values() if isinstance(value, GenericMap)]
            else:
                pairs = read_file(os.fspath(fname), filetype=filetype, **kwargs)
        except Exception as e:
            msg = f"Failed to read {fname}\n{e}"
            if kwargs.get("silence_errors") or kwargs.get("allow_errors"):
                warn_user(msg)
                return []
            msg += "\n If you want to bypass these errors, pass `allow_errors=True`."
            raise OSError(msg) from e

        new_pairs = []
        for filedata, filemeta in pairs:
            assert isinstance(filemeta, FileHeader)
            # This tests that the data is more than 1D
            if len(np.shape(filedata)) > 1:
                new_pairs.append((filedata, MetaDict(filemeta)))

        if not new_pairs:
            raise NoMapsInFileError(f"Found no HDUs with >= 2D data in '{fname}'.")

        return new_pairs

    def _validate_meta(self, meta):
        """
        Validate a meta argument.

        Returns `True` if ``meta`` is an `astropy.io.fits.Header` or a `dict`
        (including subclasses), `False` otherwise.
        """
        return isinstance(meta, (astropy.io.fits.header.Header, dict))

    def _parse_args(self, *args, silence_errors=False, allow_errors=False, **kwargs):
        """
        Parses an args list into data-header pairs.

        args can contain any mixture of the following entries:

        * tuples of data,header
        * data, header not in a tuple
        * data, wcs object in a tuple
        * data, wcs object not in a tuple
        * filename, as a str or pathlib.Path, which will be read
        * directory, as a str or pathlib.Path, from which all files will be read
        * glob, from which all files will be read
        * url, which will be downloaded and read
        * lists containing any of the above.

        Parameters
        ----------
        *args
            The raw factory inputs.
        silence_errors, allow_errors : `bool`, optional
            If either is set, a `NoMapsInFileError` raised while parsing an
            argument is turned into a warning instead of propagating.
        **kwargs
            Passed through to the per-argument parsers.

        Returns
        -------
        `list`
            The parsed (data, header) pairs. This list can also contain
            `~sunpy.map.GenericMap` instances.

        Raises
        ------
        ValueError
            If an array is the last argument, i.e. it is not followed by the
            header or WCS it should be paired with.

        Examples
        --------
        self._parse_args(data, header,
                         (data, header),
                         ['file1', 'file2', 'file3'],
                         'file4',
                         'directory1',
                         '*.fits')
        """
        # Account for nested lists of items
        args = expand_list(args)

        # Sanitise the input so that each 'type' of input corresponds to a different
        # class, so single dispatch can be used later
        nargs = len(args)
        i = 0
        while i < nargs:
            arg = args[i]
            if isinstance(arg, SUPPORTED_ARRAY_TYPES):
                # The array must be followed by its header; without this check
                # the second pop below fails with an opaque IndexError.
                if i + 1 >= nargs:
                    raise ValueError(
                        "Array data must be immediately followed by a header or "
                        "WCS object, but no argument follows the array."
                    )
                # The next two items are data and a header
                data = args.pop(i)
                header = args.pop(i)
                args.insert(i, (data, header))
                nargs -= 1
            elif isinstance(arg, str) and is_url(arg):
                # Replace URL string with a Request object to dispatch on later
                args[i] = Request(arg)
            elif possibly_a_path(arg):
                # Replace path strings with Path objects
                args[i] = pathlib.Path(arg)
            i += 1

        # Parse the arguments
        # Note that this list can also contain GenericMaps if they are directly given to the factory
        data_header_pairs = []
        for arg in args:
            try:
                data_header_pairs += self._parse_arg(arg, silence_errors=silence_errors,
                                                     allow_errors=allow_errors, **kwargs)
            except NoMapsInFileError as e:
                if not (silence_errors or allow_errors):
                    raise
                warn_user(f"One of the arguments failed to parse with error: {e}")

        return data_header_pairs

    # Note that post python 3.8 this can be @functools.singledispatchmethod
    @seconddispatch
    def _parse_arg(self, arg, **kwargs):
        """
        Take a factory input and parse into (data, header) pairs.
        Must return a list, even if only one pair is returned.

        This is the fallback for input types that have no registered parser;
        it always raises `ValueError`.
        """
        raise ValueError(f"Invalid input: {arg}")

    @_parse_arg.register(tuple)
    def _parse_tuple(self, arg, **kwargs):
        """
        Parse a data-header or data-WCS tuple into a one-element list.

        A WCS is converted to a header first. If the resulting header passes
        `_validate_meta` it is copied into an `~collections.OrderedDict`;
        otherwise the pair is returned unchanged.
        """
        data, header = arg
        if isinstance(header, WCS):
            header = header.to_header()

        if self._validate_meta(header):
            return [(data, OrderedDict(header))]
        return [(data, header)]

    @_parse_arg.register(GenericMap)
    def _parse_map(self, arg, **kwargs):
        """
        Pass an already constructed map through, wrapped in a list.
        """
        return [arg]

    @_parse_arg.register(Request)
    def _parse_url(self, arg, **kwargs):
        """
        Download the URL held by ``arg`` through the sunpy cache and read the
        resulting local file.
        """
        url = arg.full_url
        path = str(cache.download(url).absolute())
        return self._read_file(path, **kwargs)

    @_parse_arg.register(pathlib.Path)
    def _parse_path(self, arg, **kwargs):
        """
        Delegate a path input to ``parse_path``, using `_read_file` as the
        reader.
        """
        return parse_path(arg, self._read_file, **kwargs)

    @deprecated_renamed_argument("silence_errors", "allow_errors", "5.1",
                                 warning_type=SunpyDeprecationWarning)
    def __call__(self, *args, composite=False, sequence=False, silence_errors=False,
                 allow_errors=False, **kwargs):
        """
        Method for running the factory. Takes arbitrary arguments and
        keyword arguments and passes them to a sequence of pre-registered types
        to determine which is the correct Map-type to build.

        Arguments args and kwargs are passed through to the validation
        function and to the constructor for the final type. For Map types,
        validation function must take a data-header pair as an argument.

        Parameters
        ----------
        composite : `bool`, optional
            Indicates if collection of maps should be returned as a `~sunpy.map.CompositeMap`.
            Default is `False`.
        sequence : `bool`, optional
            Indicates if collection of maps should be returned as a `sunpy.map.MapSequence`.
            Default is `False`.
        silence_errors : `bool`, optional
            Deprecated, renamed to `allow_errors`.

            If set, ignore data-header pairs which cause an exception.
            Default is ``False``.
        allow_errors : `bool`, optional
            If set, bypass data-header pairs or files which cause an exception and warn instead.
            Defaults to `False`.

        Returns
        -------
        `sunpy.map.MapSequence`, `sunpy.map.CompositeMap`, `sunpy.map.GenericMap` or `list`
            A map sequence if ``sequence`` is set, otherwise a composite map if
            ``composite`` is set, otherwise the single map if exactly one was
            built, otherwise the list of maps.

        Raises
        ------
        RuntimeError
            If no maps could be built from the inputs.

        Notes
        -----
        Extra keyword arguments are passed through to `sunpy.io._file_tools.read_file`
        such as ``memmap`` for FITS files.
        """
        data_header_pairs = self._parse_args(*args, silence_errors=silence_errors,
                                             allow_errors=allow_errors, **kwargs)
        new_maps = []

        # Loop over each registered type and check to see if WidgetType
        # matches the arguments. If it does, use that type.
        for pair in data_header_pairs:
            # Maps given directly to the factory need no further construction.
            if isinstance(pair, GenericMap):
                new_maps.append(pair)
                continue
            data, header = pair
            meta = MetaDict(header)

            try:
                new_maps.append(self._check_registered_widgets(data, meta, **kwargs))
            except (NoMatchError, MultipleMatchError,
                    ValidationFunctionError, MapMetaValidationError) as e:
                if not (silence_errors or allow_errors):
                    raise
                warn_user(f"One of the data, header pairs failed to validate with: {e}")

        if not new_maps:
            raise RuntimeError('No maps loaded')

        # If the list is meant to be a sequence, instantiate a map sequence
        if sequence:
            return MapSequence(new_maps, **kwargs)

        # If the list is meant to be a composite map, instantiate one
        if composite:
            return CompositeMap(new_maps, **kwargs)

        if len(new_maps) == 1:
            return new_maps[0]

        return new_maps

    def _check_registered_widgets(self, data, meta, **kwargs):
        """
        Build a map from ``data`` and ``meta`` using the matching registered type.

        Every validation function in ``self.registry`` is called with
        ``(data, meta, **kwargs)``. If exactly one matches, its type is used;
        if none match, ``self.default_widget_type`` is used.

        Raises
        ------
        NoMatchError
            If no registered type matches and no default type is set.
        MultipleMatchError
            If more than one registered type matches.
        """
        # Call the registered validation function for each registered class
        candidate_widget_types = [
            key for key in self.registry
            if self.registry[key](data, meta, **kwargs)
        ]

        n_matches = len(candidate_widget_types)

        if n_matches == 0:
            if self.default_widget_type is None:
                raise NoMatchError("No types match specified arguments and no default is set.")
            candidate_widget_types = [self.default_widget_type]
        elif n_matches > 1:
            raise MultipleMatchError("Too many candidate types identified "
                                     f"({candidate_widget_types}). "
                                     "Specify enough keywords to guarantee unique type "
                                     "identification.")

        # Only one is found
        WidgetType = candidate_widget_types[0]

        return WidgetType(data, meta, **kwargs)


class InvalidMapInput(ValueError):
    """
    Exception for an input that is neither a Map instance nor something that
    points to a valid Map input file.
    """


class InvalidMapType(ValueError):
    """
    Exception for a request to ``Map`` for an invalid type of map.
    """


class NoMapsFound(ValueError):
    """
    Exception for an input that does not point to any valid maps or files.
    """


# The module-level factory instance: it shares the registry kept on GenericMap
# and falls back to GenericMap itself as the default type.
Map = MapFactory(
    registry=GenericMap._registry,
    default_widget_type=GenericMap,
    additional_validation_functions=['is_datasource_for'],
)