"""
Allow representation of queries as logic expressions. This module makes
sure that attributes that are combined using the two logic operations AND (&)
and OR (|) always are in disjunctive normal form, that is, there are only two
levels - the first being disjunction and the second being conjunction. In other
words, every combination of attributes looks like this:
(a AND b AND c) OR (d AND e).
Walkers are used to traverse the tree that results from combining attributes.
They are implemented using `functools.singledispatch` modified to dispatch on the second argument to the function.
Please note that & is evaluated first, so A & B | C is equivalent to
(A & B) | C.
"""
import re
import string
import inspect
import keyword
import textwrap
from textwrap import dedent
from collections import namedtuple, defaultdict
from astropy.table import Table
from astropy.utils.misc import isiterable
from sunpy.extern import inflect
from sunpy.util.functools import seconddispatch
from sunpy.util.util import get_width
# Registry record: four parallel lists (sanitized name, client name, original
# name, description), one entry per registered value.
_ATTR_TUPLE = namedtuple("attr", "name client name_long desc")
# Matches a number at the start of a string: either the whole string is made of
# digits, or a single leading digit optionally followed by a decimal fraction.
# Group 1 holds the matched number.
NUMBER_REGEX = re.compile(r"^(\d+$|\d(?:\.\d+)?)")
# Public names exported by this module.
__all__ = ['Attr', 'DataAttr', 'DummyAttr', 'SimpleAttr', 'Range', 'AttrAnd', 'AttrOr',
'ValueAttr', 'and_', 'or_', 'AttrWalker', 'AttrComparison', 'ComparisonParamAttrWrapper']
def make_tuple():
    """
    Build an empty registry record.

    Returns
    -------
    `collections.namedtuple`
        An ``attr`` namedtuple whose ``name``, ``client``, ``name_long`` and
        ``desc`` fields are each a new, independent empty list.
    """
    return _ATTR_TUPLE(name=[], client=[], name_long=[], desc=[])
def _print_attrs(attr, html=False):
    """
    Render the values registered for an Attr class as a table.

    Parameters
    ----------
    attr : `sunpy.net.attr.Attr`
        The attr class/type whose registered values should be listed.
    html : bool
        If true, produce HTML output instead of plain text.

    Returns
    -------
    `str`
        The qualified class name, the first paragraph of the class docstring
        (when there is one) and a table of the registered attributes, joined
        by newlines.
    """
    registered = attr._attr_registry[attr]
    if registered.name:
        # Transpose the parallel lists into rows, sort the rows, then
        # transpose back into columns.
        rows = sorted(zip(*registered))
        columns = _ATTR_TUPLE(*zip(*rows))
    else:
        # Nothing registered: there is nothing to sort.
        columns = make_tuple()

    *leading_columns, descriptions = columns
    # Descriptions longer than 80 characters are cut to 77 plus an ellipsis.
    descriptions = [
        text[:77] + '...' if len(text) > 80 else text
        for text in descriptions
    ]
    table = Table(
        names=["Attribute Name", "Client", "Full Name", "Description"],
        dtype=["U80", "U80", "U80", "U80"],
        data=[*leading_columns, descriptions],
    )

    header = [attr.__module__ + '.' + attr.__name__]
    # A class without a docstring has ``__doc__ = None``; in that case the
    # summary paragraph is simply left out.
    try:
        summary = attr.__doc__.partition("\n\n")[0]
    except AttributeError:
        pass
    else:
        header.append(dedent(summary) + "\n")

    if html:
        template, width = "<p>{}</p>", -1
    else:
        template, width = "{}", get_width()

    output = [template.format(line) for line in header]
    output.extend(table.pformat_all(show_dtype=False, max_width=width, align="<", html=html))
    return '\n'.join(output)
class AttrMeta(type):
    """
    Metaclass that makes registered values discoverable on Attr classes.

    It overloads the hooks Python uses for attribute lookup and ``dir()`` on
    the *class*, so that e.g. `attrs.Instrument` can tab complete to
    `attrs.Instrument.aia`, and it gives the classes a tabular str/repr.
    """

    # Maps each Attr subclass to a namedtuple of parallel lists. The namedtuple
    # type is defined at module level, outside of AttrMeta.
    _attr_registry = defaultdict(make_tuple)

    def __getattr__(self, item):
        """
        Resolve a registered value name to an instance of this Attr class.

        ``_attr_registry`` is indexed with ``self`` (the Attr class being
        accessed), which yields a namedtuple with four list fields: ``name``
        (the sanitized attribute name), ``client``, ``name_long`` (the
        original name that was registered) and ``desc`` (its description).
        The position of ``item`` in ``name`` selects the matching
        ``name_long`` entry, which is passed to the class constructor.

        Raises
        ------
        AttributeError
            If the lookup or the construction raises `ValueError`, e.g.
            because ``item`` has not been registered.
        """
        entry = self._attr_registry[self]
        try:
            long_name = entry.name_long[entry.name.index(item)]
            # Build the requested Attr from its original (long) name.
            return self(long_name)
        except ValueError:
            raise AttributeError(f'This attribute, {item} is not defined, please register it.')

    def __dir__(self):
        """
        Extend the default ``__dir__`` with the values registered for this class.
        """
        registered = set(self._attr_registry[self].name)
        # "all" can be registered as a documentation helper, but isn't a valid attr
        registered.discard("all")
        return [*super().__dir__(), *registered]

    def __repr__(self):
        """
        Return the default type repr followed by the table from ``__str__``.
        """
        return "\n".join((type.__repr__(self), str(self)))

    def __str__(self):
        """
        Return the plain-text table of registered values.
        """
        return _print_attrs(self)

    def _repr_html_(self):
        """
        Return the HTML table of registered values.
        """
        return _print_attrs(self, html=True)
class Attr(metaclass=AttrMeta):
    """This is the base for all attributes."""

    def __and__(self, other):
        """
        AND this attr with ``other``.

        ANDing with an `AttrOr` distributes over its members; attrs that
        collide cannot be ANDed, so `NotImplemented` is returned for them.
        """
        if isinstance(other, AttrOr):
            return AttrOr([elem & self for elem in other.attrs])
        if self.collides(other):
            return NotImplemented
        if isinstance(other, AttrAnd):
            return AttrAnd([self, *other.attrs])
        return AttrAnd([self, other])

    def __hash__(self):
        # Hash on the instance attributes, matching the comparison in __eq__.
        return hash(frozenset(vars(self).items()))

    def __or__(self, other):
        """
        OR this attr with ``other``, flattening an existing `AttrOr`.
        """
        # Optimization.
        if self == other:
            return self
        if isinstance(other, AttrOr):
            return AttrOr([self, *other.attrs])
        return AttrOr([self, other])

    def collides(self, other):
        """
        Report whether this attr cannot be ANDed with ``other``.

        The base class provides no implementation; subclasses override it.
        """
        raise NotImplementedError

    def __eq__(self, other):
        if not isinstance(other, Attr):
            return False
        return vars(self) == vars(other)

    @classmethod
    def update_values(cls, adict):
        """
        Clients will use this method to register their values for subclasses of `~sunpy.net.attr.Attr`.

        The input has to be a dictionary, with each key being a client
        (its ``__name__``, with "Client" removed, is stored in the registry).
        The value for each client has to be a dictionary with each key being a subclass of Attr.
        The value for each Attr key should be a list of tuples with each tuple of the form
        ``(Name, Description)``.
        If you do not want to add a description, you can put `None` or an empty string.
        A one-element entry is only accepted when it is ``("*",)``; it is registered
        under the name "all".

        The registered name is sanitized: a number at the start is spelled out in
        words, punctuation is replaced with ``_``, characters that are neither
        identifier characters nor numeric are removed and the result is lower-cased.
        If the result is a Python keyword, a ``_`` is appended to it.
        This is an ugly workaround, so please try to pick a valid attribute name.

        **It is recommended that clients register values for all attrs used by their `_can_handle_query` method.**

        Parameters
        ----------
        adict : `dict`
            The keys for this dictionary have to be downloader clients.
            The values for each client are a dictionary that has keys of `~sunpy.net.attr.Attr`.
            Each key should have a list of tuples as it value.
            Each tuple should be a pair of strings.
            First string is the attribute name you want to register
            Second string is the description of the attribute.

        Raises
        ------
        ValueError
            If a value is not iterable or is a plain string, if an entry has more
            than two elements, or if a one-element entry is not ``"*"``.

        Examples
        --------
        # The first import is to make this example work, it should not be used otherwise
        >>> from sunpy.net.dataretriever import GenericClient
        >>> from sunpy.net import attr, attrs
        >>> attr.Attr.update_values({GenericClient : {
        ...                          attrs.Instrument: [('AIA', 'AIA is in Space.'),
        ...                                             ('HMI', 'HMI is next to AIA.')]}})  # doctest: +SKIP
        >>> attr.Attr._attr_registry[attrs.Instrument]  # doctest: +SKIP
        attr(name=['aia', 'hmi'], client=['Generic', 'Generic'], name_long=['AIA', 'HMI'],
        desc=['AIA is in Space.', 'HMI is next to AIA.'])
        >>> attr.Attr._attr_registry[attrs.Instrument].name  # doctest: +SKIP
        ['aia', 'hmi']
        >>> attr.Attr._attr_registry[attrs.Instrument].name_long  # doctest: +SKIP
        ['AIA', 'HMI']
        >>> attr.Attr._attr_registry[attrs.Instrument].desc  # doctest: +SKIP
        ['AIA is in Space.', 'HMI is next to AIA.']
        >>> attrs.Instrument.aia, attrs.Instrument.hmi  # doctest: +SKIP
        (<sunpy.net.attrs.Instrument(AIA: AIA is in Space.) object at 0x...>,
        <sunpy.net.attrs.Instrument(HMI: HMI is next to AIA.) object at 0x...>)
        """
        # The number-to-words engine does not depend on the value being
        # registered, so build it once rather than once per pair.
        number_engine = inflect.engine()
        for client, attr_dict in adict.items():
            for attr, attr_values in attr_dict.items():
                if not isiterable(attr_values) or isinstance(attr_values, str):
                    raise ValueError(f"Invalid input value: {attr_values} for key: {repr(attr)}. "
                                     "The value is not iterable or just a string.")
                attr_tuple = cls._attr_registry[attr]
                for pair in attr_values:
                    if len(pair) > 2:
                        raise ValueError(f'Invalid length (!=2) for values: {attr_values}.')
                    elif len(pair) == 1:
                        if pair[0] != "*":
                            raise ValueError(
                                f'Invalid value given for * registration: {attr_values}.')
                        # Special case handling for * aka all values allowed.
                        pair = ["all", "All values of this type are supported."]
                    # Sanitize part one: Check if the name starts with a number
                    number_match = NUMBER_REGEX.match(pair[0])
                    if number_match is None:
                        name = pair[0]
                    else:
                        number_str = number_match.group(1)
                        name = number_engine.number_to_words(number_str)
                        if number_str != number_match.string:
                            # Keep whatever followed the number, separated by "_".
                            name = name + "_" + number_match.string[number_match.end(1):]
                    # Sanitize part two: remove punctuation and replace it with _
                    name = re.sub('[%s]' % re.escape(string.punctuation), '_', name)
                    # Sanitize name, we remove all special characters
                    name = ''.join(char for char in name
                                   if char.isidentifier() or char.isnumeric())
                    # Make name lower case
                    name = name.lower()
                    if keyword.iskeyword(name):
                        # Attribute name has been appended with `_`
                        # to make it a valid identifier since its a python keyword.
                        name = name + '_'
                    attr_tuple[0].append(name)
                    attr_tuple[1].append(client.__name__.replace("Client", ""))
                    attr_tuple[2].append(pair[0])
                    attr_tuple[3].append(pair[1])
class DataAttr(Attr):
    """
    Base class for attribute classes that carry data.

    It exists to tell such classes apart from `sunpy.net.attr.AttrAnd` or the
    plain `sunpy.net.attr.Attr` base, which carry none. The distinction lets
    walkers match every class that is a user specified Attr.
    """

    def __new__(cls, *args, **kwargs):
        # Only subclasses may be instantiated.
        if cls is DataAttr:
            raise TypeError("You should not directly instantiate DataAttr, only it's subclasses.")
        return super().__new__(cls)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        # A subclass that declares its own __new__() is left alone.
        if '__new__' in vars(cls):
            return

        # Defining __new__() above hides the arguments of __init__() from
        # introspection in every subclass, because the signature of __new__()
        # takes precedence over the one of __init__(). So each subclass gets a
        # forwarding __new__() that advertises the signature of its __init__().
        inherited_new = cls.__new__

        def signed_new(cls, *args, **kwargs):
            return inherited_new(cls, *args, **kwargs)

        signed_new.__signature__ = inspect.signature(cls.__init__)
        cls.__new__ = signed_new
class DummyAttr(Attr):
    """
    Empty attribute, the neutral starting point for building up queries.

    ORing or ANDing it with another attribute simply yields that other
    attribute, so it can serve as the initial value when a query is
    assembled in a loop.

    For instance, to get an attr matching all the time intervals stored as
    (start, end) tuples in a list:

    .. code-block:: python

        attr = DummyAttr()
        for start, end in times:
            attr |= Time(start, end)
    """

    def __or__(self, other):
        # The empty query contributes nothing: the result is the other operand.
        return other

    def __and__(self, other):
        # The empty query contributes nothing: the result is the other operand.
        return other

    def collides(self, other):
        """
        The empty attribute never collides with anything.
        """
        return False

    def __eq__(self, other):
        # All DummyAttr instances are interchangeable.
        return isinstance(other, DummyAttr)

    def __hash__(self):
        # One shared hash for all instances, consistent with __eq__.
        return hash(None)
class SimpleAttr(DataAttr):
    """
    An attribute that holds exactly one value.

    It is not a composite; an example is ``Instrument('EIT')``.

    Parameters
    ----------
    value : `object`
        The value for the attribute to hold.
    """

    def __init__(self, value):
        super().__init__()
        self.value = value

    def collides(self, other):
        """
        Two attrs collide when ``other`` is an instance of this attr's class.
        """
        return isinstance(other, self.__class__)

    def __repr__(self):
        registry = AttrMeta._attr_registry[self.__class__]
        # Split the default repr around " object " so the value can be
        # inserted directly after the class name.
        parts = object.__repr__(self).split(" object ")
        try:
            position = registry.name_long.index(self.value)
            value_part = f"({self.value}: {registry.desc[position]})"
        except ValueError:
            # The value was never registered; still produce a repr without error.
            value_part = f": {self.value}"
        full_repr = f"{parts[0]}{value_part} object {parts[1]}"
        return textwrap.fill(full_repr, 100)

    @property
    def type_name(self):
        """
        The lower-cased name of this attr's class.
        """
        return self.__class__.__name__.lower()
class AttrComparison(DataAttr):
    """
    An Attr made of a name, a comparison operator and a value.
    """

    def __init__(self, name, operator, value):
        super().__init__()
        self.name, self.operator, self.value = name, operator, value

    def collides(self, other):
        """
        Collide only with a comparison of the same class that uses the same
        operator on the same name.
        """
        return (isinstance(other, self.__class__)
                and self.operator == other.operator
                and self.name == other.name)
class ComparisonParamAttrWrapper:
    """
    Wrap a parameter name so that comparing it to a value builds an
    `AttrComparison` for that name, operator and value.
    """

    def __init__(self, name):
        self.name = name

    def _comparison(self, symbol, other):
        # Shared constructor call behind all six rich comparison operators.
        return AttrComparison(self.name, symbol, other)

    def __lt__(self, other):
        return self._comparison('<', other)

    def __le__(self, other):
        return self._comparison('<=', other)

    def __gt__(self, other):
        return self._comparison('>', other)

    def __ge__(self, other):
        return self._comparison('>=', other)

    def __eq__(self, other):
        return self._comparison('=', other)

    def __ne__(self, other):
        return self._comparison('!=', other)

    def collides(self, other):
        """
        Collide with any other `ComparisonParamAttrWrapper`.
        """
        return isinstance(other, ComparisonParamAttrWrapper)
class Range(DataAttr):
    """
    An attribute that represents a range of a value.

    This type of attribute would be applicable for types like Wavelength or Time.
    The range is inclusive of both the min and max.

    Parameters
    ----------
    min_ : `object`
        The lower bound of the range.
    max_ : `object`
        The upper bound of the range.
    """

    def __init__(self, min_, max_):
        self.min = min_
        self.max = max_
        super().__init__()

    def __xor__(self, other):
        """
        Return the part(s) of this range that ``other`` does not cover.

        The pieces are ORed onto a `DummyAttr`, so a `DummyAttr` is returned
        when nothing is left over. `NotImplemented` is returned when ``other``
        is not an instance of this range's type.
        """
        if not isinstance(other, type(self)):
            return NotImplemented

        new = DummyAttr()
        if self.min < other.min:
            # Piece below ``other``, clamped so it never leaves this range.
            new |= type(self)(self.min, min(other.min, self.max))
        if other.max < self.max:
            # Piece above ``other``. Clamp the lower edge to ``self.min`` too:
            # otherwise an ``other`` lying entirely below this range would
            # produce a result starting outside of this range.
            new |= type(self)(max(other.max, self.min), self.max)
        return new

    def __contains__(self, other):
        """
        Test whether a `Range` lies fully inside this one, or whether a plain
        value lies between ``min`` and ``max`` (both inclusive).
        """
        if isinstance(other, Range):
            return self.min <= other.min and self.max >= other.max
        else:
            return self.min <= other <= self.max
class AttrAnd(Attr):
    """ Attribute representing attributes ANDed together. """

    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def __and__(self, other):
        # Refuse the operation if ``other`` collides with any current member.
        for member in self.attrs:
            if other.collides(member):
                return NotImplemented
        if isinstance(other, AttrAnd):
            # Merge both conjunctions into one flat conjunction.
            return AttrAnd(list(self.attrs) + list(other.attrs))
        if isinstance(other, AttrOr):
            # Distribute over the disjunction.
            return AttrOr([alternative & self for alternative in other.attrs])
        return AttrAnd(list(self.attrs) + [other])

    __rand__ = __and__

    def __repr__(self):
        return f"<AttrAnd({self.attrs!r})>"

    def __eq__(self, other):
        # The order of the members is irrelevant.
        return isinstance(other, AttrAnd) and set(self.attrs) == set(other.attrs)

    def __hash__(self):
        return hash(frozenset(self.attrs))

    def collides(self, other):
        """
        Collide with ``other`` when at least one member collides with it.
        """
        for member in self.attrs:
            if member.collides(other):
                return True
        return False
class AttrOr(Attr):
    """ Attribute representing attributes ORed together. """

    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def __or__(self, other):
        # Keep the disjunction flat when ORing with another AttrOr.
        if isinstance(other, AttrOr):
            return AttrOr(list(self.attrs) + list(other.attrs))
        return AttrOr(list(self.attrs) + [other])

    __ror__ = __or__

    def __and__(self, other):
        # AND distributes over every alternative.
        return AttrOr([alternative & other for alternative in self.attrs])

    __rand__ = __and__

    def __xor__(self, other):
        result = AttrOr([])
        for alternative in self.attrs:
            try:
                result |= alternative ^ other
            except TypeError:
                # Alternatives that do not support ``^`` with ``other`` are skipped.
                continue
        return result

    def __contains__(self, other):
        for alternative in self.attrs:
            try:
                found = other in alternative
            except TypeError:
                # This alternative does not support containment tests.
                continue
            if found:
                return True
        return False

    def __repr__(self):
        return f"<AttrOr({self.attrs!r})>"

    def __eq__(self, other):
        # The order of the alternatives is irrelevant.
        return isinstance(other, AttrOr) and set(self.attrs) == set(other.attrs)

    def __hash__(self):
        return hash(frozenset(self.attrs))

    def collides(self, other):
        """
        Collide with ``other`` only when every alternative collides with it.
        """
        for alternative in self.attrs:
            if not alternative.collides(other):
                return False
        return True
# This appears to only be used as a base type for the Walker, i.e. a common
# denominator for the walker to convert to whatever the output of the walker is
# going to be.
class ValueAttr(DataAttr):
    """
    An attr wrapping a mapping stored as ``attrs``.
    """

    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def __repr__(self):
        return f"<ValueAttr({self.attrs!r})>"

    def __hash__(self):
        return hash(frozenset(self.attrs.items()))

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.attrs == other.attrs
        return False

    def collides(self, other):
        """
        Collide with an attr of the same class that shares at least one key.
        """
        if isinstance(other, self.__class__):
            return any(key in other.attrs for key in self.attrs)
        return False
class AttrWalker:
    """
    Traverse the Attr tree and convert it to a different representation.

    The ``AttrWalker`` can walk a complex tree of attrs and represent that tree
    in a way that is useful to the client using the attrs. For the VSO client
    it generates a ``VSOQueryResponseTable`` object, for the database module
    it performs database queries and returns results from the database.

    The walker has three core operations that can be applied to the tree, all
    of these are functions which are applied to one or more
    `~sunpy.net.attr.Attr` types, using conditional dispatch based on type.

    * creators: Creators when given an `~sunpy.net.attr.Attr` return a new object.
    * appliers: Appliers process an `~sunpy.net.attr.Attr` type and modify any
      arguments passed.
    * converters: Converters convert types unknown to any other creator or
      appliers to types known by them. They take in an `~sunpy.net.attr.Attr`
      type and return a different one.
    """

    def __init__(self):
        # Two independent dispatchers; each starts with only a fallback that
        # raises for types nobody registered.
        self.applymm = seconddispatch(self._unknown_type_apply)
        self.createmm = seconddispatch(self._unknown_type_create)

    def __repr__(self):
        creators = list(self.createmm.registry)
        appliers = list(self.applymm.registry)
        return f"""{super().__repr__()}
Registered creators:\n {creators}\n
Registered appliers:\n {appliers}"""

    @staticmethod
    def _unknown_type_apply(*args, **kwargs):
        # Fallback of the apply dispatcher.
        raise TypeError(
            f"{args[1]} or any of its parents have not been registered using "
            "add_applier() with the AttrWalker")

    @staticmethod
    def _unknown_type_create(*args, **kwargs):
        # Fallback of the create dispatcher.
        raise TypeError(
            f"{args[1]} or any of its parents have not been registered using "
            "add_creator() with the AttrWalker")

    def create(self, *args, **kwargs):
        """
        Run the registered creator matching the arguments, passing this walker
        as the first argument.
        """
        return self.createmm(self, *args, **kwargs)

    def apply(self, *args, **kwargs):
        """
        Run the registered applier matching the arguments, passing this walker
        as the first argument.
        """
        return self.applymm(self, *args, **kwargs)

    def add_creator(self, *types):
        """
        Decorator registering the decorated function as the ``.create``
        handler for every one of ``types``.
        """
        def _dec(fun):
            for type_ in types:
                self.createmm.register(type_, fun)
            return fun
        return _dec

    def add_applier(self, *types):
        """
        Decorator registering the decorated function as the ``.apply``
        handler for every one of ``types``.
        """
        def _dec(fun):
            for type_ in types:
                self.applymm.register(type_, fun)
            return fun
        return _dec

    def add_converter(self, *types):
        """
        Decorator registering a function that converts the given types into a
        type known to create and apply.

        Once the converter has run, create or apply is called again with the
        converted value.
        """
        def _dec(fun):
            for type_ in types:
                self.applymm.register(type_, self._cv_apply(fun))
                self.createmm.register(type_, self._cv_create(fun))
            return fun
        return _dec

    def _cv_apply(self, fun):
        """
        Wrap ``fun`` so that the second argument is converted and apply is
        dispatched again.
        """
        def _fun(*args, **kwargs):
            converted = [*args]
            converted[1] = fun(converted[1])
            return self.applymm(*converted, **kwargs)
        return _fun

    def _cv_create(self, fun):
        """
        Wrap ``fun`` so that the second argument is converted and create is
        dispatched again.
        """
        def _fun(*args, **kwargs):
            converted = [*args]
            converted[1] = fun(converted[1])
            return self.createmm(*converted, **kwargs)
        return _fun
def and_(*args):
    """
    AND all arguments together, side-stepping operator precedence.

    Starting from a `DummyAttr`, every argument is combined in turn with
    ``&``, e.g. ``and_(foo < bar, bar < baz)``. Without arguments the
    `DummyAttr` itself is returned.
    """
    combined = DummyAttr()
    for term in args:
        combined &= term
    return combined
def or_(*args):
    """
    OR all arguments together, side-stepping operator precedence.

    Starting from a `DummyAttr`, every argument is combined in turn with
    ``|``, e.g. ``or_(foo < bar, bar < baz)``. Without arguments the
    `DummyAttr` itself is returned.
    """
    combined = DummyAttr()
    for term in args:
        combined |= term
    return combined