# Source code for sunkit_instruments.suvi.io

import gzip
import logging
import os
import tempfile
from pathlib import Path

import h5py
import numpy
import sunpy.map
from astropy import units as u
from astropy.io import fits
from astropy.time import Time
from sunpy.util.exceptions import warn_user

from sunkit_instruments.suvi._variables import (
    COMPOSITE_MATCHES,
    FITS_FILE_EXTENSIONS,
    L1B_MATCHES,
    NETCDF_FILE_EXTENSIONS,
    TAG_COMMENT_MAPPING,
    TAG_MAPPING,
)

__all__ = ["read_suvi", "files_to_map"]


def _fix_l1b_header(filename):
    """
    Fix a SUVI L1b FITS file header (broken due to the wrong
    implementation of the CONTINUE keyword convention).

    .. note::
        With astropy versions <=4.2.0 this is faster, because we can
        still use the `astropy.io.fits.header.Header.tostring()` method.
        Starting with astropy version 4.2.1, the
        `astropy.io.fits.header.Header.tostring()` method no longer
        works due to FITS header consistency checks that cannot
        be overridden. The solution used here (reading the FITS file
        directly as bytes until we hit a UnicodeDecodeError) is not
        very elegant, but it is the only one that works as far as I know.

    .. note::
        If the input file is gzipped, an unzipped copy in the default
        tmp directory will be used and deleted afterwards.

    Parameters
    ----------
    filename : `str`
        Filename of the L1b file with the corrupt FITS header.

    Returns
    -------
    `astropy.io.fits.header.Header`
        Corrected FITS header.
    """
    try:
        # First try it with the astropy .to_string() method, as this is the easiest.
        hdr = fits.getheader(filename)
        hdr_str = hdr.tostring()
    except Exception:
        # Read the file manually as bytes until we hit a UnicodeDecodeError, i.e.
        # until we reach the data part. Since astropy version 4.2.1, we can't use
        # the .tostring() method anymore because of FITS header consistency checks
        # that cannot be overridden, and they won't fix it unfortunately. If the
        # input file is a .gz file, we need to unpack it first to the tmp directory.
        temp_dir = tempfile.gettempdir()
        name = Path(filename).name
        is_gz_file = False
        if name.endswith(".gz"):
            is_gz_file = True
            with gzip.open(filename, "r") as gfile:
                filename = str(Path(temp_dir) / name[:-3])
                with open(filename, "wb") as file_out:
                    file_out.write(gfile.read())
        hdr_str = ""
        with open(filename, "rb") as file:
            counter = 1
            while True:
                try:
                    this_line = file.read(counter)
                    this_str = this_line.decode("utf-8")
                    hdr_str += this_str
                    counter += 1
                except UnicodeDecodeError:
                    break
        if is_gz_file:
            os.remove(filename)
    # Make a list of strings with a length of 80
    hdr_list = [hdr_str[i : i + 80] for i in range(0, len(hdr_str), 80)]
    # Remove all the empty entries
    while " " * 80 in hdr_list:
        hdr_list.remove(" " * 80)
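    # First pass: walk through the cards and merge every keyword card with
    # its CONTINUE continuation cards into a single long entry.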
    hdr_list_new = []
    for count, item in enumerate(hdr_list):
        if count <= len(hdr_list) - 2:
            if (
                hdr_list[count][0:8] != "CONTINUE"
                and hdr_list[count + 1][0:8] != "CONTINUE"
            ):
                hdr_list_new.append(hdr_list[count])
            else:
                if (
                    hdr_list[count][0:8] != "CONTINUE"
                    and hdr_list[count + 1][0:8] == "CONTINUE"
                ):
                    ampersand_pos = hdr_list[count].find("&")
                    if ampersand_pos != -1:
                        new_entry = hdr_list[count][0:ampersand_pos]
                    else:
                        raise RuntimeError(
                            "There should be an ampersand at the end of a CONTINUE'd keyword."
                        )
                    tmp_count = 1
                    while hdr_list[count + tmp_count][0:8] == "CONTINUE":
                        ampersand_pos = hdr_list[count + tmp_count].find("&")
                        if ampersand_pos != -1:
                            first_sq_pos = hdr_list[count + tmp_count].find("'")
                            if first_sq_pos != -1:
                                new_entry = (
                                    new_entry
                                    + hdr_list[count + tmp_count][
                                        first_sq_pos + 1 : ampersand_pos
                                    ]
                                )
                            else:
                                raise RuntimeError(
                                    "There should be two single quotes after CONTINUE. Did not find any."
                                )
                        else:
                            # If there is no ampersand at the end anymore, it means the entry ends here.
                            # Read from the first to the second single quote in this case.
                            first_sq_pos = hdr_list[count + tmp_count].find("'")
                            if first_sq_pos != -1:
                                second_sq_pos = hdr_list[count + tmp_count][
                                    first_sq_pos + 1 :
                                ].find("'")
                                if second_sq_pos != -1:
                                    new_entry = (
                                        new_entry
                                        + hdr_list[count + tmp_count][
                                            first_sq_pos
                                            + 1 : second_sq_pos
                                            + 1
                                            + first_sq_pos
                                        ].rstrip()
                                        + "'"
                                    )
                                else:
                                    raise RuntimeError(
                                        "There should be two single quotes after CONTINUE. Found the first, but not the second."
                                    )
                            else:
                                raise RuntimeError(
                                    "There should be two single quotes after CONTINUE. Did not find any."
                                )
                        tmp_count += 1
                    hdr_list_new.append(new_entry)
                else:
                    continue
        else:
            # Add END at the end of the header
            hdr_list_new.append(hdr_list[count])
    # Now we stitch together the CONTINUE information correctly,
    # with a "\n" at the end that we use as a separator later on
    # when we convert from a string to an astropy header.
    for count, item in enumerate(hdr_list_new):
        if len(item) > 80:
            this_entry = item[0:78] + "&'\n"
            rest = "CONTINUE  '" + item[78:]
            while len(rest) > 80:
                this_entry = this_entry + rest[0:78] + "&'\n"
                rest = "CONTINUE  '" + rest[78:]
            this_entry = this_entry + rest
            hdr_list_new[count] = this_entry
    # Now we should have the correct list of strings. Since we can't convert a list to a
    # FITS header directly, we have to convert it to a string first, separated by "\n".
    hdr_str_new = "\n".join(hdr_list_new)
    hdr_corr = fits.Header.fromstring(hdr_str_new, sep="\n")
    return hdr_corr
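

# A hedged usage sketch for _fix_l1b_header (the filename below is a
# placeholder, not a bundled sample file):
#
#     hdr = _fix_l1b_header("OR_SUVI-L1b-Fe171_G16_placeholder.fits.gz")
#     print(hdr["DATE-OBS"])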


def _read_fits(filename):
    """
    Read a FITS file and return the header, data and dqf.
    """
    if any(fn in os.path.basename(filename) for fn in COMPOSITE_MATCHES):
        with fits.open(filename) as hdu:
            data, header = hdu[1].data, hdu[1].header
            dqf = None
    elif any(fn in os.path.basename(filename) for fn in L1B_MATCHES):
        with fits.open(filename) as hdu:
            data, header, dqf = hdu[0].data, _fix_l1b_header(filename), hdu[1].data
    else:
        raise ValueError(
            f"File {filename} does not look like a SUVI L1b FITS file or L2 HDR composite."
        )
    return header, data, dqf


def _make_cdf_header(header_info):
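    """
    Build a FITS header from the information in a SUVI L1b netCDF file.

    ``header_info`` is expected to be a `dict` mapping netCDF variable
    names to their values, as read from the file in `_read_netCDF`.
    """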
    header_info_copy = header_info.copy()
    # Discard everything where the key name is longer than 8 characters,
    # plus specific entries we have to deal with manually.
    for key, value in header_info.items():
        if len(key) > 8:
            del header_info_copy[key]
        elif key in ["RAD", "DQF", "NAXIS1", "NAXIS2"]:
            del header_info_copy[key]
    for key, value in header_info_copy.items():
        if isinstance(value, numpy.ndarray):
            # We only want single values for the header, no arrays of length 1.
            # We convert everything that looks like an integer to a long,
            # everything that looks like a float to float64, and byte strings
            # to actual strings.
            if value.ndim == 0:
                if value.dtype in [
                    numpy.int8,
                    numpy.int16,
                    numpy.int32,
                    numpy.int64,
                    numpy.uint8,
                    numpy.uint16,
                    numpy.uint32,
                    numpy.uint64,
                ]:
                    header_info_copy[key] = numpy.longlong(value)
                elif value.dtype in [numpy.float16, numpy.float32, numpy.float64]:
                    header_info_copy[key] = numpy.float64(value)
            else:
                if value.dtype == "|S1":
                    # Byte string to actual string, and removing weird characters
                    header_info_copy[key] = (
                        value.tobytes().decode("utf-8").rstrip("\x00")
                    )
    # Now deal with the dates (float in the netCDF). Transform to readable string,
    # ignore bakeout date because it is always -999.
    for key, value in header_info_copy.items():
        if key.startswith("DATE") and key != "DATE-BKE":
            # Explanation for the odd time creation: the SUVI files say they use
            # the J2000 epoch, but they do not: the reference time is 2000-01-01 at
            # 12:00:00 *UTC*, whereas the reference time for J2000 is in *TT*. So in
            # order to get the time right, we need to define it in TT, but add the
            # offset of 69.184 seconds between UTC and TT.
            the_readable_date = (
                Time("2000-01-01T12:01:09.184", scale="tt") + value * u.s
            )
            header_info_copy[key] = the_readable_date.utc.value
    # Add NAXIS1 and NAXIS2 manually, because they are odd coming from the netCDF
    header_info_copy["NAXIS1"] = None
    header_info_copy["NAXIS2"] = None
    # Same for BLANK, BSCALE, BZERO, and BUNIT
    header_info_copy["BLANK"] = None
    header_info_copy["BSCALE"] = None
    header_info_copy["BZERO"] = None
    header_info_copy["BUNIT"] = None
    header = fits.Header.fromkeys(header_info_copy.keys())
    for keyword in header:
        header[keyword] = header_info_copy[keyword]
    # Add fits header comments for known keywords as defined above
    for keyword in header:
        if keyword in TAG_COMMENT_MAPPING:
            header.set(keyword, header[keyword], TAG_COMMENT_MAPPING[keyword])
    # Add EXTEND, EXTVER, EXTNAME, and LONGSTRN
    header.append(("EXTEND", True, "FITS dataset may contain extensions"))
    header.append(("EXTVER", 1, ""))
    header.append(("EXTNAME", "DATA", ""))
    header.append(
        ("LONGSTRN", "OGIP 1.0", "The HEASARC Long String Convention may be used")
    )
    return header


def _read_netCDF(filename):
    """
    Read a netCDF file and return the header, data, and DQF.
    """
    if any(fn in os.path.basename(filename) for fn in L1B_MATCHES):
        with h5py.File(filename, "r") as afile:
            data = afile["RAD"][:]

            blank = afile["RAD"].attrs["_FillValue"][0]
            bzero = afile["RAD"].attrs["add_offset"][0]
            bscale = afile["RAD"].attrs["scale_factor"][0]
            bunit = afile["RAD"].attrs["units"].tobytes().decode("utf-8").rstrip("\x00")

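            # Unpack the stored values using the standard netCDF
            # scale_factor/add_offset convention.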
            data = data * bscale + bzero
            dqf = afile["DQF"][:]

            header_info = {key: afile[key][...] for key in afile.keys()}
            header = _make_cdf_header(header_info)
            # Deal with this here as we require the file.
            for att, val in afile.attrs.items():
                if att in TAG_MAPPING:
                    header[TAG_MAPPING[att]] = (
                        val.tobytes().decode("utf-8").rstrip("\x00")
                    )
            header["NAXIS1"] = data.shape[0]
            header["NAXIS2"] = data.shape[1]
            header["BLANK"] = blank
            header["BSCALE"] = bscale
            header["BZERO"] = bzero
            header["BUNIT"] = bunit
    else:
        raise ValueError(f"File {filename} does not look like a SUVI L1b netCDF file.")
    return header, data, dqf


def read_suvi(filename):
    """
    Read a SUVI L1b FITS or netCDF file or an L2 HDR composite FITS file.

    Returns the header, data, and the data quality flag array (DQF)
    for L1b files. For SUVI L1b FITS files, the broken FITS header is
    fixed automatically (broken because of the wrong implementation of
    the CONTINUE convention).

    This read function is intended to provide a consistent file interface
    for FITS and netCDF, L1b and L2.

    .. note::
        The type of file is determined by pattern matching in the
        filenames, e.g. "-L1b-Fe171" for a 171 L1b file and "-l2-ci171"
        for a 171 L2 HDR composite. If those patterns are not found in
        the filename, the files will not be recognized.

    .. note::
        If ``filename`` is an L1b netCDF file, the information from the
        netCDF file is transformed into a FITS header.

    Parameters
    ----------
    filename : `str`
        File to read.

    Returns
    -------
    `astropy.io.fits.header.Header`, `~numpy.ndarray`, `~numpy.ndarray`
        Header, data, and data quality flags.
    """
    if filename.lower().endswith(FITS_FILE_EXTENSIONS):
        header, data, dqf = _read_fits(filename)
    elif filename.lower().endswith(NETCDF_FILE_EXTENSIONS):
        header, data, dqf = _read_netCDF(filename)
    else:
        raise ValueError(
            f"File {filename} does not look like a valid FITS or netCDF file."
        )
    return header, data, dqf
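

# A hedged usage sketch for read_suvi (the filename is a placeholder, not a
# bundled sample file):
#
#     header, data, dqf = read_suvi("OR_SUVI-L1b-Fe171_G16_placeholder.fits")
#     print(header["DATE-OBS"], data.shape)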


def files_to_map(
    files,
    despike_l1b=False,
    only_long_exposures=False,
    only_short_exposures=False,
    only_short_flare_exposures=False,
):
    """
    Read SUVI L1b FITS or netCDF files or L2 HDR composite FITS files
    and return a `~sunpy.map.Map` or a `~sunpy.map.MapSequence`.

    For SUVI L1b FITS files, the broken FITS header is fixed
    automatically (broken because of the wrong implementation of the
    CONTINUE convention).

    .. note::
        The first file in the (sorted) list determines what will be
        accepted further on, i.e. L2 HDR composites or L1b files. If
        L1b files appear in a file list that started with an L2 HDR
        composite, they will be rejected (and vice versa). The type of
        file is determined by pattern matching in the filenames, e.g.
        "-L1b-Fe171" for a 171 L1b file and "-l2-ci171" for a 171 L2
        HDR composite. If those patterns are not found in the filename,
        the files will not be recognized.

    Parameters
    ----------
    files : `str` or `list` of `str`
        File(s) to read.
    despike_l1b : `bool`, optional. Default: False.
        If True and the input is L1b, the data will get despiked with
        the standard filter_width=7. Cannot be used for early SUVI
        files where the DQF extension is missing.
    only_long_exposures : `bool`, optional. Default: False.
        If True, only long exposure L1b files from the input list will
        be accepted and converted to a map. Ignored for L2 HDR
        composites.
    only_short_exposures : `bool`, optional. Default: False.
        If True, only short exposure L1b files from the input list will
        be accepted and converted to a map. Ignored for L2 HDR
        composites and any wavelengths other than 94 and 131 (because
        for everything >131, there are no observations that are labeled
        "short", only "long" and "short_flare").
    only_short_flare_exposures : `bool`, optional. Default: False.
        If True, only short flare exposure L1b files from the input
        list will be accepted and converted to a map. Ignored for L2
        HDR composites.

    Returns
    -------
    `~sunpy.map.Map`, `~sunpy.map.MapSequence`, or `None`
        A map (sequence) of the SUVI data, or `None` if no data was
        found matching the given criteria.
    """
    # Avoid circular imports
    from sunkit_instruments.suvi.suvi import despike_l1b_array

    if isinstance(files, str):
        files = [files]
    files = sorted(files)
    if any(fn in os.path.basename(files[0]) for fn in COMPOSITE_MATCHES):
        composites = True
    elif any(fn in os.path.basename(files[0]) for fn in L1B_MATCHES):
        composites = False
    else:
        raise ValueError(
            f"First file {files[0]} does not look like a SUVI L1b file or L2 HDR composite."
        )
    datas = []
    headers = []
    for afile in files:
        logging.debug(f"Reading {afile}")
        if composites:
            if any(fn in os.path.basename(afile) for fn in COMPOSITE_MATCHES):
                header, data, _ = read_suvi(afile)
                datas.append(data)
                headers.append(header)
            else:
                warn_user(
                    f"File {afile} does not look like a SUVI L2 HDR composite. Skipping."
                )
        else:
            if any(fn in os.path.basename(afile) for fn in L1B_MATCHES):
                header, data, dqf_mask = read_suvi(afile)
                if despike_l1b:
                    data = despike_l1b_array(data, dqf_mask)
                if only_long_exposures:
                    if "long_exposure" in header["SCI_OBJ"]:
                        datas.append(data)
                        headers.append(header)
                elif only_short_exposures:
                    if "short_exposure" in header["SCI_OBJ"]:
                        datas.append(data)
                        headers.append(header)
                elif only_short_flare_exposures:
                    if "short_flare_exposure" in header["SCI_OBJ"]:
                        datas.append(data)
                        headers.append(header)
                else:
                    datas.append(data)
                    headers.append(header)
            else:
                warn_user(f"File {afile} does not look like a SUVI L1b file. Skipping.")
    if len(datas) == 1:
        return sunpy.map.Map(datas[0], headers[0])
    elif len(datas) > 1:
        return sunpy.map.Map(list(zip(datas, headers)), sequence=True)
    else:
        warn_user("List of data/headers is empty.")