Source code for sunpy.net.scraper

"""
This module provides a web scraper.
"""
import os
import re
from time import sleep
from ftplib import FTP
from datetime import datetime
from urllib.error import URLError, HTTPError
from urllib.parse import urlsplit
from urllib.request import urlopen

from bs4 import BeautifulSoup

from astropy.time import Time

from sunpy import log
from sunpy.extern.parse import parse
from sunpy.net.scraper_utils import date_floor, extract_timestep, get_timerange_from_exdict
from sunpy.time.timerange import TimeRange
from sunpy.util.decorators import deprecated_renamed_argument
from sunpy.util.exceptions import warn_user

__all__ = ['Scraper']


# Regular expressions to convert datetime format codes
TIME_CONVERSIONS = {
    '%Y': r'\d{4}', '%y': r'\d{2}',
    '%b': '[A-Z][a-z]{2}', '%B': r'\W', '%m': r'\d{2}',
    '%d': r'\d{2}', '%j': r'\d{3}',
    '%H': r'\d{2}', '%I': r'\d{2}',
    '%M': r'\d{2}',
    '%S': r'\d{2}', '%e': r'\d{3}', '%f': r'\d{6}'
}
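# As an illustrative sketch (not part of the module), this is how the table above
# is applied: a strftime-style pattern is turned into a regular expression by
# replacing each directive with its character-class equivalent, e.g.
#
#     pattern = "swap_lv1_%Y%m%d_%H%M%S.fits"
#     for k, v in TIME_CONVERSIONS.items():
#         pattern = pattern.replace(k, v)
#     # pattern is now r"swap_lv1_\d{4}\d{2}\d{2}_\d{2}\d{2}\d{2}.fits"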
# "parse" expressions to convert into datetime format
PARSE_TIME_CONVERSIONS = {
    "{year:4d}": "%Y", "{year:2d}": "%y",
    "{month:2d}": "%m",
    "{month_name:l}": "%B",
    "{month_name_abbr:l}": "%b",
    "{day:2d}": "%d", "{day_of_year:3d}": "%j",
    "{hour:2d}": "%H",
    "{minute:2d}": "%M",
    "{second:2d}": "%S",
    "{microsecond:6d}": "%f",
    "{millisecond:3d}": "%e",
    "{week_number:2d}": "%W",
}
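# Illustrative sketch (values assumed, not executed here): ``__init__`` uses this
# table to derive ``datetime_pattern`` from a ``format`` pattern by substituting
# each parse-style field with its strftime directive:
#
#     fmt = "{year:4d}/{month:2d}/{day:2d}/file_{hour:2d}{minute:2d}.fits"
#     for k, v in PARSE_TIME_CONVERSIONS.items():
#         fmt = fmt.replace(k, v)
#     # fmt is now "%Y/%m/%d/file_%H%M.fits"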
DEPRECATED_MESSAGE = (
    "pattern has been replaced with the format keyword. "
    "This comes with a new syntax and there is a migration guide available at "
    "https://docs.sunpy.org/en/stable/how_to/scraper_migration.html."
)

class Scraper:
    """
    A scraper to scrape web data archives based on dates.

    Parameters
    ----------
    pattern : `str`
        A string containing the url with the date encoded as datetime formats,
        and any other parameter as ``kwargs`` as a string format.
        This can also be a URI to a local file pattern.
        Deprecated in favor of `format`. Default is `None`.
    regex : `bool`
        Set to `True` if parts of the pattern use regexp symbols.
        This only works for the filename part of the pattern rather than the full url.
        Be careful that periods ``.`` match any character, so it is better to escape them.
        If regexp is used, other ``kwargs`` are ignored and string replacement is not possible.
        Deprecated in favor of `format`. Default is `False`.
    format : `str`
        A string containing the url with the date and other information to be extracted
        encoded as ``parse`` formats, and any other ``kwargs`` parameters as a string format,
        the former represented using double curly-brackets to differentiate from the latter.
        The accepted parse representations for datetime values are as given in
        ``PARSE_TIME_CONVERSIONS``. This can also be a URI to a local file pattern.
        Default is `None`.
    kwargs : `dict`
        A dictionary containing the values to be replaced in the pattern.
        Will be ignored if ``regex`` is `True`.

    Attributes
    ----------
    pattern : `str`
        The pattern with the parse format.
    datetime_pattern : `str`
        The parse pattern in the datetime format.
    now : `str`
        The pattern filled in with the current date and time. This does not check whether
        such a file exists; it only shows how the ``pattern`` looks with the current time.

    Examples
    --------
    >>> from sunpy.net import Scraper
    >>>
    >>> pattern = ('http://proba2.oma.be/{instrument}/data/bsd/{{year:4d}}/{{month:2d}}/{{day:2d}}/'
    ...            '{instrument}_lv1_{{year:4d}}{{month:2d}}{{day:2d}}_{{hour:2d}}{{minute:2d}}{{second:2d}}.fits')
    >>> swap = Scraper(format=pattern, instrument='swap')
    >>>
    >>> print(swap.pattern)
    http://proba2.oma.be/swap/data/bsd/{year:4d}/{month:2d}/{day:2d}/swap_lv1_{year:4d}{month:2d}{day:2d}_{hour:2d}{minute:2d}{second:2d}.fits
    >>>
    >>> print(swap.datetime_pattern)
    http://proba2.oma.be/swap/data/bsd/%Y/%m/%d/swap_lv1_%Y%m%d_%H%M%S.fits
    >>>
    >>> print(swap.now)  # doctest: +SKIP
    http://proba2.oma.be/swap/data/bsd/2022/12/21/swap_lv1_20221221_112433.fits
    """

    @deprecated_renamed_argument("pattern", None, since="6.1", message=DEPRECATED_MESSAGE)
    @deprecated_renamed_argument("regex", None, since="6.1", message=DEPRECATED_MESSAGE)
    def __init__(self, pattern=None, regex=False, *, format=None, **kwargs):
        if pattern is not None and format is None:
            self.use_old_format = True
            self.pattern = pattern
        elif pattern is None and format is not None:
            self.use_old_format = False
            self.pattern = format
        else:
            raise ValueError("Either 'pattern' or 'format' must be provided.")

        if self.use_old_format:
            if regex and kwargs:
                warn_user('regexp being used, the extra arguments passed are being ignored')
            self.pattern = pattern.format(**kwargs) if kwargs and not regex else self.pattern
            self.domain = f"{urlsplit(self.pattern).scheme}://{urlsplit(self.pattern).netloc}/"
            milliseconds = re.search(r'\%e', self.pattern)
            if not milliseconds:
                self.now = datetime.now().strftime(self.pattern)
            else:
                now = datetime.now()
                milliseconds_ = int(now.microsecond / 1000.)
                self.now = now.strftime(f'{self.pattern[0:milliseconds.start()]}{milliseconds_:03d}{self.pattern[milliseconds.end():]}')
        else:
            pattern = format.format(**kwargs)
            datetime_pattern = pattern
            for k, v in PARSE_TIME_CONVERSIONS.items():
                if k in datetime_pattern:
                    datetime_pattern = datetime_pattern.replace(k, v)
            self.datetime_pattern = datetime_pattern
            # so that they don't conflict later on, either one can help in extracting year
            if "year:4d" in pattern and "year:2d" in pattern:
                pattern = pattern.replace("year:2d", ":2d")
            self.pattern = pattern
            self.domain = f"{urlsplit(self.pattern).scheme}://{urlsplit(self.pattern).netloc}/"
            milliseconds = re.search(r'\%e', self.datetime_pattern)
            if not milliseconds:
                self.now = datetime.now().strftime(self.datetime_pattern)
            else:
                now = datetime.now()
                milliseconds_ = int(now.microsecond / 1000.)
                self.now = now.strftime(f'{self.datetime_pattern[0:milliseconds.start()]}{milliseconds_:03d}{self.datetime_pattern[milliseconds.end():]}')
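    # Note on the ``%e`` handling above: Python's strftime has no millisecond
    # directive, so the three-digit value is spliced into the pattern by hand.
    # A rough sketch of that substitution (the pattern and time are made up):
    #
    #     pattern = "file_%H%M%S_%e.fits"
    #     m = re.search(r'\%e', pattern)
    #     now = datetime(2022, 12, 21, 11, 24, 33, 500000)
    #     ms = int(now.microsecond / 1000.)   # 500
    #     now.strftime(f"{pattern[:m.start()]}{ms:03d}{pattern[m.end():]}")
    #     # -> "file_112433_500.fits"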
    def matches(self, filepath, date):
        """
        Checks if the given filepath is how the file path is expected to look
        on the given date, based on the pattern.

        Parameters
        ----------
        filepath : `str`
            File path to check.
        date : `datetime.datetime` or `astropy.time.Time`
            The date for which to check.

        Returns
        -------
        `bool`
            `True` if the given filepath matches with the calculated one for given date, else `False`.
        """
        if self.use_old_format:
            return date.strftime(self.pattern) == filepath
        else:
            return parse(date.strftime(self.datetime_pattern), filepath) is not None
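    # Illustrative example of ``matches`` (the URL and date are made up), using
    # the ``swap`` scraper constructed in the class docstring:
    #
    #     >>> from datetime import datetime
    #     >>> swap.matches('http://proba2.oma.be/swap/data/bsd/2022/12/21/swap_lv1_20221221_112433.fits',
    #     ...              datetime(2022, 12, 21, 11, 24, 33))
    #     True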
    def range(self, timerange):
        """
        Gets the directories for a certain range of time.

        Parameters
        ----------
        timerange : `~sunpy.time.timerange.TimeRange`
            Time interval where to find the directories for a given pattern.

        Returns
        -------
        `list` of `str`
            All the possible directories valid for the time range given.
            Notice that these directories may not exist in the archive.
        """
        # find directory structure - without file names
        if self.use_old_format:
            if '/' in self.pattern:
                directorypattern = '/'.join(self.pattern.split('/')[:-1]) + '/'
        else:
            if '/' in self.datetime_pattern:
                directorypattern = '/'.join(self.datetime_pattern.split('/')[:-1]) + '/'
        timestep = extract_timestep(directorypattern)
        if timestep is None:
            return [directorypattern]
        else:
            directories = []
            cur = date_floor(timerange.start, timestep)
            end = date_floor(timerange.end, timestep) + timestep
            while cur < end:
                directories.append(cur.strftime(directorypattern))
                cur = cur + timestep
            return directories
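    # Illustrative example of ``range`` with the same ``swap`` scraper: a daily
    # directory pattern yields one directory per day spanned by the time range,
    # whether or not those directories exist on the server:
    #
    #     >>> from sunpy.time import TimeRange
    #     >>> swap.range(TimeRange('2022-12-30', '2023-01-01'))
    #     ['http://proba2.oma.be/swap/data/bsd/2022/12/30/',
    #      'http://proba2.oma.be/swap/data/bsd/2022/12/31/',
    #      'http://proba2.oma.be/swap/data/bsd/2023/01/01/']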
    def filelist(self, timerange):
        """
        Returns the list of existent files in the archive for the given time range.

        Parameters
        ----------
        timerange : `~sunpy.time.TimeRange`
            Time interval where to find the directories for a given pattern.

        Returns
        -------
        filesurls : `list` of `str`
            List of all the files found between the time range given.

        Examples
        --------
        >>> from sunpy.net import Scraper
        >>> pattern = ('http://proba2.oma.be/{instrument}/data/bsd/{{year:4d}}/{{month:2d}}/{{day:2d}}/'
        ...            '{instrument}_lv1_{{year:4d}}{{month:2d}}{{day:2d}}_{{hour:2d}}{{minute:2d}}{{second:2d}}.fits')
        >>> swap = Scraper(format=pattern, instrument='swap')
        >>> from sunpy.time import TimeRange
        >>> timerange = TimeRange('2015-01-01T00:08:00','2015-01-01T00:12:00')
        >>> print(swap.filelist(timerange))  # doctest: +REMOTE_DATA
        ['http://proba2.oma.be/swap/data/bsd/2015/01/01/swap_lv1_20150101_000857.fits',
         'http://proba2.oma.be/swap/data/bsd/2015/01/01/swap_lv1_20150101_001027.fits',
         'http://proba2.oma.be/swap/data/bsd/2015/01/01/swap_lv1_20150101_001157.fits']

        While writing the pattern, we can also leverage parse capabilities by using the ``{{}}``
        notation to match parts of the filename that cannot be known beforehand:

        >>> from sunpy.net import Scraper
        >>> from sunpy.time import TimeRange
        >>> pattern = 'http://proba2.oma.be/lyra/data/bsd/{{year:4d}}/{{month:2d}}/{{day:2d}}/{{}}_lev{{Level:1d}}_std.fits'
        >>> lyra = Scraper(format=pattern)
        >>> timerange = TimeRange('2023-03-06T00:00:00','2023-03-06T00:10:00')
        >>> print(swap.filelist(timerange))  # doctest: +REMOTE_DATA
        ['http://proba2.oma.be/swap/data/bsd/2023/03/06/swap_lv1_20230306_000128.fits',
         'http://proba2.oma.be/swap/data/bsd/2023/03/06/swap_lv1_20230306_000318.fits',
         'http://proba2.oma.be/swap/data/bsd/2023/03/06/swap_lv1_20230306_000508.fits',
         'http://proba2.oma.be/swap/data/bsd/2023/03/06/swap_lv1_20230306_000658.fits',
         'http://proba2.oma.be/swap/data/bsd/2023/03/06/swap_lv1_20230306_000848.fits']

        Notes
        -----
        The search is strict with the time range, so if the archive scraped contains daily files
        but the range doesn't start from the beginning of the day, the file for that day
        won't be selected. The end of the timerange is normally fine, as it includes the file
        on that end time.
        """
        directories = self.range(timerange)
        if urlsplit(directories[0]).scheme == "ftp":
            return self._ftpfilelist(timerange)
        elif urlsplit(directories[0]).scheme == "file":
            return self._localfilelist(timerange)
        elif urlsplit(directories[0]).scheme in ["http", "https"]:
            return self._httpfilelist(timerange)
        else:
            raise ValueError("The provided pattern should either be an FTP or a local file-path, or an HTTP address.")
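    # ``filelist`` dispatches on the URL scheme of the pattern, so the same call
    # also works for FTP and local archives. A hypothetical local example
    # (the path is made up and such files are assumed to exist on disk):
    #
    #     >>> from sunpy.time import TimeRange
    #     >>> local = Scraper(format='file:///data/swap/{{year:4d}}/swap_{{year:4d}}{{month:2d}}{{day:2d}}.fits')
    #     >>> local.filelist(TimeRange('2023-01-01', '2023-01-02'))  # handled by _localfilelist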
    def _ftpfilelist(self, timerange):
        """
        Goes over archives available over ftp to return list of files in the given timerange.
        """
        directories = self.range(timerange)
        filesurls = list()
        ftpurl = urlsplit(directories[0]).netloc
        with FTP(ftpurl, user="anonymous", passwd="data@sunpy.org") as ftp:
            for directory in directories:
                try:
                    ftp.cwd(urlsplit(directory).path)
                except Exception as e:
                    log.debug(f"FTP CWD: {e}")
                    continue
                for file_i in ftp.nlst():
                    fullpath = directory + file_i
                    if self._url_follows_pattern(fullpath):
                        if self._check_timerange(fullpath, timerange):
                            filesurls.append(fullpath)
        filesurls = ['ftp://' + f"{urlsplit(url).netloc}{urlsplit(url).path}" for url in filesurls]
        return filesurls

    def _localfilelist(self, timerange):
        """
        Goes over locally stored archives to return list of files in the given timerange.
        """
        if self.use_old_format:
            pattern = self.pattern
            pattern_temp = pattern.replace('file://', '')
            if os.name == 'nt':
                pattern_temp = pattern_temp.replace('\\', '/')
                prefix = 'file:///'
            else:
                prefix = 'file://'
            self.pattern = pattern_temp
            directories = self.range(timerange)
            filepaths = list()
            for directory in directories:
                for file_i in os.listdir(directory):
                    fullpath = directory + file_i
                    if self._url_follows_pattern(fullpath):
                        if self._check_timerange(fullpath, timerange):
                            filepaths.append(fullpath)
            filepaths = [prefix + path for path in filepaths]
            self.pattern = pattern
            return filepaths
        else:
            pattern, datetime_pattern = self.pattern, self.datetime_pattern
            pattern_temp, datetime_pattern_temp = pattern.replace('file://', ''), datetime_pattern.replace('file://', '')
            if os.name == 'nt':
                pattern_temp = pattern_temp.replace('\\', '/')
                datetime_pattern_temp = datetime_pattern_temp.replace('\\', '/')
                prefix = 'file:///'
            else:
                prefix = 'file://'
            # Change pattern variables class-wide
            self.pattern, self.datetime_pattern = pattern_temp, datetime_pattern_temp
            directories = self.range(timerange)
            filepaths = list()
            for directory in directories:
                for file_i in os.listdir(directory):
                    fullpath = directory + file_i
                    if self._url_follows_pattern(fullpath):
                        if self._check_timerange(fullpath, timerange):
                            filepaths.append(fullpath)
            filepaths = [prefix + path for path in filepaths]
            # Set them back to their original values
            self.pattern, self.datetime_pattern = pattern, datetime_pattern
            return filepaths

    def _httpfilelist(self, timerange):
        """
        Goes over http archives hosted on the web, to return list of files in the given timerange.
        """
        directories = self.range(timerange)
        filesurls = list()
        retry_counts = {}
        while directories:
            directory = directories.pop(0)
            try:
                opn = urlopen(directory)
                try:
                    soup = BeautifulSoup(opn, "html.parser")
                    for link in soup.find_all("a"):
                        href = link.get("href")
                        if href is not None:
                            if href[0] == '/':
                                fullpath = self.domain + href[1:]
                            else:
                                fullpath = directory + href
                            if self._url_follows_pattern(fullpath):
                                if self._check_timerange(fullpath, timerange):
                                    filesurls.append(fullpath)
                finally:
                    opn.close()
            except HTTPError as http_err:
                # Ignore missing directories (issue #2684).
                if http_err.code == 404:
                    log.debug(f"Directory {directory} not found.")
                    continue
                if http_err.code in [400, 403]:
                    log.debug(f"Got error {http_err.code} while scraping {directory} : {http_err.reason}")
                    raise
                if http_err.code in [429, 504]:
                    # See if the server has told us how long to back off for
                    retry_after = http_err.hdrs.get('Retry-After', 2)
                    try:
                        # Ensure that we can parse the header as an int in sec
                        retry_after = int(retry_after)
                    except Exception as e:
                        log.debug(f"Converting retry_after failed: {e}")
                        retry_after = 2
                    log.debug(
                        f"Got {http_err.code} while scraping {directory}, waiting for {retry_after} seconds before retrying."
                    )
                    sleep(retry_after)
                    if retry_counts.get(directory, 0) > 4:
                        log.debug(f"Exceeded maximum retry limit for {directory}")
                        raise
                    retry_counts[directory] = retry_counts.get(directory, 0) + 1
                    directories.insert(0, directory)
                    continue
            except URLError as url_err:
                log.debug(f"Failed to parse content from {directory}: {url_err}")
                raise
            except Exception:
                raise
        return filesurls

    def _check_timerange(self, url, timerange):
        """
        Checks whether the time range represented in *url* intersects with the given time range.

        Parameters
        ----------
        url : `str`
            URL of the file.
        timerange : `~sunpy.time.TimeRange`
            Time interval for which files were searched.

        Returns
        -------
        `bool`
            `True` if URL's valid time range overlaps the given timerange, else `False`.
        """
        if self.use_old_format:
            if hasattr(self, 'extractor'):
                exdict = parse(self.extractor, url).named
                tr = get_timerange_from_exdict(exdict)
                return tr.intersects(timerange)
            else:
                datehref = self._extract_date(url).to_datetime()
                timestep = extract_timestep(self.pattern)
                tr = TimeRange(datehref, datehref + timestep)
                return tr.intersects(timerange)
        else:
            exdict = parse(self.pattern, url).named
            if exdict['year'] < 100:
                exdict['year'] = 2000 + exdict['year']
            if 'month' not in exdict:
                if 'month_name' in exdict:
                    exdict['month'] = datetime.strptime(exdict['month_name'], '%B').month
                elif 'month_name_abbr' in exdict:
                    exdict['month'] = datetime.strptime(exdict['month_name_abbr'], '%b').month
            tr = get_timerange_from_exdict(exdict)
            return tr.intersects(timerange)

    def _url_follows_pattern(self, url):
        """
        Check whether the url provided follows the pattern.
        """
        if self.use_old_format:
            pattern = self.pattern
            for k, v in TIME_CONVERSIONS.items():
                pattern = pattern.replace(k, v)
            matches = re.match(pattern, url)
            if matches:
                return matches.end() == matches.endpos
            return False
        else:
            return parse(self.pattern, url)

    def _extract_date(self, url):
        """
        Extracts the date from a particular url following the pattern.
        """
        # Remove the user and password if present
        url = url.replace("anonymous:data@sunpy.org@", "")

        def url_to_list(txt):
            # Substitutes '.' and '_' for '/'.
            return re.sub(r'\.|_', '/', txt).split('/')

        # Create a list of all the blocks in times
        # assuming they are all separated with either '.', '_' or '/'
        pattern_list = url_to_list(self.pattern)
        url_list = url_to_list(url)
        time_order = ['%Y', '%y', '%b', '%B', '%m', '%d', '%j',
                      '%H', '%I', '%M', '%S', '%e', '%f']
        final_date = []
        final_pattern = []
        # Find in directory and filename
        for pattern_elem, url_elem in zip(pattern_list, url_list):
            time_formats = [x for x in time_order if x in pattern_elem]
            if len(time_formats) > 0:
                # Find whether there's text that should not be here
                toremove = re.split('%.', pattern_elem)
                if len(toremove) > 0:
                    for bit in toremove:
                        if bit != '':
                            url_elem = url_elem.replace(bit, '', 1)
                            pattern_elem = pattern_elem.replace(bit, '', 1)
                final_date.append(url_elem)
                final_pattern.append(pattern_elem)
                for time_bit in time_formats:
                    time_order.remove(time_bit)
        # Find and remove repeated elements eg: %Y in ['%Y', '%Y%m%d']
        # Make all as single strings
        date_together = ''.join(final_date)
        pattern_together = ''.join(final_pattern)
        re_together = pattern_together
        for k, v in TIME_CONVERSIONS.items():
            re_together = re_together.replace(k, v)
        # Lists to contain the unique elements of the date and the pattern
        final_date = list()
        final_pattern = list()
        re_together = re_together.replace('[A-Z]', '\\[A-Z]')
        for p, r in zip(pattern_together.split('%')[1:], re_together.split('\\')[1:]):
            if p == 'e':
                continue
            regexp = fr'\{r}' if not r.startswith('[') else r
            pattern = f'%{p}'
            date_part = re.search(regexp, date_together)
            date_together = date_together[:date_part.start()] \
                + date_together[date_part.end():]
            if pattern not in final_pattern:
                final_pattern.append(f'%{p}')
                final_date.append(date_part.group())
        return Time.strptime(' '.join(final_date),
                             ' '.join(final_pattern))

    def _extract_files_meta(self, timerange, extractor=None, matcher=None):
        """
        Returns metadata information contained in URLs.

        Parameters
        ----------
        timerange : `~sunpy.time.TimeRange`
            Time interval where to find the directories for a given pattern.
        extractor : `str`
            Extractor to extract metadata from URLs if the old format for pattern is used.
        matcher : `dict`
            Dictionary to check if extracted metadata is valid.

        Returns
        -------
        `list` of `dict`
            List of metadata info for all URLs.
        """
        if self.use_old_format:
            self.extractor = extractor
            urls = self.filelist(timerange)
            metalist = []
            for url in urls:
                metadict = parse(extractor, url)
                if metadict is not None:
                    append = True
                    metadict = metadict.named
                    metadict['url'] = url
                    if matcher is not None:
                        for k in metadict:
                            if k in matcher and str(metadict[k]) not in matcher[k]:
                                append = False
                                break
                    if append:
                        metalist.append(metadict)
            return metalist
        else:
            urls = self.filelist(timerange)
            metalist = []
            for url in urls:
                metadict = parse(self.pattern, url)
                if metadict is not None:
                    append = True
                    metadict = metadict.named
                    metadict['url'] = url
                    if 'month' not in metadict:
                        if 'month_name' in metadict:
                            metadict['month'] = datetime.strptime(metadict['month_name'], '%B').month
                        elif 'month_name_abbr' in metadict:
                            metadict['month'] = datetime.strptime(metadict['month_name_abbr'], '%b').month
                    if matcher is not None:
                        for k in metadict:
                            if k in matcher and str(metadict[k]) not in matcher[k]:
                                append = False
                                break
                    if append:
                        metalist.append(metadict)
            return metalist
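    # Illustrative sketch of what ``_extract_files_meta`` returns (values made up):
    # one dict per matching URL, holding the fields named in the pattern plus the
    # URL itself, e.g.
    #
    #     {'year': 2023, 'month': 3, 'day': 6, 'hour': 0, 'minute': 1, 'second': 28,
    #      'url': 'http://proba2.oma.be/swap/data/bsd/2023/03/06/swap_lv1_20230306_000128.fits'}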