"""
This file defines the SOARClient class which is used to access the Solar
Orbiter Archive (SOAR).
"""
import json
import pathlib
import re
from copy import copy
from json.decoder import JSONDecodeError
import astropy.table
import astropy.units as u
import requests
import sunpy.net.attrs as a
from sunpy import log
from sunpy.net.attr import and_
from sunpy.net.base_client import BaseClient, QueryResponseTable
from sunpy.time import parse_time
__all__ = ["SOARClient"]
class SOARClient(BaseClient):
    """
    Provides access to Solar Orbiter Archive (SOAR) which provides data for
    Solar Orbiter.

    References
    ----------
    * `SOAR <https://soar.esac.esa.int/soar/>`__
    """

    def search(self, *query, **kwargs):
        r"""
        Query this client for a list of results.

        Parameters
        ----------
        *query : `tuple`
            `sunpy.net.attrs` objects representing the query.
        **kwargs : `dict`
            Any extra keywords to refine the search.
            Unused by this client.

        Returns
        -------
        A ``QueryResponseTable`` instance containing the query result.
        The ``Filesize`` column is converted from bytes to megabytes
        (rounded to 3 decimal places), and the ``Data item ID`` and
        ``Filename`` columns are registered as hidden keys.
        """
        from sunpy_soar._attrs import walker  # NOQA: PLC0415

        query = and_(*query)
        queries = walker.create(query)

        results = []
        for query_parameters in queries:
            # The provider term only selects this client; it is not sent to the server.
            if "provider='SOAR'" in query_parameters:
                query_parameters.remove("provider='SOAR'")
            results.append(self._do_search(query_parameters))
        table = astropy.table.vstack(results)
        qrt = QueryResponseTable(table, client=self)
        qrt["Filesize"] = (qrt["Filesize"] * u.byte).to(u.Mbyte).round(3)
        qrt.hide_keys = ["Data item ID", "Filename"]
        return qrt

    @staticmethod
    def add_join_to_query(query: list[str], data_table: str, instrument_table: str):
        """
        Construct the WHERE, FROM, and SELECT parts of the ADQL query.

        The data table is aliased as ``h1`` and, when an instrument table is
        given, it is joined as ``h2`` on ``data_item_oid``.

        Parameters
        ----------
        query : list[str]
            List of query items.
        data_table : str
            Name of the data table.
        instrument_table : str
            Name of the instrument table. If falsy, no join is added and no
            ``h2.dimension_index`` filter is emitted.

        Returns
        -------
        tuple[str, str, str]
            WHERE, FROM, and SELECT parts of the query.
        """
        # Extract wavemin and wavemax individually
        wavemin_pattern = re.compile(r"Wavemin='(\d+\.\d+)'")
        wavemax_pattern = re.compile(r"Wavemax='(\d+\.\d+)'")

        # Individual WHERE conditions; joined with " AND " at the end.
        conditions = []
        for current_parameter in query:
            parameter = copy(current_parameter)
            wavemin_match = wavemin_pattern.search(parameter)
            wavemax_match = wavemax_pattern.search(parameter)
            if wavemin_match and wavemax_match:
                wavemin = wavemin_match.group(1)
                wavemax = wavemax_match.group(1)
                # If the wavemin and wavemax are same that means only one wavelength is given in query.
                if float(wavemin) == float(wavemax):
                    # For PHI and SPICE, we can specify wavemin and wavemax in the query and get the results.
                    # For PHI we have wavelength data in both angstrom and nanometer without it being mentioned
                    # in the SOAR.
                    # For SPICE we get data in form of wavemin/wavemax columns, but only for the first spectral
                    # window.
                    # To make sure this data is not misleading to the user we do not return any values for
                    # PHI AND SPICE.
                    parameter = f"Wavelength='{wavemin}'"
                else:
                    parameter = f"Wavemin='{wavemin}' AND h2.Wavemax='{wavemax}'"

            if parameter.startswith("begin_time"):
                time_list = parameter.split(" AND ")
                conditions.append(f"h1.{time_list[0]} AND h1.{time_list[1]}")
                # As there are no dimensions in STIX, the dimension index need not be included in the query
                # for STIX. Without an instrument table there is no ``h2`` to filter on either.
                if instrument_table and "stx" not in instrument_table:
                    # To avoid duplicate rows in the output table, the dimension index is set to 1.
                    conditions.append("h2.dimension_index='1'")
            else:
                # Detector and wavelength columns are qualified with the instrument table alias (h2);
                # everything else with the data table alias (h1).
                prefix = "h2." if parameter.startswith(("Detector", "Wave")) else "h1."
                conditions.append(f"{prefix}{parameter}")

        where_part = " AND ".join(conditions)
        from_part = f"{data_table} AS h1"
        select_part = (
            "h1.instrument, h1.descriptor, h1.level, h1.begin_time, h1.end_time, "
            "h1.data_item_id, h1.filesize, h1.filename, h1.soop_name"
        )
        if instrument_table:
            from_part += f" JOIN {instrument_table} AS h2 USING (data_item_oid)"
            select_part += ", h2.detector, h2.wavelength, h2.dimension_index"
        return where_part, from_part, select_part

    @staticmethod
    def _construct_payload(query):
        """
        Construct search payload.

        Parameters
        ----------
        query : list[str]
            List of query items.

        Returns
        -------
        dict
            Payload dictionary to be sent with the query.
        """
        # Default data table
        data_table = "v_sc_data_item"
        instrument_table = None
        query_method = "doQuery"
        # Mapping is established between the SOAR instrument names and its corresponding SOAR instrument table alias.
        instrument_mapping = {
            "SOLOHI": "SHI",
            "EUI": "EUI",
            "STIX": "STX",
            "SPICE": "SPI",
            "PHI": "PHI",
            "METIS": "MET",
        }
        instrument_name = None
        distance_parameter = []
        non_distance_parameters = []

        for q in query:
            if "DISTANCE" in str(q):
                distance_parameter.append(q)
            else:
                non_distance_parameters.append(q)
            # An explicit instrument always wins; a descriptor is only used to
            # derive the instrument (the part before the first "-") if none is known yet.
            if q.startswith("instrument") or (q.startswith("descriptor") and not instrument_name):
                instrument_name = q.split("=")[1][1:-1].split("-")[0].upper()
            elif q.startswith("level") and q.split("=")[1][1:3] == "LL":
                data_table = "v_ll_data_item"

        # Distance terms are moved to the end of the query.
        query = non_distance_parameters + distance_parameter
        if distance_parameter:
            query_method = "doQueryFilteredByDistance"

        if instrument_name:
            instrument_name = instrument_mapping.get(instrument_name, instrument_name)
            instrument_table = f"v_{instrument_name.lower()}_sc_fits"
            if data_table == "v_ll_data_item":
                instrument_table = instrument_table.replace("_sc_", "_ll_")

        # Need to establish join for remote sensing instruments as they have instrument tables in SOAR.
        if instrument_name in ("EUI", "MET", "SPI", "PHI", "SHI"):
            where_part, from_part, select_part = SOARClient.add_join_to_query(query, data_table, instrument_table)
        else:
            from_part = data_table
            select_part = "*"
            where_part = " AND ".join(query)

        adql_query = {"SELECT": select_part, "FROM": from_part, "WHERE": where_part}
        adql_query_str = " ".join([f"{key} {value}" for key, value in adql_query.items()])
        if query_method == "doQueryFilteredByDistance":
            # The distance term is detached from the WHERE clause by turning its leading " AND " into "&"
            # (presumably so it travels as its own request parameter -- confirm against the SOAR TAP docs).
            adql_query_str = adql_query_str.replace(" AND h1.DISTANCE", "&DISTANCE").replace(
                " AND DISTANCE", "&DISTANCE"
            )
        return {"REQUEST": query_method, "LANG": "ADQL", "FORMAT": "json", "QUERY": adql_query_str}

    @staticmethod
    def _do_search(query):
        """
        Query the SOAR server with a single query.

        Parameters
        ----------
        query : list[str]
            List of query items.

        Returns
        -------
        astropy.table.QTable
            Query results, sorted by start time.

        Raises
        ------
        requests.HTTPError
            If the server responds with an error status code.
        RuntimeError
            If the server response cannot be decoded as JSON.
        """
        tap_endpoint = "http://soar.esac.esa.int/soar-sl-tap/tap"
        payload = SOARClient._construct_payload(query)
        # Need to force requests to not form-encode the parameters
        payload = "&".join([f"{key}={val}" for key, val in payload.items()])
        # Get request info
        r = requests.get(f"{tap_endpoint}/sync", params=payload, timeout=60)
        log.debug(f"Sent query: {r.url}")
        r.raise_for_status()

        try:
            response_json = r.json()
        except JSONDecodeError as err:
            msg = "The SOAR server returned an invalid JSON response. It may be down or not functioning correctly."
            raise RuntimeError(msg) from err

        # Transpose the row-oriented response into one list per column name.
        names = [m["name"] for m in response_json["metadata"]]
        info = {name: [] for name in names}
        for entry in response_json["data"]:
            for i, name in enumerate(names):
                info[name].append(entry[i])

        # Only normalise the time columns when there is at least one row.
        if info["begin_time"]:
            info["begin_time"] = parse_time(info["begin_time"]).iso
            info["end_time"] = parse_time(info["end_time"]).iso

        result_table = astropy.table.QTable(
            {
                "Instrument": info["instrument"],
                "Data product": info["descriptor"],
                "Level": info["level"],
                "Start time": info["begin_time"],
                "End time": info["end_time"],
                "Data item ID": info["data_item_id"],
                "Filename": info["filename"],
                "Filesize": info["filesize"],
                "SOOP Name": info["soop_name"],
            },
        )
        # Optional columns are added only if the server returned them.
        if "detector" in info:
            result_table["Detector"] = info["detector"]
        if "sensor" in info:
            result_table["Sensor"] = info["sensor"]
        if "wavelength" in info:
            result_table["Wavelength"] = info["wavelength"]
        result_table.sort("Start time")
        return result_table

    def fetch(self, query_results, *, path, downloader, **kwargs) -> None:
        """
        Queue a set of results to be downloaded.

        `sunpy.net.base_client.BaseClient` does the actual downloading, so we
        just have to queue up the ``downloader``.

        Parameters
        ----------
        query_results : sunpy.net.fido_factory.UnifiedResponse
            Results from a Fido search.
        path : str
            Path to download files to. Must be a format string with a ``file``
            field for the filename.
        downloader : parfive.Downloader
            Downloader instance used to download data.
        kwargs :
            Keyword arguments aren't used by this client.
        """
        base_url = "http://soar.esac.esa.int/soar-sl-tap/data?retrieval_type=LAST_PRODUCT"

        for row in query_results:
            # Low latency ("LL...") levels are served as a different product type.
            product_type = "LOW_LATENCY" if row["Level"].startswith("LL") else "SCIENCE"
            data_id = row["Data item ID"]
            url = f"{base_url}&product_type={product_type}&data_item_id={data_id}"

            filepath = str(path).format(file=row["Filename"], **row.response_block_map)
            log.debug(f"Queuing URL: {url}")
            downloader.enqueue_file(url, filename=filepath)

    @classmethod
    def _can_handle_query(cls, *query) -> bool:
        """
        Check if this client can handle a given Fido query. Checks to see if a
        SOAR instrument or product is provided in the query.

        Returns
        -------
        bool
            True if this client can handle the given query.
        """
        from sunpy_soar.attrs import SOOP, Distance, Product, Sensor  # NOQA: PLC0415

        # A query must contain a Distance if one is given, otherwise a Time.
        required = {Distance} if any(isinstance(q, Distance) for q in query) else {a.Time}
        optional = {
            a.Instrument,
            a.Detector,
            Sensor,
            a.Wavelength,
            a.Level,
            a.Provider,
            Product,
            SOOP,
            Distance,
            a.Time,
        }
        if not cls.check_attr_types_in_query(query, required, optional):
            return False

        # check to make sure the instrument attr passed is one provided by the SOAR.
        # also check to make sure that the provider passed is the SOAR for which this client can handle.
        instr = [i[0].lower() for i in cls.register_values()[a.Instrument]]
        for x in query:
            if isinstance(x, a.Instrument) and str(x.value).lower() not in instr:
                return False
            if isinstance(x, a.Provider) and str(x.value).lower() != "soar":
                return False
        return True

    @classmethod
    def _attrs_module(cls):
        # Register SOAR specific attributes with Fido
        return "soar", "sunpy_soar.attrs"

    @classmethod
    def register_values(cls):
        """
        Register the SOAR specific attributes with Fido.

        Returns
        -------
        dict
            The dictionary containing the values formed into attributes.
        """
        return cls.load_dataset_values()

    @staticmethod
    def load_dataset_values():
        """
        Loads the net attribute values from the JSON files in the ``data``
        directory next to this module.

        Returns
        -------
        dict
            The dictionary containing the values formed into attributes.
        """
        from sunpy_soar.attrs import SOOP, Product, Sensor  # NOQA: PLC0415

        data_dir = pathlib.Path(__file__).parent / "data"

        def load_items(filename):
            # Read one JSON mapping and convert it from dict to list of tuples.
            with (data_dir / filename).open(encoding="utf-8") as attrs_file:
                return list(json.load(attrs_file).items())

        # Product (dataset) attrs
        all_datasets = load_items("attrs.json")
        # Instrument attrs
        all_instr = load_items("instrument_attrs.json")
        # Sensor attrs
        all_sensors = load_items("sensor_attrs.json")
        # SOOP attrs
        all_soops = load_items("soop_attrs.json")

        return {
            Product: all_datasets,
            a.Instrument: all_instr,
            Sensor: all_sensors,
            SOOP: all_soops,
            a.Provider: [("SOAR", "Solar Orbiter Archive.")],
        }