import gzip
import struct
import pathlib
import warnings
import functools
from pathlib import Path
from collections import OrderedDict
from urllib.request import Request
import cdflib
import numpy as np
import pandas as pd
from scipy.io import readsav
import astropy.units as u
from astropy.io import fits
from astropy.io.fits import Header
from astropy.time import Time
from sunpy import log
from sunpy.data import cache
from sunpy.net import attrs as a
from sunpy.time import parse_time
from sunpy.util.datatype_factory_base import (
BasicRegistrationFactory,
MultipleMatchError,
NoMatchError,
ValidationFunctionError,
)
from sunpy.util.exceptions import SunpyUserWarning, warn_user
from sunpy.util.io import is_url, parse_path, possibly_a_path
from sunpy.util.metadata import MetaDict
from sunpy.util.util import expand_list
from radiospectra.exceptions import NoSpectrogramInFileError, SpectraMetaValidationError
from radiospectra.spectrogram.spectrogrambase import GenericSpectrogram
from radiospectra.utils import subband_to_freq
SUPPORTED_ARRAY_TYPES = (np.ndarray,)
try:
import dask.array
SUPPORTED_ARRAY_TYPES += (dask.array.Array,)
except ImportError:
pass
__all__ = ["SpectrogramFactory", "Spectrogram"]
class SpectrogramFactory(BasicRegistrationFactory):
"""
A factory for generating spectrograms.
Parameters
----------
    \*inputs
        `str` or `pathlib.Path` to the file.
Returns
-------
`radiospectra.spectrogram.Spectrogram`
        The spectrogram for the given file.
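
    Examples
    --------
    A minimal sketch (the file name here is hypothetical)::

        >>> from radiospectra.spectrogram import Spectrogram
        >>> spec = Spectrogram("eovsa_spectrogram.fits")  # doctest: +SKIP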
"""
def _validate_meta(self, meta):
"""
Validate a meta argument.
"""
if isinstance(meta, Header):
return True
elif isinstance(meta, dict):
return True
else:
return False
def _parse_args(self, *args, silence_errors=False, **kwargs):
"""
Parses an args list into data-header pairs.
        args can contain any mixture of the following entries:

        * tuples of (data, header)
        * data, header not in a tuple
        * filename, as a str or pathlib.Path, which will be read
        * directory, as a str or pathlib.Path, from which all files will be read
        * glob, from which all files will be read
        * url, which will be downloaded and read
        * lists containing any of the above.
        Examples
        --------
self._parse_args(data, header,
(data, header),
['file1', 'file2', 'file3'],
'file4',
'directory1',
'*.fits')
"""
# Account for nested lists of items
args = expand_list(args)
# Sanitize the input so that each 'type' of input corresponds to a different
# class, so single dispatch can be used later
nargs = len(args)
i = 0
while i < nargs:
arg = args[i]
if isinstance(arg, SUPPORTED_ARRAY_TYPES):
# The next two items are data and a header
data = args.pop(i)
header = args.pop(i)
args.insert(i, (data, header))
nargs -= 1
elif isinstance(arg, str) and is_url(arg):
# Replace URL string with a Request object to dispatch on later
args[i] = Request(arg)
elif possibly_a_path(arg):
# Replace path strings with Path objects
args[i] = pathlib.Path(arg)
i += 1
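        # At this point, e.g. [array, header, "spec.fits"] has become
        # [(array, header), Path("spec.fits")], ready for single dispatch below.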
# Parse the arguments
        # Note that this list can also contain GenericSpectrograms if they are directly given to the factory
data_header_pairs = []
for arg in args:
try:
data_header_pairs += self._parse_arg(arg, **kwargs)
except NoSpectrogramInFileError as e:
if not silence_errors:
raise
warn_user(f"One of the arguments failed to parse with error: {e}")
return data_header_pairs
@functools.singledispatchmethod
def _parse_arg(self, arg, **kwargs):
"""
Take a factory input and parse into (data, header) pairs.
Must return a list, even if only one pair is returned.
"""
raise ValueError(f"Invalid input: {arg}")
@_parse_arg.register(tuple)
def _parse_tuple(self, arg, **kwargs):
        # Data-header pair
        data, header = arg
        if self._validate_meta(header):
            header = OrderedDict(header)
        return [(data, header)]
    @_parse_arg.register(GenericSpectrogram)
    def _parse_spectrogram(self, arg, **kwargs):
        return [arg]
@_parse_arg.register(Request)
def _parse_url(self, arg, **kwargs):
url = arg.full_url
path = str(cache.download(url).absolute())
pairs = self._read_file(path, **kwargs)
return pairs
@_parse_arg.register(pathlib.Path)
def _parse_path(self, arg, **kwargs):
return parse_path(arg, self._read_file, **kwargs)
def __call__(self, *args, silence_errors=False, **kwargs):
"""
Method for running the factory.
        Takes arbitrary arguments and keyword arguments and passes them to a sequence of
        pre-registered types to determine which is the correct spectrogram type to build.
        Arguments ``args`` and ``kwargs`` are passed through to the validation function and
        to the constructor for the final type. For spectrogram types, the validation
        function must take a data-header pair as an argument.
Parameters
----------
silence_errors : `bool`, optional
If set, ignore data-header pairs which cause an exception.
Default is ``False``.
Notes
-----
        Extra keyword arguments are passed through to the file readers, for example
        ``memmap`` for FITS files.
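
        Examples
        --------
        A sketch of typical calls; the file names are hypothetical::

            >>> spec = Spectrogram("20200602_071200_bst_00X.dat")  # doctest: +SKIP
            >>> specs = Spectrogram(["obs1.fits", "obs2.fits"], silence_errors=True)  # doctest: +SKIP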
"""
data_header_pairs = self._parse_args(*args, silence_errors=silence_errors, **kwargs)
        new_specs = []
        # Loop over each registered type and check to see if WidgetType
        # matches the arguments. If it does, use that type.
        for pair in data_header_pairs:
            if isinstance(pair, GenericSpectrogram):
                new_specs.append(pair)
                continue
            data, header = pair
            meta = MetaDict(header)
            try:
                new_spec = self._check_registered_widgets(data, meta, **kwargs)
                new_specs.append(new_spec)
            except (NoMatchError, MultipleMatchError, ValidationFunctionError, SpectraMetaValidationError) as e:
                if not silence_errors:
                    raise
                warnings.warn(f"One of the data, header pairs failed to validate with: {e}", SunpyUserWarning)
        if not new_specs:
            raise RuntimeError("No spectrograms loaded")
        if len(new_specs) == 1:
            return new_specs[0]
        return new_specs
def _check_registered_widgets(self, data, meta, **kwargs):
candidate_widget_types = list()
for key in self.registry:
# Call the registered validation function for each registered class
if self.registry[key](data, meta, **kwargs):
candidate_widget_types.append(key)
n_matches = len(candidate_widget_types)
if n_matches == 0:
if self.default_widget_type is None:
raise NoMatchError("No types match specified arguments and no default is set.")
else:
candidate_widget_types = [self.default_widget_type]
elif n_matches > 1:
raise MultipleMatchError(
"Too many candidate types identified "
f"({candidate_widget_types}). "
"Specify enough keywords to guarantee unique type "
"identification."
)
# Only one is found
WidgetType = candidate_widget_types[0]
return WidgetType(data, meta, **kwargs)
def _read_file(self, file, **kwargs):
file = Path(file)
extensions = file.suffixes
first_extension = extensions[0].lower()
if first_extension == ".dat":
return self._read_dat(file)
elif first_extension in (".r1", ".r2"):
return [self._read_idl_sav(file, instrument="waves")]
elif first_extension == ".cdf":
cdf = self._read_cdf(file)
if isinstance(cdf, list):
return cdf
return [cdf]
elif first_extension == ".srs":
return [self._read_srs(file)]
elif first_extension in (".fits", ".fit", ".fts", "fit.gz"):
return [self._read_fits(file)]
else:
raise ValueError(f"Extension {extensions[0]} not supported.")
@staticmethod
def _read_dat(file):
if "swaves" in file.name:
name, prod, date, spacecraft, receiver = file.stem.split("_")
# frequency range
freqs = np.genfromtxt(file, max_rows=1) * u.kHz
# bg which is already subtracted from data
bg = np.genfromtxt(file, skip_header=1, max_rows=1)
# data
data = np.genfromtxt(file, skip_header=2)
times = data[:, 0] * u.min
data = data[:, 1:].T
meta = {
"instrument": name,
"observatory": f"STEREO {spacecraft.upper()}",
"product": prod,
"start_time": Time.strptime(date, "%Y%m%d"),
"wavelength": a.Wavelength(freqs[0], freqs[-1]),
"detector": receiver,
"freqs": freqs,
"background": bg,
}
meta["times"] = meta["start_time"] + times
meta["end_time"] = meta["start_time"] + times[-1]
return data, meta
elif "bst" in file.name:
subbands = (np.arange(54, 454, 2), np.arange(54, 454, 2), np.arange(54, 230, 2))
num_subbands = 488
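            # 488 = 200 + 200 + 88 subbands across observing modes 3, 5 and 7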
data = np.fromfile(file)
polarisation = file.stem[-1]
            num_times = data.shape[0] / num_subbands
            if not num_times.is_integer():
                log.warning("BST file seems incomplete, dropping the trailing incomplete sweep.")
                num_times = np.floor(num_times).astype(int)
                truncate = num_times * num_subbands
                data = data[:truncate]
            data = data.reshape(-1, num_subbands).T  # (time, subband) -> transpose to (subband, time)
            dt = np.arange(num_times) * u.s  # 1 s cadence
start_time = Time.strptime(file.name.split("_bst")[0], "%Y%m%d_%H%M%S")
times = start_time + dt
obs_mode = (3, 5, 7)
freqs = [subband_to_freq(sb, mode) for sb, mode in zip(subbands, obs_mode)]
# 1st 200 sbs mode 3, next 200 sbs mode 5, last 88 sbs mode 7
spec = {0: data[:200, :], 1: data[200:400, :], 2: data[400:, :]}
data_header_pairs = []
for i in range(3):
meta = {
"instrument": "ILOFAR",
"observatory": "Birr (IE613)",
"start_time": times[0],
"mode": obs_mode[i],
"wavelength": a.Wavelength(freqs[i][0], freqs[i][-1]),
"freqs": freqs[i],
"times": times,
"end_time": times[-1],
"detector": "ILOFAR",
"polarisation": polarisation,
}
data_header_pairs.append((spec[i], meta))
return data_header_pairs
else:
raise ValueError(f"File {file} not supported.")
@staticmethod
def _read_srs(file):
with file.open("rb") as buff:
data = buff.read()
if file.suffixes[-1] == ".gz":
data = gzip.decompress(data)
        # Data is stored as a series of records made up of fields of differing byte sizes
# General header information
# 1 Year (last 2 digits) Byte integer (unsigned)
# 2 Month number (1 to 12) "
# 3 Day (1 to 31) "
# 4 Hour (0 to 23 UT) "
# 5 Minute (0 to 59) "
# 6 Second at start of scan (0 to 59) "
# 7 Site Number (0 to 255) "
# 8 Number of bands in the record (2) "
#
# Band 1 (A-band) header information
# 9,10 Start Frequency (MHz) Word integer (16 bits)
# 11,12 End Frequency (MHz) "
# 13,14 Number of bytes in data record (401)"
# 15 Analyser reference level Byte integer
# 16 Analyser attenuation (dB) "
#
# Band 2 (B-band) header information
# 17-24 As for band 1
#
# Spectrum Analyser data
# 25-425 401 data bytes for band 1 (A-band)
# 426-826 401 data bytes for band 2 (B-band)
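        # Each record is therefore 8 + (3*2) + 2 + (3*2) + 2 + 401 + 401 = 826 bytes,
        # matching the struct below (native byte order assumed)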
record_struc = struct.Struct("B" * 8 + "H" * 3 + "B" * 2 + "H" * 3 + "B" * 2 + "B" * 401 + "B" * 401)
records = record_struc.iter_unpack(data)
# Map of numeric records to locations
site_map = {1: "Palehua", 2: "Holloman", 3: "Learmonth", 4: "San Vito"}
df = pd.DataFrame([(*r[:18], np.array(r[18:419]), np.array(r[419:820])) for r in records])
df.columns = [
"year",
"month",
"day",
"hour",
"minute",
"second",
"site",
" num_bands",
"start_freq1",
"end_freq1",
"num_bytes1",
"analyser_ref1",
"analyser_atten1",
"start_freq2",
"end_freq2",
"num_bytes2",
"analyser_ref2",
"analyser_atten2",
"spec1",
"spec2",
]
        # Years are stored as two digits; assume 2000-2099 (the earliest files date from
        # ~2000, and this code won't be around in 3000!)
        df["year"] = df["year"] + 2000
df["time"] = pd.to_datetime(df[["year", "month", "day", "hour", "minute", "second"]])
        # Frequency equations from the SRS documentation: band A spans 25-75 MHz, band B 75-180 MHz
        n = np.arange(1, 402)
        freq_a = (25 + 50 * (n - 1) / 400) * u.MHz
        freq_b = (75 + 105 * (n - 1) / 400) * u.MHz
freqs = np.hstack([freq_a, freq_b])
data = np.hstack([np.vstack(df[name].to_numpy()) for name in ["spec1", "spec2"]]).T
times = Time(df["time"])
meta = {
"instrument": "RSTN",
"observatory": site_map[df["site"][0]],
"start_time": times[0],
"end_time": times[-1],
"detector": "RSTN",
"wavelength": a.Wavelength(freqs[0], freqs[-1]),
"freqs": freqs,
"times": times,
}
return data, meta
@staticmethod
def _read_cdf(file):
cdf = cdflib.CDF(file)
cdf_globals = cdf.globalattsget()
        if (
            cdf_globals.get("Project", [""])[0] == "PSP"
            and cdf_globals.get("Source_name", [""])[0] == "PSP_FLD>Parker Solar Probe FIELDS"
            and "Radio Frequency Spectrometer" in cdf_globals.get("Descriptor", [""])[0]
        ):
short, _long = cdf_globals["Descriptor"][0].split(">")
detector = short[4:].lower()
times, data, freqs = (
cdf.varget(name)
for name in [
f"epoch_{detector}_auto_averages_ch0_V1V2",
f"psp_fld_l2_rfs_{detector}_auto_averages_ch0_V1V2",
f"frequency_{detector}_auto_averages_ch0_V1V2",
]
)
times = Time("J2000.0", scale="tt") + (times << u.ns)
freqs = freqs[0, :] << u.Hz
data = data.T << u.Unit("Volt**2/Hz")
meta = {
"cdf_globals": cdf_globals,
"detector": detector,
"instrument": "FIELDS/RFS",
"observatory": "PSP",
"start_time": times[0],
"end_time": times[-1],
"wavelength": a.Wavelength(freqs.min(), freqs.max()),
"times": times,
"freqs": freqs,
}
return data, meta
elif "SOLO" in cdf_globals.get("Project", "")[0]:
if "RPW-HFR-SURV" not in cdf_globals.get("Descriptor", "")[0]:
raise ValueError(
f"Currently radiospectra supports Level 2 HFR survey data the file"
f'{file.name} is {cdf_globals.get("Descriptor", "")}'
)
# FREQUENCY_BAND_LABELS = ["HF1", "HF2"]
# SURVEY_MODE_LABELS = ["SURVEY_NORMAL", "SURVEY_BURST"]
# CHANNEL_LABELS = ["1", "2"]
SENSOR_MAPPING = {
1: "V1",
2: "V2",
3: "V3",
4: "V1-V2",
5: "V2-V3",
6: "V3-V1",
7: "B_MF",
9: "HF_V1-V2",
10: "HF_V2-V3",
11: "HF_V3-V1",
}
# Extract variables
all_times = Time("J2000.0") + cdf.varget("EPOCH") * u.Unit(cdf.varattsget("EPOCH")["UNITS"])
all_freqs = cdf.varget("FREQUENCY") << u.Unit(cdf.varattsget("FREQUENCY")["UNITS"])
sweep_start_indices = np.asarray(np.diff(cdf.varget("SWEEP_NUM")) != 0).nonzero()
sweep_start_indices = np.insert((sweep_start_indices[0] + 1), 0, 0)
times = all_times[sweep_start_indices]
            sensor = cdf.varget("SENSOR_CONFIG")
            band = cdf.varget("HFR_BAND")
            agc1 = cdf.varget("AGC1")
            agc2 = cdf.varget("AGC2")
# Define number of records
n_rec = band.shape[0]
            # Times of the first sample of each sweep in the file
            nt = len(times)
            # Complete list of HFR frequency channels in kHz
            # (321 channels is a guess; the true number is somewhere between 320 and 324)
            hfr_frequency = 375 + 50 * np.arange(321)
            nf = len(hfr_frequency)
            # Initialize the output array of voltage spectral power values in V^2/Hz
            # Dims = (channels[2], time of the first sweep sample[nt], frequency[nf])
            specs = np.empty((2, nt, nf))
            # Fill the array with NaN for HFR frequencies not actually measured in the file
            specs[:] = np.nan
# Get list of first index of sweeps
isweep = sweep_start_indices[:]
# Get number of sweeps
n_sweeps = len(isweep)
# Insert an element in the end of the isweep list
# containing the end of the latest sweep
# (required for the loop below, in order to have
# a start/end index range for each sweep)
isweep = np.insert(isweep, n_sweeps, n_rec)
# Initialize sensor_config
sensor_config = np.zeros((2, nt), dtype=object)
            # Mid-sweep times (collected below but currently unused)
            tm = []
# Perform a loop on each sweep
for i in range(n_sweeps):
# Get first and last index of the sweep
i0 = isweep[i]
i1 = isweep[i + 1]
ts = all_times[i0]
te = all_times[i1 - 1]
tt = (te - ts) * 0.5 + ts
tm.append(tt)
# Get indices of the actual frequency channels in the frequency vector
freq_indices = ((all_freqs[i0:i1].value - 375) / 50).astype(int)
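                # e.g. a 425 kHz channel maps to index (425 - 375) / 50 = 1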
# fill output 2D array
specs[0, i, freq_indices] = agc1[i0:i1]
specs[1, i, freq_indices] = agc2[i0:i1]
# Fill sensor config
sensor_config[0, i] = SENSOR_MAPPING[sensor[i0, 0]]
sensor_config[1, i] = SENSOR_MAPPING[sensor[i0, 1]]
            # The HFR band labels ("HF1", "HF2") in ``band`` are not needed for the output
hfr_frequency = hfr_frequency << u.kHz
res = []
if np.any(agc1):
meta1 = {
"cdf_globals": cdf_globals,
"detector": "RPW-AGC1",
"instrument": "RPW",
"observatory": "SOLO",
"start_time": times[0],
"end_time": times[-1],
"wavelength": a.Wavelength(hfr_frequency.min(), hfr_frequency.max()),
"times": times,
"freqs": hfr_frequency,
}
res.append((specs[0].T, meta1))
if np.any(agc2):
meta2 = {
"cdf_globals": cdf_globals,
"detector": "RPW-AGC2",
"instrument": "RPW",
"observatory": "SOLO",
"start_time": times[0],
"end_time": times[-1],
"wavelength": a.Wavelength(hfr_frequency.min(), hfr_frequency.max()),
"times": times,
"freqs": hfr_frequency,
}
res.append((specs[1].T, meta2))
return res
@staticmethod
def _read_fits(file):
hd_pairs = fits.open(file)
if "e-CALLISTO" in hd_pairs[0].header.get("CONTENT", ""):
data = hd_pairs[0].data
times = hd_pairs[1].data["TIME"].flatten() * u.s
freqs = hd_pairs[1].data["FREQUENCY"].flatten() * u.MHz
start_time = parse_time(hd_pairs[0].header["DATE-OBS"] + " " + hd_pairs[0].header["TIME-OBS"])
try:
end_time = parse_time(hd_pairs[0].header["DATE-END"] + " " + hd_pairs[0].header["TIME-END"])
except ValueError:
# See https://github.com/sunpy/radiospectra/issues/74
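                # A TIME-END of "24:00:00" fails to parse; treat it as 00:00:00 on the next day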
time_comps = hd_pairs[0].header["TIME-END"].split(":")
time_comps[0] = "00"
fixed_time = ":".join(time_comps)
date_offset = parse_time(hd_pairs[0].header["DATE-END"] + " " + fixed_time)
end_time = date_offset + 1 * u.day
times = start_time + times
meta = {
"fits_meta": hd_pairs[0].header,
"detector": "e-CALLISTO",
"instrument": "e-CALLISTO",
"observatory": hd_pairs[0].header["INSTRUME"],
"start_time": start_time,
"end_time": end_time,
"wavelength": a.Wavelength(freqs.min(), freqs.max()),
"times": times,
"freqs": freqs,
}
return data, meta
elif hd_pairs[0].header.get("TELESCOP", "") == "EOVSA":
times = Time(hd_pairs[2].data["mjd"] + hd_pairs[2].data["time"] / 1000.0 / 86400.0, format="mjd")
freqs = hd_pairs[1].data["sfreq"] * u.GHz
data = hd_pairs[0].data
start_time = parse_time(hd_pairs[0].header["DATE_OBS"])
end_time = parse_time(hd_pairs[0].header["DATE_END"])
meta = {
"fits_meta": hd_pairs[0].header,
"detector": "EOVSA",
"instrument": "EOVSA",
"observatory": "Owens Valley",
"start_time": start_time,
"end_time": end_time,
"wavelength": a.Wavelength(freqs.min(), freqs.max()),
"times": times,
"freqs": freqs,
}
return data, meta
        # Semi-standard layout - spectrum in the primary HDU, time and frequency in the 1st extension
try:
data = hd_pairs[0].data
times = hd_pairs[1].data["TIME"].flatten() * u.s
freqs = hd_pairs[1].data["FREQUENCY"].flatten() * u.MHz
start_time = parse_time(hd_pairs[0].header["DATE-OBS"] + " " + hd_pairs[0].header["TIME-OBS"])
end_time = parse_time(hd_pairs[0].header["DATE-END"] + " " + hd_pairs[0].header["TIME-END"])
times = start_time + times
meta = {
"fits_meta": hd_pairs[0].header,
"start_time": start_time,
"end_time": end_time,
"wavelength": a.Wavelength(freqs.min(), freqs.max()),
"times": times,
"freqs": freqs,
"instrument": hd_pairs[0].header.get("INSTRUME", ""),
"observatory": hd_pairs[0].header.get("INSTRUME", ""),
"detector": hd_pairs[0].header.get("DETECTOR", ""),
}
if "e-CALLISTO" in hd_pairs[0].header["CONTENT"]:
meta["detector"] = "e-CALLISTO"
meta["instrument"] = "e-CALLISTO"
return data, meta
except Exception as e:
raise ValueError(f"Could not load fits file: {file} into Spectrogram.") from e
@staticmethod
def _read_idl_sav(file, instrument=None):
data = readsav(file)
if instrument == "waves":
# See https://solar-radio.gsfc.nasa.gov/wind/one_minute_doc.html
data_array = data["arrayb"]
            # Frequency range depends on the receiver; compare the suffix case-insensitively,
            # since _read_file dispatches on lowercased suffixes
            if file.suffix.lower() == ".r1":
                freqs = np.linspace(20, 1040, 256) * u.kHz
                receiver = "RAD1"
            elif file.suffix.lower() == ".r2":
                freqs = np.linspace(1.075, 13.825, 256) * u.MHz
                receiver = "RAD2"
else:
raise ValueError(f"Unknown WIND/WAVES file type: {file.suffix}")
            # Background, which appears to already be subtracted from the data
bg = data_array[:, -1]
data = data_array[:, :-1]
start_time = Time.strptime(file.stem, "%Y%m%d")
end_time = start_time + 86399 * u.s
times = start_time + (np.arange(1440) * 60 + 30) * u.s
meta = {
"instrument": "WAVES",
"observatory": "WIND",
"start_time": start_time,
"end_time": end_time,
"wavelength": a.Wavelength(freqs[0], freqs[-1]),
"detector": receiver,
"freqs": freqs,
"times": times,
"background": bg,
}
return data, meta
else:
raise ValueError(f"Unrecognized IDL .save file: {file}")
Spectrogram = SpectrogramFactory(registry=GenericSpectrogram._registry, default_widget_type=GenericSpectrogram)