Source code for herbie.fast

## Brian Blaylock
## May 3, 2021

"""
============
Herbie Tools
============
"""

import logging

# Multithreading :)
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import xarray as xr

from herbie.core import Herbie

log = logging.getLogger(__name__)


"""
🧵🤹🏻‍♂️ Notice! Multithreading and multiprocessing are used

This is my first implementation of multithreading to create, download,
and read many Herbie objects. This drastically reduces the time it takes
to create a Herbie object (which is just looking for if and where a
GRIB2 file exists on the internet) and to download a file.
"""


def _validate_fxx(fxx):
    """Fast Herbie requires fxx as a list-like"""
    if isinstance(fxx, int):
        fxx = [fxx]

    if not isinstance(fxx, (list, range)):
        raise ValueError(f"fxx must be an int, list, or range. Gave {fxx}")

    return fxx


def _validate_DATES(DATES):
    """Fast Herbie requires DATES as a list-like"""
    if isinstance(DATES, str):
        DATES = [pd.to_datetime(DATES)]
    elif not hasattr(DATES, "__len__"):
        DATES = [pd.to_datetime(DATES)]

    if not isinstance(DATES, (list, pd.DatetimeIndex)):
        raise ValueError(
            f"DATES must be a pandas-parsable datetime string or a list. Gave {DATES}"
        )

    return DATES


def Herbie_latest(n=6, freq="1h", **kwargs):
    """Search for the most recent GRIB2 file (using multithreading).

    Parameters
    ----------
    n : int
        Number of attempts to try.
    freq : pandas-parsable timedelta string
        Time interval between each attempt.
    **kwargs :
        Remaining keywords for the Herbie objects (model, product, etc.).

    Returns
    -------
    Herbie
        The most recent Herbie object whose GRIB2 file was found.

    Raises
    ------
    ValueError
        If no file was found in any of the ``n`` attempts.

    Examples
    --------
    When ``n=6``, and ``freq='1h'``, Herbie will look for the latest
    file within the last 6 hours (suitable for the HRRR model).

    When ``n=3``, and ``freq='6h'``, Herbie will look for the latest
    file within the last 18 hours (suitable for the GFS model).
    """
    # Drop the timezone (Herbie works in naive UTC) and floor to the
    # model cycle frequency.
    current = pd.Timestamp.now("utc").tz_localize(None).floor(freq)
    DATES = pd.date_range(
        start=current - (pd.Timedelta(freq) * n),
        end=current,
        freq=freq,
    )
    FH = FastHerbie(DATES, **kwargs)

    # ROBUSTNESS: previously an empty search raised a cryptic IndexError.
    if not FH.file_exists:
        raise ValueError(
            f"Could not find any recent files (tried {len(DATES)} dates "
            f"from {DATES[0]} to {DATES[-1]})."
        )

    # file_exists is sorted by date, so the last entry is the newest.
    return FH.file_exists[-1]


class FastHerbie:
    """Create many Herbie objects quickly with multithreading."""

    def __init__(self, DATES, fxx=None, *, max_threads=50, **kwargs):
        """Create many Herbie objects with methods to download or read with xarray.

        Uses multithreading.

        .. note::
            Currently, Herbie objects are looped by run datetime (date)
            and forecast lead time (fxx).

        Parameters
        ----------
        DATES : pandas-parsable datetime string or list of datetimes
        fxx : int or list of forecast lead times
            Defaults to ``[0]`` (analysis hour) when not given.
        max_threads : int
            Maximum number of threads to use.
        kwargs :
            Remaining keywords for Herbie object
            (e.g., model, product, priority, verbose, etc.)

        Benchmark
        ---------
        Creating 48 Herbie objects
        - 1 thread took 16 s
        - 2 threads took 8 s
        - 5 threads took 3.3 s
        - 10 threads took 1.7 s
        - 50 threads took 0.5 s
        """
        # BESTPRACTICE: avoid a mutable default argument; None stands in
        # for the historical default of [0].
        self.DATES = _validate_DATES(DATES)
        self.fxx = _validate_fxx([0] if fxx is None else fxx)

        kwargs.setdefault("verbose", False)

        ################
        # Multithreading
        # BUGFIX: count tasks from the *validated* inputs. The raw
        # arguments may be a scalar (a datetime string has a misleading
        # __len__; an int has none at all).
        self.tasks = len(self.DATES) * len(self.fxx)
        # Guard against ThreadPoolExecutor(0), which raises ValueError.
        threads = max(1, min(self.tasks, max_threads))
        log.info(f"🧵 Working on {self.tasks} tasks with {threads} threads.")

        self.objects = []
        with ThreadPoolExecutor(threads) as exe:
            futures = [
                exe.submit(Herbie, date=DATE, fxx=f, **kwargs)
                for DATE in self.DATES
                for f in self.fxx
            ]

            # Collect Herbie objects in the order they complete.
            for future in as_completed(futures):
                if future.exception() is None:
                    self.objects.append(future.result())
                else:
                    log.error(f"Exception has occurred : {future.exception()}")

        log.info(f"Number of Herbie objects: {len(self.objects)}")

        # Sort by lead time, then by run date. Python sorts are stable,
        # so two passes give (date, fxx) ordering.
        self.objects.sort(key=lambda H: H.fxx)
        self.objects.sort(key=lambda H: H.date)

        # Partition by whether the GRIB2 file was actually found.
        self.file_exists = [H for H in self.objects if H.grib is not None]
        self.file_not_exists = [H for H in self.objects if H.grib is None]

        if len(self.file_not_exists) > 0:
            # BUGFIX: report missing files as a fraction of *all*
            # objects, not of the files that were found.
            log.warning(
                f"Could not find {len(self.file_not_exists)}/{len(self.objects)} GRIB files."
            )
def __len__(self): return len(self.objects)
[docs] def df(self): """Organize Herbie objects into a DataFrame. #? Why is this inefficient? Takes several seconds to display because the __str__ does a lot. """ ds_list = [ self.objects[x : x + len(self.fxx)] for x in range(0, len(self.objects), len(self.fxx)) ] return pd.DataFrame( ds_list, index=self.DATES, columns=[f"F{i:02d}" for i in self.fxx] )
[docs] def inventory(self, search=None): """Get combined inventory DataFrame. Useful for data discovery and checking your search before doing a download. """ # NOTE: In my quick test, you don't gain much speed using multithreading here. dfs = [] for i in self.file_exists: df = i.inventory(search) df = df.assign(FILE=i.grib) dfs.append(df) return pd.concat(dfs, ignore_index=True)
[docs] def download(self, search=None, *, max_threads=20, **download_kwargs): r"""Download many Herbie objects Uses multithreading. Parameters ---------- search : string Regular expression string to specify which GRIB messages to download. **download_kwargs : Any kwarg for Herbie's download method. Benchmark --------- Downloading 48 files with 1 variable (TMP:2 m) - 1 thread took 1 min 17 s - 2 threads took 36 s - 5 threads took 28 s - 10 threads took 25 s - 50 threads took 23 s """ ########################### # Multithread the downloads threads = min(self.tasks, max_threads) log.info(f"🧵 Working on {self.tasks} tasks with {threads} threads.") outFiles = [] with ThreadPoolExecutor(threads) as exe: futures = [ exe.submit(H.download, search, **download_kwargs) for H in self.file_exists ] # Return list of Herbie objects in order completed for future in as_completed(futures): if future.exception() is None: outFiles.append(future.result()) else: log.error(f"Exception has occured : {future.exception()}") return outFiles
[docs] def xarray( self, search, *, max_threads=None, **xarray_kwargs, ): """Read many Herbie objects into an xarray Dataset. # TODO: Sometimes the Jupyter Cell always crashes when I run this. # TODO: "fatal flex scanner internal error--end of buffer missed" Uses multithreading (or multiprocessing). This would likely benefit from multiprocessing instead. Parameters ---------- max_threads : int Control the maximum number of threads to use. If you use too many threads, you may run into memory limits. Benchmark --------- Opening 48 files with 1 variable (TMP:2 m) - 1 thread took 1 min 45 s - 2 threads took 55 s - 5 threads took 39 s - 10 threads took 39 s - 50 threads took 37 s """ xarray_kwargs = dict(search=search, **xarray_kwargs) # NOTE: Multiprocessing does not seem to work because it looks # NOTE: like xarray objects are not pickleable. # NOTE: ``Reason: 'TypeError("cannot pickle '_thread.lock' object"`` if max_threads: ########################### # Multithread the downloads # ! Only works sometimes # ! 
I get this error: "'EntryPoint' object has no attribute '_key'"" threads = min(self.tasks, max_threads) log.info(f"🧵 Working on {self.tasks} tasks with {threads} threads.") with ThreadPoolExecutor(max_threads) as exe: futures = [ exe.submit(H.xarray, **xarray_kwargs) for H in self.file_exists ] # Return list of Herbie objects in order completed ds_list = [future.result() for future in as_completed(futures)] else: ds_list = [H.xarray(**xarray_kwargs) for H in self.file_exists] # Sort the DataSets, first by lead time (step), then by run time (time) ds_list.sort(key=lambda x: x.step.data.max()) ds_list.sort(key=lambda x: x.time.data.max()) # Reshape list with dimensions (len(DATES), len(fxx)) ds_list = [ ds_list[x : x + len(self.fxx)] for x in range(0, len(ds_list), len(self.fxx)) ] # Concat DataSets try: ds = xr.combine_nested( ds_list, concat_dim=["time", "step"], combine_attrs="drop_conflicts", ) except: # TODO: I'm not sure why some cases doesn't like the combine_attrs argument ds = xr.combine_nested( ds_list, concat_dim=["time", "step"], ) ds = ds.squeeze() return ds