Source code for sunpy.net.vso.table_response
"""
Classes and helper functions for VSO responses.
"""
from collections import defaultdict
from collections.abc import Mapping
import numpy as np
from zeep.helpers import serialize_object
import astropy.units as u
from astropy.table import TableAttribute
from astropy.time import Time
from sunpy.net.base_client import QueryResponseTable
from sunpy.time import parse_time
__all__ = ['VSOQueryResponseTable']
def iter_sort_response(response):
"""
Sorts the VSO query results by their start time.
Parameters
----------
response : `zeep.objects.QueryResponse`
A SOAP Object of a VSO query result
Returns
-------
`list`
Sorted record items w.r.t. their start time.
"""
has_time_recs = list()
has_notime_recs = list()
for prov_item in response.provideritem:
if not hasattr(prov_item, 'record') or not prov_item.record:
continue
if not hasattr(prov_item.record, 'recorditem') or not prov_item.record.recorditem:
continue
rec_item = prov_item.record.recorditem
for rec in rec_item:
if hasattr(rec, 'time') and hasattr(rec.time, 'start') and rec.time.start is not None:
has_time_recs.append(rec)
else:
has_notime_recs.append(rec)
has_time_recs = sorted(has_time_recs, key=lambda x: x.time.start)
all_recs = has_time_recs + has_notime_recs
return all_recs
[docs]
class VSOQueryResponseTable(QueryResponseTable):
hide_keys = ['fileid', 'fileurl', 'Info Required']
errors = TableAttribute(default=[])
size_column = 'Size'
[docs]
@classmethod
def from_zeep_response(cls, response, *, client, _sort=True):
"""
Construct a table response from the zeep response.
"""
# _sort is a hack to be able to convert from a legacy QueryResponse to
# a table response.
if _sort:
records = iter_sort_response(response)
else:
records = response
data = []
for record in records:
row = defaultdict(lambda: None)
for key, value in serialize_object(record).items():
if not isinstance(value, Mapping):
if key == "size":
# size is in bytes with a very high degree of precision.
value = (value * u.Kibyte).to(u.Mibyte).round(5)
key = key.capitalize() if key not in cls.hide_keys else key
row[key] = value
else:
if key == "wave":
# Some records in the VSO have 'kev' which astropy
# doesn't recognize as a unit, so fix it.
waveunit = value['waveunit']
waveunit = 'keV' if waveunit == 'kev' else waveunit
row["Wavelength"] = None
if value['wavemin'] is not None and value['wavemax'] is not None:
row["Wavelength"] = u.Quantity(
[float(value['wavemin']), float(value['wavemax'])],
unit=waveunit)
row["Wavetype"] = value['wavetype']
continue
for subkey, subvalue in value.items():
if key == "time" and subvalue is not None:
key_template = f"{subkey.capitalize()} {key.capitalize()}"
else:
key_template = f"{key.capitalize()} {subkey.capitalize()}"
row[key_template] = subvalue
data.append(row)
data = cls(data, client=client)
# Parse times
for col in data.colnames:
if col.endswith('Time'):
try:
# Try to use a vectorised call to parse_time
data[col] = parse_time(data[col])
except Exception:
# If that fails, parse dates one by one. This is needed if
# VSO returns a variety of different date format strings
times = []
mask = []
for i, t in enumerate(data[col]):
if t is not None:
times.append(parse_time(t))
else:
# Create a dummy time and mask it later
times.append(Time(val=0, format='mjd'))
mask.append(i)
data[col] = Time(times)
data[col][mask] = np.ma.masked
data[col].format = 'iso'
# Reorder the columns to put the most useful ones first.
return data._reorder_columns(['Start Time', 'End Time', 'Source',
'Instrument', 'Type', 'Wavelength'],
remove_empty=True)
[docs]
def add_error(self, exception):
self.errors.append(exception)