Source code for sunkit_instruments.rhessi.rhessi

"""
This module provides processing routines programs to process and analyze RHESSI
data.
"""

import re
import csv

import numpy as np

import astropy.units as u
import sunpy.io
from astropy.time import Time, TimeDelta
from sunpy.coordinates import sun
from sunpy.time import TimeRange, parse_time

__all__ = [
    "parse_observing_summary_hdulist",
    "backprojection",
    "parse_observing_summary_dbase_file",
    "_build_energy_bands",
    "uncompress_countrate",
    "imagecube2map",
]


# Measured fixed grid parameters
grid_pitch = (
    4.52467,
    7.85160,
    13.5751,
    23.5542,
    40.7241,
    70.5309,
    122.164,
    211.609,
    366.646,
)
grid_orientation = (
    3.53547,
    2.75007,
    3.53569,
    2.74962,
    3.92596,
    2.35647,
    0.786083,
    0.00140674,
    1.57147,
)

lc_linecolors = (
    "black",
    "pink",
    "green",
    "blue",
    "brown",
    "red",
    "navy",
    "orange",
    "green",
)


[docs]def parse_observing_summary_dbase_file(filename): """ Parse the RHESSI observing summary database file. This file lists the name of observing summary files for specific time ranges along with other info. .. note:: This API is currently limited to providing data from whole days only. Parameters ---------- filename : `str` The filename of the obssumm dbase file. Returns ------- `dict` Return a `dict` containing the parsed data in the dbase file. Examples -------- >>> import sunkit_instruments.rhessi as rhessi >>> rhessi.parse_observing_summary_dbase_file(fname) # doctest: +SKIP References ---------- https://hesperia.gsfc.nasa.gov/ssw/hessi/doc/guides/hessi_data_access.htm#Observing%20Summary%20Data """ # An example dbase file can be found at: # https://hesperia.gsfc.nasa.gov/hessidata/dbase/hsi_obssumm_filedb_200311.txt with open(filename) as fd: reader = csv.reader(fd, delimiter=" ", skipinitialspace=True) _ = next(reader) # skip 'HESSI Filedb File:' row _ = next(reader) # skip 'Created: ...' row _ = next(reader) # skip 'Number of Files: ...' row column_names = next(reader) # ['Filename', 'Orb_st', 'Orb_end',...] obssumm_filename = [] orbit_start = [] orbit_end = [] start_time = [] end_time = [] status_flag = [] number_of_packets = [] for row in reader: obssumm_filename.append(row[0]) orbit_start.append(int(row[1])) orbit_end.append(int(row[2])) start_time.append(Time.strptime(row[3], "%d-%b-%y")) # skip time end_time.append(Time.strptime(row[5], "%d-%b-%y")) # skip time status_flag.append(int(row[7])) number_of_packets.append(int(row[8])) return { column_names[0].lower(): obssumm_filename, column_names[1].lower(): orbit_start, column_names[2].lower(): orbit_end, column_names[3].lower(): start_time, column_names[4].lower(): end_time, column_names[5].lower(): status_flag, column_names[6].lower(): number_of_packets, }
[docs]def parse_observing_summary_hdulist(hdulist): """ Parse a RHESSI observation summary file. Parameters ---------- hdulist : `list` The HDU list from the fits file. Returns ------- out : `dict` Returns a dictionary. """ header = hdulist[0].header reference_time_ut = parse_time(hdulist[5].data.field("UT_REF")[0], format="utime") time_interval_sec = hdulist[5].data.field("TIME_INTV")[0] # label_unit = fits[5].data.field('DIM1_UNIT')[0] # labels = fits[5].data.field('DIM1_IDS') labels = [ "3 - 6 keV", "6 - 12 keV", "12 - 25 keV", "25 - 50 keV", "50 - 100 keV", "100 - 300 keV", "300 - 800 keV", "800 - 7000 keV", "7000 - 20000 keV", ] # The data stored in the fits file are "compressed" countrates stored as # one byte compressed_countrate = np.array(hdulist[6].data.field("countrate")) countrate = uncompress_countrate(compressed_countrate) dim = np.array(countrate[:, 0]).size time_array = parse_time(reference_time_ut) + TimeDelta( time_interval_sec * np.arange(dim) * u.second ) # TODO generate the labels for the dict automatically from labels data = {"time": time_array, "data": countrate, "labels": labels} return header, data
[docs]def uncompress_countrate(compressed_countrate): """ Convert the compressed count rate inside of observing summary file from a compressed byte to a true count rate. Parameters ---------- compressed_countrate : `byte` array A compressed count rate returned from an observing summary file. References ---------- `Hsi_obs_summ_decompress.pro <https://hesperia.gsfc.nasa.gov/ssw/hessi/idl/qlook_archive/hsi_obs_summ_decompress.pro>`_ """ # Ensure uncompressed counts are between 0 and 255 if (compressed_countrate.min() < 0) or (compressed_countrate.max() > 255): raise ValueError( f"Exepected uncompressed counts {compressed_countrate} to in range 0-255" ) # TODO Must be a better way than creating entire lookup table on each call ll = np.arange(0, 16, 1) lkup = np.zeros(256, dtype="int") _sum = 0 for i in range(0, 16): lkup[16 * i : 16 * (i + 1)] = ll * 2**i + _sum if i < 15: _sum = lkup[16 * (i + 1) - 1] + 2**i return lkup[compressed_countrate]
def hsi_linecolors(): """ Define discrete colors to use for RHESSI plots. Returns ------- `tuple` : A tuple of names of colours. References ---------- `hsi_linecolors.pro <https://hesperia.gsfc.nasa.gov/ssw/hessi/idl/gen/hsi_linecolors.pro>`__ """ return ("black", "magenta", "lime", "cyan", "y", "red", "blue", "orange", "olive") def _backproject( calibrated_event_list, detector=8, pixel_size=(1.0, 1.0), image_dim=(64, 64) ): """ Given a stacked calibrated event list fits file create a back projection image for an individual detectors. Parameters ---------- calibrated_event_list : `str` Filename of a RHESSI calibrated event list. detector : `int`, optional The detector number. pixel_size : `tuple`, optional A length 2 tuple with the size of the pixels in arcseconds. Defaults to ``(1, 1)``. image_dim : `tuple`, optional A length 2 tuple with the size of the output image in number of pixels. Defaults to ``(64, 64)``. Returns ------- `numpy.ndarray` A backprojection image. """ # info_parameters = fits[2] # detector_efficiency = info_parameters.data.field('cbe_det_eff$$REL') afits = sunpy.io.read_file(calibrated_event_list) fits_detector_index = detector + 2 detector_index = detector - 1 grid_angle = np.pi / 2.0 - grid_orientation[detector_index] harm_ang_pitch = grid_pitch[detector_index] / 1 phase_map_center = afits[fits_detector_index].data.field("phase_map_ctr") this_roll_angle = afits[fits_detector_index].data.field("roll_angle") modamp = afits[fits_detector_index].data.field("modamp") grid_transmission = afits[fits_detector_index].data.field("gridtran") count = afits[fits_detector_index].data.field("count") tempa = (np.arange(image_dim[0] * image_dim[1]) % image_dim[0]) - ( image_dim[0] - 1 ) / 2.0 tempb = ( tempa.reshape(image_dim[0], image_dim[1]) .transpose() .reshape(image_dim[0] * image_dim[1]) ) pixel = np.array(list(zip(tempa, tempb))) * pixel_size[0] phase_pixel = (2 * np.pi / harm_ang_pitch) * ( np.outer(pixel[:, 0], np.cos(this_roll_angle - grid_angle)) - np.outer(pixel[:, 1], np.sin(this_roll_angle - grid_angle)) ) + phase_map_center phase_modulation = np.cos(phase_pixel) gridmod = modamp * grid_transmission probability_of_transmission = gridmod * phase_modulation + grid_transmission bproj_image = np.inner(probability_of_transmission, count).reshape(image_dim) return bproj_image
[docs]@u.quantity_input def backprojection( calibrated_event_list, pixel_size: u.arcsec = (1.0, 1.0) * u.arcsec, image_dim: u.pix = (64, 64) * u.pix, ): """ Given a stacked calibrated event list fits file create a back projection image. .. warning:: The image will not be in the right orientation. Parameters ---------- calibrated_event_list : `str` Filename of a RHESSI calibrated event list. pixel_size : `tuple`, optional A length 2 tuple with the size of the pixels in arcsecond `~astropy.units.Quantity`. Defaults to ``(1, 1) * u.arcsec``. image_dim : `tuple`, optional A length 2 tuple with the size of the output image in number of pixel `~astropy.units.Quantity` Defaults to ``(64, 64) * u.pix``. Returns ------- `sunpy.map.sources.RHESSImap` A backprojection map. """ # import sunpy.map in here so that net and timeseries don't end up importing map import sunpy.map pixel_size = pixel_size.to(u.arcsec) image_dim = np.array(image_dim.to(u.pix).value, dtype=int) afits = sunpy.io.read_file(calibrated_event_list) info_parameters = afits[2] xyoffset = info_parameters.data.field("USED_XYOFFSET")[0] time_range = TimeRange( info_parameters.data.field("ABSOLUTE_TIME_RANGE")[0], format="utime" ) image = np.zeros(image_dim) # find out what detectors were used det_index_mask = afits[1].data.field("det_index_mask")[0] detector_list = (np.arange(9) + 1) * np.array(det_index_mask) for detector in detector_list: if detector > 0: image = image + _backproject( calibrated_event_list, detector=detector, pixel_size=pixel_size.value, image_dim=image_dim, ) dict_header = { "DATE-OBS": time_range.center.strftime("%Y-%m-%d %H:%M:%S"), "CDELT1": pixel_size[0], "NAXIS1": image_dim[0], "CRVAL1": xyoffset[0], "CRPIX1": image_dim[0] / 2 + 0.5, "CUNIT1": "arcsec", "CTYPE1": "HPLN-TAN", "CDELT2": pixel_size[1], "NAXIS2": image_dim[1], "CRVAL2": xyoffset[1], "CRPIX2": image_dim[0] / 2 + 0.5, "CUNIT2": "arcsec", "CTYPE2": "HPLT-TAN", "HGLT_OBS": 0, "HGLN_OBS": 0, "RSUN_OBS": sun.angular_radius(time_range.center).value, "RSUN_REF": sunpy.sun.constants.radius.value, "DSUN_OBS": sun.earth_distance(time_range.center).value * sunpy.sun.constants.au.value, } result_map = sunpy.map.Map(image, dict_header) return result_map
def _build_energy_bands(label, bands): """ Creates a list of strings with the correct formatting for axis labels. Parameters ---------- label: `str` The ``label`` to use as a basis. bands: `list` of `str` The bands to append to the ``label``. Returns ------- `list` of `str` Each string is an energy band and its unit. Example ------- >>> from sunkit_instruments.rhessi import _build_energy_bands >>> _build_energy_bands('Energy bands (keV)', ['3 - 6', '6 - 12', '12 - 25']) ['3 - 6 keV', '6 - 12 keV', '12 - 25 keV'] """ unit_pattern = re.compile(r"^.+\((?P<UNIT>\w+)\)$") matched = unit_pattern.match(label) if matched is None: raise ValueError( "Unable to find energy unit in '{}' " "using REGEX '{}'".format(label, unit_pattern.pattern) ) unit = matched.group("UNIT").strip() return [f"{band} {unit}" for band in bands]
[docs]def imagecube2map(rhessi_imagecube_file): """ Extracts single map images from a RHESSI flare image datacube. Currently assumes input to be 4D. This function is analogous to the `hsi_fits2map.pro` functionality available in SSW. Parameters ---------- rhessi_imagecube_file : `str` Path or URL to image datacube .fits Returns ------- `dict` of `sunpy.map.MapSequence` Each energy band has a list of maps where the index of the lists represent the time step """ # import sunpy.map in here so that net and timeseries don't end up importing map from sunpy.map import Map f = sunpy.io.read_file(rhessi_imagecube_file) header = f[0].header # make sure datacube is a RHESSI cube if header["INSTRUME"] != "RHESSI": raise ValueError(f"Expected a RHESSI datacube, got: {header['INSTRUME']}") # remove those (non-standard) headers to avoid user warnings (they are 0 anyway) del header["CROTACN1"] del header["CROTACN2"] del header["CROTA"] d_min = {} d_max = {} e_ax = f[1].data[0]["ENERGY_AXIS"].reshape((-1, 2)) # reshape energy axis to be 2D t_ax = f[1].data[0]["TIME_AXIS"].reshape((-1, 2)) # reshape time axis to be 2D data = f[0].data.reshape( tuple([1] * (4 - len(f[0].data.shape))) + f[0].data.shape ) # reshape data to be 4D for e in range(e_ax.shape[0]): d_min[e] = 1e10 d_max[e] = -1e10 for t in range(t_ax.shape[0]): d_min[e] = min(d_min[e], data[t][e].min()) d_max[e] = max(d_max[e], data[t][e].max()) maps = {} # result dictionary for e in range(e_ax.shape[0]): header["ENERGY_L"] = e_ax[e][0] header["ENERGY_H"] = e_ax[e][1] header["DATAMIN"] = d_min[e] header["DATAMAX"] = d_max[e] key = f"{int(header['ENERGY_L'])}-{int(header['ENERGY_H'])} keV" map_list = [] for t in range(t_ax.shape[0]): header["DATE_OBS"] = parse_time(t_ax[t][0], format="utime").to_value("isot") header["DATE_END"] = parse_time(t_ax[t][1], format="utime").to_value("isot") map_list.append(Map(data[t][e], header)) # extract image Map maps[key] = Map(map_list, sequence=True) return maps