Source code for sunpy.net.fido_factory

"""
This module provides the `Fido
<sunpy.net.fido_factory.UnifiedDownloaderFactory>` instance of
`sunpy.net.fido_factory.UnifiedDownloaderFactory`. It also provides the
`~sunpy.net.fido_factory.UnifiedResponse` class which
`Fido.search <sunpy.net.fido_factory.UnifiedDownloaderFactory.search>` returns and the
`parfive.Results` class that is returned by
`Fido.fetch <sunpy.net.fido_factory.UnifiedDownloaderFactory.fetch>`.
"""
import os
from pathlib import Path
from textwrap import dedent
from collections.abc import Sequence

import numpy as np
import parfive
from packaging.version import Version

from astropy.table import Table

from sunpy import config
from sunpy.net import attr, vso
from sunpy.net.base_client import BaseClient, QueryResponseColumn, QueryResponseRow, QueryResponseTable
from sunpy.util.datatype_factory_base import BasicRegistrationFactory, NoMatchError
from sunpy.util.parfive_helpers import Downloader, Results
from sunpy.util.util import get_width

__all__ = ['Fido', 'UnifiedResponse', 'UnifiedDownloaderFactory']

parfive_version = Version(parfive.__version__)


class UnifiedResponse(Sequence):
    """
    The object used to store results from `~sunpy.net.fido_factory.UnifiedDownloaderFactory.search`.

    The `~sunpy.net.Fido` object returns results from multiple different
    clients. So that it is always possible to sub-select these results, you
    can index this object with two indices. The first index is the client
    index, i.e. corresponding to the results from the
    `~sunpy.net.vso.VSOClient`. The second index can be used to select records
    from the results returned from that client, for instance if you only want
    every second result you could index the second dimension with ``::2``.
    """

    def __init__(self, *results):
        """
        Parameters
        ----------
        *results : `sunpy.net.base_client.QueryResponseTable`
            One or more QueryResponse objects. Rows and columns are converted
            to tables with ``as_table()`` before being stored.

        Raises
        ------
        TypeError
            If a result is not (convertible to) a ``QueryResponseTable``.
        """
        self._list = []
        self._numfile = 0
        for result in results:
            # Rows and columns are promoted to tables so that every stored
            # entry exposes the same interface.
            if isinstance(result, (QueryResponseRow, QueryResponseColumn)):
                result = result.as_table()

            if not isinstance(result, QueryResponseTable):
                raise TypeError(
                    f"{type(result)} is not derived from sunpy.net.base_client.QueryResponseTable")

            self._list.append(result)
            self._numfile += len(result)

    def __len__(self):
        """Return the number of contained responses (not the number of records)."""
        return len(self._list)

    def _getitem_string(self, aslice):
        """
        Look up responses by client name.

        The name is matched case-insensitively against the client's class name
        with everything from ``"client"`` onwards removed.

        Returns
        -------
        A single response if exactly one client matches, otherwise a `list` of
        all matching responses.

        Raises
        ------
        IndexError
            If no contained response matches ``aslice``.
        """
        ret = []
        for res in self._list:
            clientname = res.client.__class__.__name__
            if aslice.lower() == clientname.lower().split('client')[0]:
                ret.append(res)

        if len(ret) == 1:
            ret = ret[0]
        elif len(ret) == 0:
            raise IndexError(f"{aslice} is not a valid key, valid keys are: {','.join(self.keys())}")

        return ret

    def __getitem__(self, aslice):
        """
        Support slicing the UnifiedResponse as a 2D object.

        The first index is to the client and the second index is the records
        returned from those clients.
        """
        if isinstance(aslice, (int, slice)):
            ret = self._list[aslice]

        # using the client's name for indexing the responses.
        elif isinstance(aslice, str):
            ret = self._getitem_string(aslice)

        # Make sure we only have a length two slice.
        elif isinstance(aslice, tuple):
            if len(aslice) > 2:
                raise IndexError("UnifiedResponse objects can only "
                                 "be sliced with one or two indices.")

            # Indexing both client and records, but only for one client.
            if isinstance(aslice[0], str):
                intermediate = self._getitem_string(aslice[0])
            else:
                intermediate = self._list[aslice[0]]

            # A slice or a name shared by several clients yields a list; the
            # record index is then applied to each response in turn.
            if isinstance(intermediate, list):
                ret = [client_resp[aslice[1]] for client_resp in intermediate]
            else:
                ret = intermediate[aslice[1]]
        else:
            raise IndexError("UnifiedResponse objects must be sliced with integers or strings.")

        # A single table/column/row is returned as-is; anything else is
        # re-wrapped so the caller gets a UnifiedResponse back.
        if isinstance(ret, (QueryResponseTable, QueryResponseColumn, QueryResponseRow)):
            return ret

        return UnifiedResponse(*ret)

    def path_format_keys(self):
        """
        Returns all the names that can be used to format filenames.

        Only the keys which can be used to format all results from all
        responses contained in this `~.UnifiedResponse` are returned. Each
        individual response might have more keys available.

        Each one corresponds to a single column in the table, and the format
        syntax should match the dtype of that column, i.e. for a ``Time``
        object or a ``Quantity``.
        """
        s = self[0].path_format_keys()
        for table in self[1:]:
            s = s.intersection(table.path_format_keys())
        return s

    def keys(self):
        """
        Names of the contained responses.

        One name may map to more than one response.
        """
        ret = []
        for res in self._list:
            clientname = res.client.__class__.__name__.lower().split('client')[0]
            if clientname not in ret:
                ret.append(clientname)
        return ret

    @property
    def file_num(self):
        """
        The number of records returned in all responses.
        """
        return self._numfile

    def _repr_html_(self):
        """Return an HTML summary with one table per contained response."""
        nprov = len(self)
        if nprov == 1:
            ret = f'Results from {nprov} Provider:</br></br>'
        else:
            ret = f'Results from {nprov} Providers:</br></br>'
        for block in self:
            ret += f"{len(block)} Results from the {block.client.__class__.__name__}:</br>"
            ret += block._repr_html_()
            ret += '</br>'

        return ret

    def __repr__(self):
        return object.__repr__(self) + "\n" + str(self)

    def __str__(self):
        """Return a plain-text summary with one table per contained response."""
        nprov = len(self)
        if nprov == 1:
            ret = f'Results from {nprov} Provider:\n\n'
        else:
            ret = f'Results from {nprov} Providers:\n\n'
        for block in self:
            ret += f"{len(block)} Results from the {block.client.__class__.__name__}:\n"
            if block.client.info_url is not None:
                ret += f'Source: {block.client.info_url}\n'
            size = block.total_size()
            # Only report a size when the client could estimate one.
            if np.isfinite(size):
                ret += f'Total estimated size: {size}\n'
            ret += '\n'
            # The first line of the table repr is dropped before appending.
            lines = repr(block).split('\n')
            ret += '\n'.join(lines[1:])
            ret += '\n\n'

        return ret

    def show(self, *cols):
        """
        Displays response tables with desired columns for the Query.

        Parameters
        ----------
        \\*cols : `tuple`
            Name of columns to be shown.

        Returns
        -------
        `~sunpy.net.fido_factory.UnifiedResponse`
            A new response of the same type as this one, holding the result of
            calling ``show(*cols)`` on each contained table.
        """
        return type(self)(*[i.show(*cols) for i in self._list])

    @property
    def all_colnames(self):
        """
        Returns all the colnames in any of the tables in this response.

        Any column names in this list are valid inputs to
        :meth:`.UnifiedResponse.show`.

        Returns
        -------
        `list` of `str`
            Sorted union of the column names of every contained table; an
            empty list if there are no contained tables.
        """
        colnames = set()
        for resp in self._list:
            # ``update`` mutates in place; ``set.union`` returns a new set and
            # would silently drop the columns of every table but the first.
            colnames.update(resp.colnames)
        return sorted(colnames)
query_walker = attr.AttrWalker()
"""
We construct an `AttrWalker` which calls `_make_query_to_client` for each
logical component of the query, i.e. any block which are ANDed together.
"""


@query_walker.add_creator(attr.DataAttr)
def _create_data(walker, query, factory):
    """Dispatch a single attribute to the factory as a one-element query."""
    return factory._make_query_to_client(query)


@query_walker.add_creator(attr.AttrAnd)
def _create_and(walker, query, factory):
    """Dispatch all attributes of an AND block to the factory as one query."""
    return factory._make_query_to_client(*query.attrs)


@query_walker.add_creator(attr.AttrOr)
def _create_or(walker, query, factory):
    """Walk each branch of an OR block and concatenate the resulting responses."""
    return [
        response
        for branch in query.attrs
        for response in walker.create(branch, factory)
    ]
class UnifiedDownloaderFactory(BasicRegistrationFactory):
    """
    Fido is a unified data search and retrieval tool.

    It provides simultaneous access to a variety of online data sources, some
    cover multiple instruments and data products like the Virtual Solar
    Observatory and some are specific to a single source.

    For details of using `~sunpy.net.Fido` see :ref:`sunpy-tutorial-acquiring-data-index`.
    """
    # NOTE: the class docstring above is emitted at runtime by
    # ``_print_clients`` (via ``self.__doc__``), so edits to it change the
    # output of ``repr(Fido)`` / ``str(Fido)``.

    def search(self, *query):
        """
        Query for data in form of multiple parameters.

        Examples
        --------
        Query for LYRA timeseries data for the time range ('2012/3/4','2012/3/6')

        >>> from sunpy.net import Fido, attrs as a
        >>> import astropy.units as u
        >>> unifresp = Fido.search(a.Time('2012/3/4', '2012/3/6'), a.Instrument.lyra)  # doctest: +REMOTE_DATA

        Query for data from Nobeyama Radioheliograph and RHESSI

        >>> unifresp = Fido.search(a.Time('2012/3/4', '2012/3/6'),
        ...     (a.Instrument.norh & a.Wavelength(17*u.GHz)) | a.Instrument.rhessi)  # doctest: +REMOTE_DATA

        Query for 304 Angstrom SDO AIA data with a cadence of 10 minutes

        >>> import astropy.units as u
        >>> from sunpy.net import Fido, attrs as a
        >>> unifresp = Fido.search(a.Time('2012/3/4', '2012/3/6'),
        ...                        a.Instrument.aia,
        ...                        a.Wavelength(304*u.angstrom, 304*u.angstrom),
        ...                        a.Sample(10*u.minute))  # doctest: +REMOTE_DATA

        Parameters
        ----------
        *query : `sunpy.net.vso.attrs`, `sunpy.net.jsoc.attrs`
            A query consisting of multiple parameters which define the
            requested data. The query is specified using attributes from the
            VSO and the JSOC. The query can mix attributes from the VSO and
            the JSOC.

        Returns
        -------
        `sunpy.net.fido_factory.UnifiedResponse`
            Container of responses returned by clients servicing query.

        Notes
        -----
        The conjunction 'and' transforms query into disjunctive normal form
        ie. query is now of form A & B or ((A & B) | (C & D))
        This helps in modularising query into parts and handling each of the
        parts individually.
        """
        query = attr.and_(*query)
        results = query_walker.create(query, self)

        # If we have searched the VSO but no results were returned, but another
        # client generated results, we drop the empty VSO results for tidiness.
        # This is because the VSO _can_handle_query is very broad because we
        # don't know the full list of supported values we can search for (yet).
        results = [r for r in results if not isinstance(r, vso.VSOQueryResponseTable) or len(r) > 0]

        return UnifiedResponse(*results)

    def fetch(self, *query_results, path=None, max_conn=5, progress=True,
              overwrite=False, downloader=None, **kwargs):
        """
        Download the records represented by `~sunpy.net.base_client.QueryResponseTable` or
        `~sunpy.net.fido_factory.UnifiedResponse` objects.

        Parameters
        ----------
        *query_results : `sunpy.net.fido_factory.UnifiedResponse` or `~sunpy.net.base_client.QueryResponseTable`
            Container returned by query method, or multiple.
        path : `str`
            The directory to retrieve the files into. Can refer to any fields
            in `~sunpy.net.base_client.BaseQueryResponse.response_block_properties` via string formatting,
            moreover the file-name of the file downloaded can be referred to as file,
            e.g. "{source}/{instrument}/{time.start}/{file}".
            If ``'{file}'`` is missing from a string or path-like ``path`` it is
            appended; if ``path`` is `None` the configured download directory
            is used.
        max_conn : `int`, optional
            The number of parallel download slots.
        progress : `bool`, optional
            If `True` show a progress bar showing how many of the total files
            have been downloaded. If `False`, no progress bars will be shown at all.
        overwrite : `bool` or `str`, optional
            Determine how to handle downloading if a file already exists with the
            same name. If `False` the file download will be skipped and the path
            returned to the existing file, if `True` the file will be downloaded
            and the existing file will be overwritten, if ``'unique'`` the filename
            will be modified to be unique.
        downloader : `parfive.Downloader`, optional
            The download manager to use. If specified the ``max_conn``,
            ``progress`` and ``overwrite`` arguments are ignored.
        **kwargs
            Forwarded to each client's ``fetch``. ``max_splits`` (default 5) is
            additionally used when building the default downloader. ``wait``
            is not accepted.

        Returns
        -------
        `parfive.Results`

        Raises
        ------
        PermissionError
            If the closest existing parent directory of ``path`` is not writable.
        ValueError
            If ``wait`` is passed, or an argument has an unrecognized type.
        TypeError
            If ``downloader`` is not a `parfive.Downloader`, if `parfive.Results`
            objects are mixed with other arguments, or if a client returns
            something other than a `parfive.Results`.

        Examples
        --------
        >>> from sunpy.net.attrs import Time, Instrument
        >>> unifresp = Fido.search(Time('2012/3/4','2012/3/5'), Instrument('EIT'))  # doctest: +REMOTE_DATA
        >>> filepaths = Fido.fetch(unifresp)  # doctest: +SKIP

        If any downloads fail, they can be retried by passing the `parfive.Results` object back into ``fetch``.

        >>> filepaths = Fido.fetch(filepaths)  # doctest: +SKIP
        """
        if path is None:
            path = Path(config.get('downloads', 'download_dir')) / '{file}'
        elif isinstance(path, (str, os.PathLike)) and '{file}' not in str(path):
            path = Path(path) / '{file}'
        else:
            path = Path(path)
        path = path.expanduser()

        # Ensure we have write permissions to the path. ``parents`` is ordered
        # from the closest ancestor outwards, so the first existing entry is
        # the directory that would have to be written into.
        exists = [p for p in Path(path).resolve().parents if p.exists()]
        if not os.access(exists[0], os.W_OK):
            raise PermissionError('You do not have permission to write'
                                  f' to the directory {exists[0]}.')

        if "wait" in kwargs:
            raise ValueError("wait is not a valid keyword argument to Fido.fetch.")

        # Avoid more than one connection for JSOC only requests.
        from sunpy.net.jsoc import JSOCClient

        max_splits = kwargs.get('max_splits', 5)

        # Gather the client behind every argument so the decision covers the
        # whole request rather than just whichever argument was passed last.
        clients = []
        for query_result in query_results:
            if isinstance(query_result, UnifiedResponse):
                clients.extend(block.client for block in query_result)
            elif isinstance(query_result, QueryResponseTable):
                clients.append(query_result.client)
            elif isinstance(query_result, QueryResponseRow):
                clients.append(query_result.table.client)
        is_jsoc_only = bool(clients) and all(isinstance(client, JSOCClient) for client in clients)

        if downloader is None:
            if is_jsoc_only:
                max_conn = 1
                max_splits = 1
            downloader = Downloader(max_conn=max_conn, progress=progress,
                                    overwrite=overwrite, max_splits=max_splits)
        elif not isinstance(downloader, parfive.Downloader):
            raise TypeError("The downloader argument must be a parfive.Downloader instance.")

        # Handle retrying failed downloads
        retries = [isinstance(arg, Results) for arg in query_results]
        if all(retries):
            results = Results()
            for retry in query_results:
                dr = downloader.retry(retry)
                results.data += dr.data
                results._errors += dr._errors
            return results
        elif any(retries):
            raise TypeError("If any arguments to fetch are `parfive.Results` objects, all arguments must be.")

        reslist = []
        for query_result in query_results:
            if isinstance(query_result, QueryResponseRow):
                responses = [query_result.as_table()]
            elif isinstance(query_result, QueryResponseTable):
                responses = [query_result]
            elif isinstance(query_result, UnifiedResponse):
                responses = query_result
            else:
                raise ValueError(f"Query result has an unrecognized type: {type(query_result)} "
                                 "Allowed types are QueryResponseRow, QueryResponseTable or UnifiedResponse.")
            for block in responses:
                # wait=False: the client's fetch is called with the shared
                # downloader, which is then run once below for all clients.
                result = block.client.fetch(block, path=path,
                                            downloader=downloader,
                                            wait=False, **kwargs)
                if result not in (NotImplemented, None):
                    reslist.append(result)

        results = downloader.download()
        # Combine the results objects from all the clients into one Results
        # object.
        for result in reslist:
            if not isinstance(result, Results):
                raise TypeError(
                    "If wait is False a client must return a parfive.Downloader and either None"
                    " or a parfive.Results object.")
            results.data += result.data
            results._errors += result.errors

        return results

    def __call__(self, *args, **kwargs):
        """Always raise: ``Fido`` is used via ``search``/``fetch``, not called directly."""
        raise TypeError(f"'{self.__class__.__name__}' object is not callable")

    def _check_registered_widgets(self, *args):
        """
        Factory helper function.

        Returns the registry keys whose registered validation callable accepts
        ``*args``.

        Raises
        ------
        NoMatchError
            If no registered entry accepts the query.
        """
        candidate_widget_types = [key for key in self.registry if self.registry[key](*args)]

        if not candidate_widget_types:
            # There is no default client
            raise NoMatchError("This query was not understood by any clients. Did you miss an OR?")

        return candidate_widget_types

    def _make_query_to_client(self, *query):
        """
        Given a query, look up the client and perform the query.

        Parameters
        ----------
        *query : collection of `~sunpy.net.vso.attr` objects

        Returns
        -------
        results : `list`
            One search response per client that accepted the query.
        """
        candidate_widget_types = self._check_registered_widgets(*query)
        results = []
        for client in candidate_widget_types:
            tmpclient = client()
            kwargs = dict()
            # Handle the change in response format in the VSO
            if isinstance(tmpclient, vso.VSOClient):
                kwargs = dict(response_format="table")
            results.append(tmpclient.search(*query, **kwargs))

        # This method is called by `search` and the results are fed into a
        # UnifiedResponse object.
        return results

    def __repr__(self):
        return object.__repr__(self) + "\n" + self._print_clients()

    def __str__(self):
        """
        This enables the "pretty" printing of the Fido Clients.
        """
        return self._print_clients()

    def _repr_html_(self):
        """
        This enables the "pretty" printing of the Fido Clients with html.
        """
        return self._print_clients(html=True)

    def _print_clients(self, html=False, visible_entries=-1):
        """
        Build the textual (or HTML) listing of all registered clients.

        Parameters
        ----------
        html : `bool`, optional
            If `True`, wrap the header lines in ``<p>`` tags and format the
            table as HTML.
        visible_entries : `int`, optional
            Passed to the table formatter as ``max_lines``.

        Returns
        -------
        `str`
            The header lines followed by the formatted client table.
        """
        width = -1 if html else get_width()

        t = Table(names=["Client", "Description"], dtype=["U80", "U120"])
        lines = ["sunpy.net.Fido", dedent(self.__doc__)]
        if html:
            lines = [f"<p>{line}</p>" for line in lines]
        for key in BaseClient._registry.keys():
            # Only the first paragraph of each client's docstring is shown; a
            # client without a docstring gets an empty description instead of
            # raising AttributeError.
            # NOTE(review): the "\n " literal below is reproduced as seen; its
            # whitespace may have been collapsed in the copy under review -
            # confirm against the repository.
            summary = (key.__doc__ or "").partition("\n\n")[0].replace("\n ", " ")
            t.add_row((key.__name__, dedent(summary)))
        lines.extend(t.pformat_all(max_lines=visible_entries, show_dtype=False,
                                   max_width=width, align="<", html=html))
        return '\n'.join(lines)
# The module-level ``Fido`` instance. It is built on the registry held by
# ``BaseClient``, and names ``_can_handle_query`` as the additional validation
# function for registered entries.
Fido = UnifiedDownloaderFactory(
    registry=BaseClient._registry,
    additional_validation_functions=['_can_handle_query'],
)