Source code for sunpy.io._fits

"""
This module provides a FITS file reader for internal use.

We instead recommend users use the `astropy.io.fits` module,
which provides more generic functionality to read FITS files.

Notes
-----
1. FITS files allow comments to be attached to every value in the header.
   This is implemented in this module as a KEYCOMMENTS dictionary in the
   sunpy header. To add a comment to the file on write, add a comment to this
   dictionary with the same name as a key in the header (upcased).

2. Due to the way `~astropy.io.fits` works with images, the header dictionary may
   differ depending on whether is accessed before or after the fits[0].data
   is requested. If the header is read before the data then the original
   header will be returned. If the header is read after the data has been
   accessed then the data will have been scaled and a modified header
   reflecting these changes will be returned: BITPIX may differ and
   BSCALE and B_ZERO may be dropped in the modified version.

3. The verify('silentfix+warn') call attempts to handle violations of the FITS
   standard. For example, ``nan`` values will be converted to "nan" strings.
   Attempting to cast a `astropy.io.fits.Header` to a dictionary while it contains
   invalid header tags will result in an error so verifying it early on
   makes the header easier to work with later.
"""
import os
import re
import sys
import math
import traceback
import collections
import collections.abc

from astropy.io import fits

from sunpy.io.header import FileHeader
from sunpy.util.exceptions import warn_metadata, warn_user
from sunpy.util.io import HDPair

__all__ = ['header_to_fits', 'read', 'get_header', 'write', 'extract_waveunit', 'format_comments_and_history']


[docs] def read(filepath, hdus=None, memmap=None, **kwargs): """ Read a fits file. Parameters ---------- filepath : `str` The fits file to be read. hdus : `int` or iterable The HDU indexes to read from the file. **kwargs : `dict`, optional Passed to `astropy.io.fits.open`. Returns ------- `list` A list of (data, header) tuples Notes ----- This routine reads all the HDU's in a fits file and returns a list of the data and a FileHeader instance for each one. Also all comments in the original file are concatenated into a single "comment" key in the returned FileHeader. """ with fits.open(filepath, ignore_blank=True, memmap=memmap, **kwargs) as hdulist: if hdus is not None: if isinstance(hdus, int): hdulist = hdulist[hdus] elif isinstance(hdus, collections.abc.Iterable): hdulist = [hdulist[i] for i in hdus] hdulist = fits.hdu.HDUList(hdulist) for h in hdulist: h.verify('silentfix+warn') headers = get_header(hdulist) pairs = [] for i, (hdu, header) in enumerate(zip(hdulist, headers)): try: pairs.append(HDPair(hdu.data, header)) except (KeyError, ValueError) as e: message = f"Error when reading HDU {i}. Skipping.\n" for line in traceback.format_tb(sys.exc_info()[2]): message += line message += '\n' message += repr(e) warn_user(message) return pairs
[docs] def get_header(afile): """ Read a fits file and return just the headers for all HDU's. Parameters ---------- afile : `str` or `astropy.io.fits.HDUList` The file to be read, or HDUList to process. Returns ------- `list` A list of `sunpy.io.header.FileHeader` headers. """ if isinstance(afile, fits.HDUList): hdulist = afile close = False else: hdulist = fits.open(afile, ignore_blank=True) hdulist.verify('silentfix') close = True try: headers = [] for hdu in hdulist: headers.append(format_comments_and_history(hdu.header)) finally: if close: hdulist.close() return headers
[docs] def format_comments_and_history(input_header): """ Combine ``COMMENT`` and ``HISTORY`` cards into single entries. Extract ``KEYCOMMENTS`` into a single entry and put ``WAVEUNIT`` into its own entry. Parameters ---------- input_header : `~astropy.io.fits.Header` The header to be processed. Returns ------- `sunpy.io.header.FileHeader` """ try: comment = "\n".join(input_header['COMMENT']).strip() except KeyError: comment = "" try: history = "\n".join(input_header['HISTORY']).strip() except KeyError: history = "" header = FileHeader(input_header) header['COMMENT'] = comment header['HISTORY'] = history # Strip out KEYCOMMENTS to a dict, the hard way keydict = {} for card in input_header.cards: if card.comment != '': keydict.update({card.keyword: card.comment}) header['KEYCOMMENTS'] = keydict waveunit = extract_waveunit(header) if waveunit is not None: header['WAVEUNIT'] = waveunit return header
[docs] def write(fname, data, header, hdu_type=None, **kwargs): """ Take a data header pair and write a FITS file. Parameters ---------- fname : `str` File name, with extension. data : `numpy.ndarray` n-dimensional data array. header : `dict` A header dictionary. hdu_type : `~astropy.io.fits.hdu.base.ExtensionHDU` instance or class, optional By default, a FITS file is written with the map in its primary HDU. If a type is given, a new HDU of this type will be created. If a HDU instance is given, its data and header will be updated from the map. Then that HDU instance will be written to the file. kwargs : Additional keyword arguments are given to `~astropy.io.fits.HDUList.writeto`. """ # Copy header so the one in memory is left alone header = header.copy() fits_header = header_to_fits(header) if isinstance(fname, str): fname = os.path.expanduser(fname) fitskwargs = {'output_verify': 'fix'} fitskwargs.update(kwargs) if not hdu_type: hdu_type = fits.PrimaryHDU if isinstance(hdu_type, (fits.PrimaryHDU, fits.hdu.base.ExtensionHDU)): hdu = hdu_type # HDU already initialized # Merge `header` into HDU's header # Values in `header` take priority, including cards such as # 'SIMPLE' and 'BITPIX'. hdu.header.extend(fits_header, strip=False, update=True) # Set the HDU's data hdu.data = data else: hdu = hdu_type(data=data, header=fits_header) if not isinstance(hdu, fits.PrimaryHDU): hdul = fits.HDUList([fits.PrimaryHDU(), hdu]) else: hdul = fits.HDUList([hdu]) hdul.writeto(fname, **fitskwargs)
[docs] def header_to_fits(header): """ Convert a header dict to a `~astropy.io.fits.Header`. """ # Copy the header to avoid modifying it in place header = header.copy() # The comments need to be added to the header separately from the normal # kwargs. Find and deal with them: fits_header = fits.Header() # Check Header key_comments = header.pop('KEYCOMMENTS', False) # This first iteration separates out COMMENTS and HISTORY into their own # entries. header_items = [] for k, v in header.items(): if k.upper() in ('COMMENT', 'HV_COMMENT', 'HISTORY'): for v_line in str(v).split('\n'): header_items.append((k, v_line)) else: header_items.append((k, v)) for k, v in header_items: # Drop any keys that have non-ascii characters if not fits.Card._ascii_text_re.match(str(v)): warn_metadata(f'The meta key {k} is not valid ascii, dropping from the FITS header') continue # Drop any keys which are too long to save into FITS if len(k) > 8: warn_metadata(f"The meta key {k} is too long, dropping from the FITS header " "(maximum allowed key length is 8 characters).") continue if isinstance(v, float) and math.isnan(v): warn_metadata(f'The meta key {k} has a NaN value, which is not valid in a FITS ' 'header, dropping from the FITS header') continue if k.upper() in ('COMMENT', 'HV_COMMENT'): fits_header.add_comment(v) elif k.upper() == 'HISTORY': fits_header.add_history(v) elif isinstance(v, fits.header._HeaderCommentaryCards): if k != '': fits_header.append(fits.Card(k, str(v).split('\n'))) else: # For some horrific reason, we save a list to the wavelnth key in # sources/rhessi.py. This is the least invasive fix for that stupidity. if isinstance(v, list): v = str(v) fits_header.append(fits.Card(k, v)) if isinstance(key_comments, dict): for k, v in key_comments.items(): # Check that the Card for the comment exists before trying to write to it. if k in fits_header: fits_header.comments[k] = v elif key_comments: raise TypeError("KEYCOMMENTS must be a dictionary") return fits_header
[docs] def extract_waveunit(header): """ Attempt to read the wavelength unit from a given FITS header. Parameters ---------- header : `sunpy.io.header.FileHeader` One `~sunpy.io.header.FileHeader` instance which was created by reading a FITS file. For example, `sunpy.io._fits.get_header` returns a list of such instances. Returns ------- `str` The wavelength unit that could be found or ``None`` otherwise. Examples -------- The goal of this function is to return a string that can be used in conjunction with the astropy.units module so that the return value can be directly passed to `astropy.units.Unit`. >>> import astropy.units >>> header = {'WAVEUNIT': 'Angstrom', 'KEYCOMMENTS': {}} >>> waveunit = extract_waveunit(header) >>> if waveunit is not None: ... unit = astropy.units.Unit(waveunit) """ # algorithm: try the following procedures in the following order and return # as soon as a waveunit could be detected # 1. read header('WAVEUNIT'). If None, go to step 2. # 1.1 -9 -> 'nm' # 1.2 -10 -> 'angstrom' # 1.3 0 -> go to step 2 # 1.4 if neither of the above, return the value itself in lowercase # 2. parse waveunit_comment # 2.1 'in meters' -> 'm' # 3. parse wavelnth_comment # 3.1 "[$UNIT] ..." -> $UNIT # 3.2 "Observed wavelength ($UNIT)" -> $UNIT def parse_waveunit_comment(waveunit_comment): if waveunit_comment == 'in meters': return 'm' if 'KEYCOMMENTS' not in header: return None waveunit_comment = header['KEYCOMMENTS'].get('WAVEUNIT') wavelnth_comment = header['KEYCOMMENTS'].get('WAVELNTH') waveunit = header.get('WAVEUNIT') if waveunit is not None: metre_submultiples = { 0: parse_waveunit_comment(waveunit_comment), -1: 'dm', -2: 'cm', -3: 'mm', -6: 'um', -9: 'nm', -10: 'angstrom', -12: 'pm', -15: 'fm', -18: 'am', -21: 'zm', -24: 'ym'} waveunit = metre_submultiples.get(waveunit, str(waveunit).lower()) elif waveunit_comment is not None: waveunit = parse_waveunit_comment(waveunit_comment) elif wavelnth_comment is not None: # supported formats (where $UNIT is the unit like "nm" or "Angstrom"): # "Observed wavelength ($UNIT)" # "Wavelength of obs ($UNIT)" # "[$UNIT] ..." patterns = [ r'Observed wavelength \((\w+?)\)$', r'Wavelength of obs \((\w+?)\)$', r'^\[(\w+?)\]', ] for pattern in patterns: m = re.search(pattern, wavelnth_comment) if m is not None: waveunit = m.group(1) break if waveunit == '': return None # To fix problems associated with HMI FITS. return waveunit