Basic Usage

This notebook demonstrates how I try to access and process data more easily. As of now the esm_analysis library is just a concept and far from ready to be deployed. As an example I will explain its functionality based on CMORPH satellite data:

# Import the library
import esm_analysis

Creating a Task Scheduler

To make use of a computing cluster we’ll import the SLURMCluster class from dask_jobqueue:

from dask_jobqueue import SLURMCluster

The cluster can be configured according to the desired queue and project:

cluster = SLURMCluster(queue='gpu', project='mh0287', walltime='03:00:00')
/mnt/lustre01/work/mh0287/m300765/anaconda3/lib/python3.7/site-packages/distributed/dashboard/core.py:74: UserWarning:
Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
cluster
[Jupyter widget: SLURMCluster summary]
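A cluster object alone does not process anything yet; workers have to be requested and a client attached. A minimal sketch of how this could be done with plain dask_jobqueue/distributed calls (this is generic dask usage, not part of esm_analysis):

from dask.distributed import Client

# Request two batch jobs worth of workers from SLURM
cluster.scale(2)
# Attach a distributed client to the cluster; passing the cluster object to
# esm_analysis below presumably has the same effect under the hood
client = Client(cluster)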
%matplotlib notebook
from itertools import groupby
import getpass
import os
from pathlib import Path
from multiprocessing import cpu_count
import re

def etime():
    """Return the combined user and system CPU time of this process in seconds."""
    user, sys, _, _, _ = os.times()
    return user + sys

Creating the RunDirectory object

The RunDirectory class opens netCDF data and tries to fetch additional information on the data that might be stored in Fortran namelist files created by run-scripts. Creating an instance of the RunDirectory class also creates a json file where all important metadata is stored, so that the run can be accessed faster next time.

To instantiate the RunDirectory object we need the path to the data, an experiment name (or file name suffix) that is unique to the data files, and the kind of model/dataset we are dealing with. The model/dataset type is useful because variable names will be conveniently translated by the library; the user only has to know the variable names following the ECHAM model convention, and translations to other datasets/models are done by the library. The following translations are implemented at the moment:

* ECHAM / ICON-MPI
* ICON-DWD
* CMORPH

Let’s open a CMORPH satellite based rainfall dataset.

# First define the path where the data is stored
run_dir = Path('/home/mpim/m300765/Data/CMORPH/netcdf/years/')

suffix = 'Cmorph' # Experiment name (in this case the filename suffix)
data_type = suffix.upper() # Now tell the module that it should convert variable names from
                           # CMORPH to ECHAM names
# Instantiate the RunDirectory object
Run = esm_analysis.RunDirectory(run_dir, model_type=data_type, overwrite=True, client=cluster)

If no client keyword is given, RunDirectory tries to start a local multiprocessing pool. In our case we have already started a cluster and can connect to it directly:
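For completeness, a minimal sketch of an instantiation without a cluster, assuming the same constructor signature as above; the library would then fall back to its local multiprocessing pool:

# Hypothetical: omitting the client keyword falls back to a local pool
Run_local = esm_analysis.RunDirectory(run_dir, model_type=data_type)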

Run.dask_client

Client

Cluster

  • Workers: 2
  • Cores: 96
  • Memory: 128.00 GB

NOTE: The overwrite keyword argument is set to True for demonstration purposes and will be explained in detail later.

At this point no files have been opened yet; let’s see which files belonging to the dataset have been found. The files property returns a pandas Series with all filenames that can potentially be opened. We know that the filenames follow a certain date pattern, which we will later use to group all filenames of a given month together:

Run.files.head()
0    /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
1    /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
2    /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
3    /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
4    /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
dtype: object
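Because files is an ordinary pandas Series, standard string methods can be used to pick out the files of a single month, for example (the pattern _1998_01_ is only an illustrative guess based on the naming convention described below):

# Select all daily files of January 1998 via a pandas string match
jan_1998 = Run.files[Run.files.str.contains('_1998_01_')]
print(f'{len(jan_1998)} files found for January 1998')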

Re-Gridding the Input Data

Let’s remap the content of this run. The RunDirectory object also offers a method for this, which is applied by calling remap.

remap will automatically try to run jobs in parallel. First let’s check how many parallel tasks are available on this machine:

print(f'Will run with {len(config.workers)} workers with {config.worker_cores} cores each in the background.')
Will run with 2 workers with 48 cores each in the background.

Let’s define the output directory and the grid description file. If no output directory is given the remapping will create a sub-directory in the work folder.
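The grid description is a plain-text CDO grid file. In case no such file is at hand, a simple regular lon/lat target grid can be written by hand; the sketch below uses made-up grid parameters (a generic 1-degree grid) and does not reproduce the T63grid_TRMM.txt file used here:

from pathlib import Path

# Hypothetical CDO grid description for a regular 1-degree lon/lat grid
example_griddes = """\
gridtype = lonlat
xsize    = 360
ysize    = 180
xfirst   = 0.5
xinc     = 1.0
yfirst   = -89.5
yinc     = 1.0
"""
Path('example_grid.txt').write_text(example_griddes)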

# Define the target grid description file and the output directory
griddes = '/home/mpim/m300385/precip/T63grid_TRMM.txt'
user = getpass.getuser()
out_dir = Path(f'/scratch/{user[0]}/{user}/tmp/CMORPH/echam_grid')
out_dir.mkdir(exist_ok=True, parents=True)
# Call the remap procedure
%time _ =  Run.remap(griddes, out_dir=out_dir, method='remapcon')
[progress bar: Remapping, 6574 files]
CPU times: user 6min 5s, sys: 11.3 s, total: 6min 16s
Wall time: 34min 1s

Applying Custom Functions to the Run

The remapped data is stored in daily files. Suppose we want to merge the data into monthly files. We can take advantage of the fact that the filenames follow the convention PRODUCTNAME_YEAR_MONTH_DAY.nc, hence it is easy to use a regex to group them by month.

search = lambda key: re.search(r'.*\d{4}_\d{2}_*', key, re.M).group(0)
groups = [list(g) for k, g in groupby(Run.files, key=search)]
print(f'There are {len(groups)} different months in the dataset')
There are 216 different months in the dataset
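As a quick sanity check one can inspect the first group; it should hold all daily files of the first month in the dataset:

# Each group should contain one month of daily files
print(len(groups[0]), Path(groups[0][0]).name, '->', Path(groups[0][-1]).name)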

esm_analysis comes with python bindings for cdo which can be accessed via esm_analysis.cdo. Let’s define a function that merges the daily CMORPH data into monthly data:

def mergetime(infiles, outdir, prefix):
    """Apply a cdo mergetime."""
    # Get the month and year from the first filename
    re_match = f'.*{prefix}_(?P<year>[^/]+)_(?P<month>[^/]+)_(?P<day>[^/]+).nc$'
    year, month = re.match(re_match, infiles[0]).group('year', 'month')
    outfile = Path(outdir)/Path(f'{prefix}_{year}_{month}.nc')
    return esm_analysis.cdo.mergetime(' '+' '.join(infiles), output=outfile.as_posix())
# Define the out directory
out_dir = Path(f'/scratch/{user[0]}/{user}/tmp/CMORPH/echam_mon')
out_dir.mkdir(exist_ok=True, parents=True)
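Before dispatching all 216 groups at once it can be worthwhile to test the function serially on a single month. This sketch simply calls the function defined above on the first group; what the cdo binding returns is not documented here, so the result is only printed for inspection:

# Serial sanity check: merge only the first month's daily files
result = mergetime(groups[0], out_dir.as_posix(), 'Cmorph')
print(result)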

We have 216 months to merge; this operation can take quite some time when done serially. The RunDirectory object offers a static method, apply_function, that can apply any given function to a collection in parallel. Let’s apply the above defined mergetime function in parallel and measure the speed of the application.

%time _ = Run.apply_function(mergetime, groups, args=(out_dir.as_posix(), 'Cmorph'), label='Merge Time')
[progress bar: Merge Time, 216 groups]
CPU times: user 945 ms, sys: 68 ms, total: 1.01 s
Wall time: 5.7 s

Merging all months has finished in just under 10 seconds. Let’s load the new monthly dataset:

MonthRun = esm_analysis.RunDirectory(out_dir, model_type=data_type, overwrite=True, client=cluster)
MonthRun.files.iloc[-1]
'/scratch/m/m300765/tmp/CMORPH/echam_mon/Cmorph_2015_12.nc'

Up to now we haven’t loaded any dataset. Let’s load the entire data by creating a virtual dataset that opens all files and virtually merges them along the time axis. This is done by calling the load_data method.

Loading only a subset can be specified by collections of filenames or glob patterns. If we wanted to load only data between 1999 and 2015 we could do the following:

t1 = etime()
MonthRun.load_data([f'*{year}*.nc' for year in range(1999, 2016)])
print(f'Fresh load took {etime() - t1} seconds')
Fresh load took 12.279999999999973 seconds

The loaded dataset can be accessed with the help of the dataset property:

MonthRun.dataset[MonthRun.variables['pr']]
<xarray.DataArray 'precip' (time: 49672, lat: 32, lon: 192)>
dask.array<shape=(49672, 32, 192), dtype=float32, chunksize=(248, 32, 192)>
Coordinates:
  * time     (time) datetime64[ns] 1999-01-01 ... 2015-12-31T21:00:00
  * lon      (lon) float64 0.0 1.875 3.75 5.625 7.5 ... 352.5 354.4 356.2 358.1
  * lat      (lat) float64 -28.91 -27.05 -25.18 -23.32 ... 25.18 27.05 28.91
Attributes:
    standard_name:  3_hourly_accumulated_precipitaion
    long_name:      3 hourly accumulated precipitaion
    units:          mm/3hr
    code:           142
    table:          128
    short_naem:     precip
    fill_value:     -999.0

Using Data Caches for Faster Reloading

Loading the data can take a significant amount of time, depending on the number of files in the run. This is where the overwrite keyword argument comes in. When an instance of the RunDirectory object is created, metadata about the run is saved in a json file. Loading a dataset will dump a serialized image of the loaded dataset into the run directory, and the path to the dump is stored in the json file. The next time the data is loaded this dump is opened instead of the netCDF files, which significantly speeds up the reading process. If the user does not wish to use the stored metadata but wants a ‘fresh’ read of the data from the files, the RunDirectory object can be instantiated with the overwrite keyword argument set to True.

Let’s do the same as above but with the default behavior, i.e. no overwrite. You’ll notice a significant speed-up when retrieving the dataset.

MonthRun = esm_analysis.RunDirectory(out_dir, model_type=data_type, client=cluster)
t1 = etime()
MonthRun.load_data()
print(f'Loading the serialized pickle took {etime() - t1} seconds')
Loading the serialized pickle took 0.43000000000006366 seconds

Interactive Data Visualisation

esm_analysis comes with a very rudimentary plotting collection. For instance, plotting on maps is not yet supported. Still, the 2D rainfall field can be visualized with the profile_2d method of the ProfilePlotter class:

from esm_analysis.plot import ProfilePlotter
from matplotlib import pyplot as plt
P = ProfilePlotter.profile_2d(MonthRun.dataset, MonthRun.variables['pr'],
                              figsize=(9, 6),  data_dim='x', avg_dims=None,
                              vmin=0, vmax=20, apply_func=None,
                              cbar_args={'cbar_size':'7%', 'cbar_pad': '5%'})
[interactive figure with widgets: a time-step selector and a color-range slider]

As of now, this creates an imshow plot along with IPython widgets that can be used to change the color bar range and the color map and to cycle through the time steps of the plot. This makes a first exploration of the data very easy.

Since MonthRun.dataset is an xarray dataset object, you can do anything with it that you would usually do with xarray datasets.

fig, ax = plt.subplots(1,1, figsize=(9, 3.2))
MonthRun.dataset[MonthRun.variables['pr']][0:8].sum(dim='time').plot(ax=ax)
<matplotlib.collections.QuadMesh at 0x2b54730c38d0>
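As a further illustration of ordinary xarray operations on the loaded dataset, one could compute and plot an area-mean precipitation time series, resampled to monthly means. The sketch below uses only standard xarray calls and the same variable lookup as above:

# Area-mean 3-hourly precipitation, resampled to monthly means
pr = MonthRun.dataset[MonthRun.variables['pr']]
monthly_mean = pr.mean(dim=['lat', 'lon']).resample(time='1M').mean()

fig, ax = plt.subplots(1, 1, figsize=(9, 3.2))
monthly_mean.plot(ax=ax)
ax.set_ylabel('area mean precipitation [mm/3hr]')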