"""
This module provides the `Fido
<sunpy.net.fido_factory.UnifiedDownloaderFactory>` instance of
`sunpy.net.fido_factory.UnifiedDownloaderFactory`. It also provides the
`~sunpy.net.fido_factory.UnifiedResponse` class, which
`Fido.search <sunpy.net.fido_factory.UnifiedDownloaderFactory.search>` returns, and the
`parfive.Results` class that is returned by
`Fido.fetch <sunpy.net.fido_factory.UnifiedDownloaderFactory.fetch>`.
"""
import os
import re
from pathlib import Path
from textwrap import dedent
from collections.abc import Sequence
import numpy as np
import parfive
from packaging.version import Version
from astropy.table import Table
from sunpy import config
from sunpy.net import attr, vso
from sunpy.net.base_client import BaseClient, QueryResponseColumn, QueryResponseRow, QueryResponseTable
from sunpy.util.datatype_factory_base import BasicRegistrationFactory, NoMatchError
from sunpy.util.parfive_helpers import Downloader, Results
from sunpy.util.util import get_width
# Names exported by ``from <this module> import *``.
__all__ = ['Fido', 'UnifiedResponse', 'UnifiedDownloaderFactory']
# The installed parfive version parsed into a comparable `packaging` Version.
# NOTE(review): not referenced anywhere else in the visible code — confirm
# whether other modules import it before removing.
parfive_version = Version(parfive.__version__)
[docs]
class UnifiedResponse(Sequence):
"""
The object used to store results from `~sunpy.net.fido_factory.UnifiedDownloaderFactory.search`.
The `~sunpy.net.Fido` object returns results from multiple different
clients. So it is always possible to sub-select these results, you can
index this object with two indices. The first index is the client index,
i.e. corresponding to the results from the `~sunpy.net.vso.VSOClient`. The
second index can be used to select records from the results returned from
that client, for instance if you only want every second result you could
index the second dimension with ``::2``.
"""
def __init__(self, *results):
"""
Parameters
----------
*results : `sunpy.net.base_client.QueryResponseTable`
One or more QueryResponse objects.
"""
self._list = []
self._numfile = 0
for result in results:
if isinstance(result, QueryResponseRow):
result = result.as_table()
if isinstance(result, QueryResponseColumn):
result = result.as_table()
if not isinstance(result, QueryResponseTable):
raise TypeError(
f"{type(result)} is not derived from sunpy.net.base_client.QueryResponseTable")
self._list.append(result)
self._numfile += len(result)
def __len__(self):
return len(self._list)
def _getitem_string(self, aslice):
ret = []
for res in self._list:
clientname = res.client.__class__.__name__
if aslice.lower() == clientname.lower().split('client')[0]:
ret.append(res)
if len(ret) == 1:
ret = ret[0]
elif len(ret) == 0:
raise IndexError(f"{aslice} is not a valid key, valid keys are: {','.join(self.keys())}")
return ret
def __getitem__(self, aslice):
"""
Support slicing the UnifiedResponse as a 2D object.
The first index is to the client and the second index is the records
returned from those clients.
"""
if isinstance(aslice, int | slice):
ret = self._list[aslice]
# using the client's name for indexing the responses.
elif isinstance(aslice, str):
ret = self._getitem_string(aslice)
# Make sure we only have a length two slice.
elif isinstance(aslice, tuple):
if len(aslice) > 2:
raise IndexError("UnifiedResponse objects can only "
"be sliced with one or two indices.")
# Indexing both client and records, but only for one client.
if isinstance(aslice[0], str):
intermediate = self._getitem_string(aslice[0])
else:
intermediate = self._list[aslice[0]]
if isinstance(intermediate, list):
ret = []
for client_resp in intermediate:
ret.append(client_resp[aslice[1]])
else:
ret = intermediate[aslice[1]]
else:
raise IndexError("UnifiedResponse objects must be sliced with integers or strings.")
if isinstance(ret, QueryResponseTable | QueryResponseColumn | QueryResponseRow):
return ret
return UnifiedResponse(*ret)
[docs]
def keys(self):
"""
Names of the contained responses.
One name may map to more than one response.
"""
ret = []
for res in self._list:
clientname = res.client.__class__.__name__.lower().split('client')[0]
if clientname not in ret:
ret.append(clientname)
return ret
@property
def file_num(self):
"""
The number of records returned in all responses.
"""
return self._numfile
def _repr_html_(self):
nprov = len(self)
if nprov == 1:
ret = f'Results from {len(self)} Provider:</br></br>'
else:
ret = f'Results from {len(self)} Providers:</br></br>'
for block in self:
ret += f"{len(block)} Results from the {block.client.__class__.__name__}:</br>"
ret += block._repr_html_()
ret += '</br>'
return ret
def __repr__(self):
return object.__repr__(self) + "\n" + str(self)
def __str__(self):
nprov = len(self)
if nprov == 1:
ret = f'Results from {len(self)} Provider:\n\n'
else:
ret = f'Results from {len(self)} Providers:\n\n'
for block in self:
ret += f"{len(block)} Results from the {block.client.__class__.__name__}:\n"
if block.client.info_url is not None:
ret += f'Source: {block.client.info_url}\n'
size = block.total_size()
if np.isfinite(size):
ret += f'Total estimated size: {size}\n'
ret += '\n'
lines = repr(block).split('\n')
ret += '\n'.join(lines[1:])
ret += '\n\n'
return ret
[docs]
def show(self, *cols):
"""
Displays response tables with desired columns for the Query.
Parameters
----------
\\*cols : `tuple`
Name of columns to be shown.
Returns
-------
`list` of `astropy.table.Table`
A list of tables showing values for specified columns.
"""
return type(self)(*[i.show(*cols) for i in self._list])
@property
def all_colnames(self):
"""
Returns all the colnames in any of the tables in this response.
Any column names in this list are valid inputs to :meth:`.UnifiedResponse.show`.
"""
colnames = set(self[0].colnames)
for resp in self[1:]:
colnames.union(resp.colnames)
return sorted(list(colnames))
query_walker = attr.AttrWalker()
"""
We construct an `AttrWalker` which calls `_make_query_to_client` for each
logical component of the query, i.e. any block which are ANDed together.
"""


@query_walker.add_creator(attr.DataAttr)
def _create_data(walker, query, factory):
    """Creator for a single data attr: hand it to the factory as a one-attr query."""
    single_attr = query
    return factory._make_query_to_client(single_attr)


@query_walker.add_creator(attr.AttrAnd)
def _create_and(walker, query, factory):
    """Creator for an AND block: pass all of its attrs to the factory as one query."""
    anded_attrs = query.attrs
    return factory._make_query_to_client(*anded_attrs)


@query_walker.add_creator(attr.AttrOr)
def _create_or(walker, query, factory):
    """
    Creator for an OR block: walk every alternative separately and concatenate
    the resulting responses into one flat list, in order.
    """
    return [
        response
        for alternative in query.attrs
        for response in walker.create(alternative, factory)
    ]
[docs]
class UnifiedDownloaderFactory(BasicRegistrationFactory):
    """
    Fido is a unified data search and retrieval tool.
    It provides simultaneous access to a variety of online data sources, some
    cover multiple instruments and data products like the Virtual Solar
    Observatory and some are specific to a single source.
    For details of using `~sunpy.net.Fido` see :ref:`sunpy-tutorial-acquiring-data-index`.
    """
    # NOTE: the class docstring above is rendered at runtime by
    # ``_print_clients`` (``str(Fido)`` / ``repr(Fido)``), so editing it
    # changes user-visible output.

    def search(self, *query):
        """
        Query for data in form of multiple parameters.

        Examples
        --------
        Query for LYRA timeseries data for the time range ('2012/3/4','2012/3/6')

        >>> from sunpy.net import Fido, attrs as a
        >>> import astropy.units as u
        >>> unifresp = Fido.search(a.Time('2012/3/4', '2012/3/6'), a.Instrument.lyra) # doctest: +REMOTE_DATA

        Query for data from Nobeyama Radioheliograph and RHESSI

        >>> unifresp = Fido.search(a.Time('2012/3/4', '2012/3/6'),
        ... (a.Instrument.norh & a.Wavelength(17*u.GHz)) | a.Instrument.rhessi) # doctest: +REMOTE_DATA

        Query for 304 Angstrom SDO AIA data with a cadence of 10 minutes

        >>> import astropy.units as u
        >>> from sunpy.net import Fido, attrs as a
        >>> unifresp = Fido.search(a.Time('2012/3/4', '2012/3/6'),
        ... a.Instrument.aia,
        ... a.Wavelength(304*u.angstrom, 304*u.angstrom),
        ... a.Sample(10*u.minute)) # doctest: +SKIP

        Parameters
        ----------
        *query : `sunpy.net.vso.attrs`, `sunpy.net.jsoc.attrs`
            A query consisting of multiple parameters which define the
            requested data. The query is specified using attributes from the
            VSO and the JSOC. The query can mix attributes from the VSO and
            the JSOC.

        Returns
        -------
        `sunpy.net.fido_factory.UnifiedResponse`
            Container of responses returned by clients servicing query.

        Notes
        -----
        The conjunction 'and' transforms query into disjunctive normal form
        i.e. query is now of form A & B or ((A & B) | (C & D)).
        This helps in modularising query into parts and handling each of the
        parts individually.
        """
        query = attr.and_(*query)
        results = query_walker.create(query, self)

        # Empty VSO responses are dropped for tidiness. The VSO
        # _can_handle_query is very broad because we don't know the full list
        # of supported values we can search for (yet), so it frequently
        # answers queries it has no data for.
        results = [r for r in results if not isinstance(r, vso.VSOQueryResponseTable) or len(r) > 0]

        return UnifiedResponse(*results)

    def fetch(self, *query_results, path=None, max_conn=5, progress=True,
              overwrite=False, downloader=None, **kwargs):
        """
        Download the records represented by `~sunpy.net.base_client.QueryResponseTable` or
        `~sunpy.net.fido_factory.UnifiedResponse` objects.

        Parameters
        ----------
        *query_results : `sunpy.net.fido_factory.UnifiedResponse` or `~sunpy.net.base_client.QueryResponseTable`
            Container returned by query method, or multiple.
        path : `str`
            The directory to retrieve the files into. Can refer to any fields
            in `~sunpy.net.base_client.BaseQueryResponse.response_block_properties` via string formatting,
            moreover the file-name of the file downloaded can be referred to as file,
            e.g. "{source}/{instrument}/{time.start}/{file}".
        max_conn : `int`, optional
            The number of parallel download slots.
        progress : `bool`, optional
            If `True` show a progress bar showing how many of the total files
            have been downloaded. If `False`, no progress bars will be shown at all.
        overwrite : `bool` or `str`, optional
            Determine how to handle downloading if a file already exists with the
            same name. If `False` the file download will be skipped and the path
            returned to the existing file, if `True` the file will be downloaded
            and the existing file will be overwritten, if ``'unique'`` the filename
            will be modified to be unique.
        downloader : `parfive.Downloader`, optional
            The download manager to use. If specified the ``max_conn``,
            ``progress`` and ``overwrite`` arguments are ignored.
        **kwargs
            Forwarded to each client's ``fetch``. ``max_splits`` (default 5)
            is additionally used when constructing the default downloader.
            ``wait`` is not accepted.

        Returns
        -------
        `parfive.Results`

        Raises
        ------
        PermissionError
            If the closest existing parent of ``path`` is not writable.
        ValueError
            If ``wait`` is passed, or a query result has an unsupported type.
        TypeError
            If ``downloader`` is not a `parfive.Downloader`, if only some of
            the arguments are `parfive.Results`, or if a client returns
            something other than `None` or `parfive.Results`.

        Examples
        --------
        >>> from sunpy.net.attrs import Time, Instrument
        >>> unifresp = Fido.search(Time('2012/3/4','2012/3/5'), Instrument('EIT')) # doctest: +REMOTE_DATA
        >>> filepaths = Fido.fetch(unifresp) # doctest: +SKIP

        If any downloads fail, they can be retried by passing the `parfive.Results` object back into ``fetch``.

        >>> filepaths = Fido.fetch(filepaths) # doctest: +SKIP
        """
        if path is None:
            path = Path(config.get('downloads', 'download_dir')) / '{file}'
        elif isinstance(path, str | os.PathLike) and '{file}' not in str(path):
            path = Path(path) / '{file}'
        else:
            path = Path(path)
        path = path.expanduser()

        # Ensure we have write permissions to the path: check the closest
        # ancestor directory that already exists.
        exists = [p for p in Path(path).resolve().parents if p.exists()]
        if not os.access(exists[0], os.W_OK):
            raise PermissionError('You do not have permission to write'
                                  f' to the directory {exists[0]}.')

        if "wait" in kwargs:
            raise ValueError("wait is not a valid keyword argument to Fido.fetch.")

        # Avoid more than one connection for JSOC only requests.
        # Imported inside the function rather than at module level
        # (presumably to avoid a circular import — confirm).
        from sunpy.net.jsoc import JSOCClient

        max_splits = kwargs.get('max_splits', 5)
        # Collect one flag per response table across *all* arguments, so the
        # decision does not depend on which argument happens to come last.
        jsoc_flags = []
        for query_result in query_results:
            if isinstance(query_result, UnifiedResponse):
                jsoc_flags.extend(isinstance(block.client, JSOCClient) for block in query_result)
            elif isinstance(query_result, QueryResponseTable):
                jsoc_flags.append(isinstance(query_result.client, JSOCClient))
        is_jsoc_only = bool(jsoc_flags) and all(jsoc_flags)

        if downloader is None:
            if is_jsoc_only:
                max_conn = 1
                max_splits = 1
            downloader = Downloader(max_conn=max_conn, progress=progress,
                                    overwrite=overwrite, max_splits=max_splits)
        elif not isinstance(downloader, parfive.Downloader):
            raise TypeError("The downloader argument must be a parfive.Downloader instance.")

        # Handle retrying failed downloads
        retries = [isinstance(arg, Results) for arg in query_results]
        if all(retries):
            results = Results()
            for retry in query_results:
                dr = downloader.retry(retry)
                results.data += dr.data
                results._errors += dr._errors
            return results
        elif any(retries):
            raise TypeError("If any arguments to fetch are `parfive.Results` objects, all arguments must be.")

        reslist = []
        for query_result in query_results:
            if isinstance(query_result, QueryResponseRow):
                responses = [query_result.as_table()]
            elif isinstance(query_result, QueryResponseTable):
                responses = [query_result]
            elif isinstance(query_result, UnifiedResponse):
                responses = query_result
            else:
                raise ValueError(f"Query result has an unrecognized type: {type(query_result)} "
                                 "Allowed types are QueryResponseRow, QueryResponseTable or UnifiedResponse.")
            for block in responses:
                # With wait=False the clients only enqueue their files on the
                # shared downloader; the download itself happens below.
                result = block.client.fetch(block, path=path,
                                            downloader=downloader,
                                            wait=False, **kwargs)
                if result not in (NotImplemented, None):
                    reslist.append(result)

        results = downloader.download()
        # Combine the results objects from all the clients into one Results object.
        for result in reslist:
            if not isinstance(result, Results):
                raise TypeError(
                    "If wait is False a client must return a parfive.Downloader and either None"
                    " or a parfive.Results object.")
            results.data += result.data
            results._errors += result.errors

        return results

    def __call__(self, *args, **kwargs):
        """Always raise `TypeError`: instances of this factory are not callable."""
        raise TypeError(f"'{self.__class__.__name__}' object is not callable")

    def _check_registered_widgets(self, *args):
        """
        Factory helper function.

        Returns the registry keys whose registered validation callable
        returns a truthy value for ``*args``.

        Raises
        ------
        NoMatchError
            If no registered entry accepts the query.
        """
        candidate_widget_types = [
            key for key, validator in self.registry.items() if validator(*args)
        ]
        if not candidate_widget_types:
            # There is no default client
            raise NoMatchError("This query was not understood by any clients. Did you miss an OR?")
        return candidate_widget_types

    def _make_query_to_client(self, *query):
        """
        Given a query, look up the matching clients and perform the query.

        Parameters
        ----------
        *query : collection of `~sunpy.net.vso.attr` objects

        Returns
        -------
        results : `list`
            One search result per matching client, each produced by a freshly
            instantiated client.
        """
        candidate_widget_types = self._check_registered_widgets(*query)
        # This method is called by `search` and the results are fed into a
        # UnifiedResponse object.
        return [client().search(*query) for client in candidate_widget_types]

    def __repr__(self):
        return object.__repr__(self) + "\n" + self._print_clients()

    def __str__(self):
        """
        This enables the "pretty" printing of the Fido Clients.
        """
        return self._print_clients()

    def _repr_html_(self):
        """
        This enables the "pretty" printing of the Fido Clients with html.
        """
        return self._print_clients(html=True)

    def _print_clients(self, html=False, visible_entries=-1):
        """
        Build the listing of registered clients shown by ``str``/``repr``.

        Parameters
        ----------
        html : `bool`, optional
            If `True`, wrap the header lines in ``<p>`` tags and format the
            table as HTML.
        visible_entries : `int`, optional
            Passed to the table formatter as ``max_lines``.

        Returns
        -------
        `str`
            The header, this class's docstring and a Client/Description table
            joined by newlines.
        """
        width = -1 if html else get_width()
        t = Table(names=["Client", "Description"], dtype=["U80", "U120"])
        # Docstrings may be None (e.g. when running with -OO), so fall back to
        # an empty string rather than failing.
        lines = ["sunpy.net.Fido", dedent(self.__doc__ or "")]
        if html:
            lines = [f"<p>{line}</p>" for line in lines]
        for key in BaseClient._registry.keys():
            # Use the first paragraph of the client's docstring, collapsed to
            # a single line, as its description.
            summary = (key.__doc__ or "").partition("\n\n")[0]
            t.add_row((key.__name__, dedent(re.sub(r"\s+", " ", summary).strip())))
        lines.extend(t.pformat(max_lines=visible_entries,
                               show_dtype=False, max_width=width, align="<", html=html))
        return '\n'.join(lines)
# The module-level factory instance exported as ``Fido``. It is constructed on
# the same registry object as `BaseClient` (``BaseClient._registry``).
# NOTE(review): ``additional_validation_functions`` presumably tells the base
# factory to use each client's ``_can_handle_query`` to decide whether it can
# service a query — confirm against `BasicRegistrationFactory`.
Fido = UnifiedDownloaderFactory(
registry=BaseClient._registry, additional_validation_functions=['_can_handle_query'])