Basic Usage¶
This notebook demonstrates how I try to make accessing and processing data easier. As of now the esm_analysis
library is just a concept and far from ready to be deployed. As an example I will explain
its functionality based on CMORPH satellite data:
# Import the library
import esm_analysis
Creating a Task Schedule¶
To involve a computing cluster we'll import the SLURMCluster class from
dask_jobqueue:
from dask_jobqueue import SLURMCluster
The cluster can be configured according to the desired queue and project:
cluster = SLURMCluster(queue='gpu', project='mh0287', walltime='03:00:00')
/mnt/lustre01/work/mh0287/m300765/anaconda3/lib/python3.7/site-packages/distributed/dashboard/core.py:74: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the diagnostics dashboard on a random port instead. warnings.warn("\n" + msg)
cluster
%matplotlib notebook
from itertools import groupby
import getpass
import os
from pathlib import Path
from multiprocessing import cpu_count
import re
def etime():
    user, sys, _, _, _ = os.times()
    return user + sys
Creating the RunDirectory object¶
The RunDirectory
class opens netcdf data and tries to fetch
additional information about the data that might be stored in fortran
namelist files created by run-scripts. Creating an
instance of the RunDirectory
object will create a json file
where all important data is stored, so that the run can be accessed
faster the next time.
To instantiate the RunDirectory
object we need the path to the data,
an experiment name (or file name suffix) that is unique to the data
files, and the kind of model/dataset that is considered. The model/dataset type
is useful because variable names will be conveniently translated by the
library; the user only has to know the variable names of the
ECHAM model convention. Variable name translations to other
datasets/models are done by the library. The following translations
are implemented at the moment:

* ECHAM / ICON-MPI
* ICON-DWD
* CMORPH
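As a rough sketch, such a translation could be a plain dictionary lookup per dataset. The table entries below are illustrative assumptions, not the library's actual mapping:

```python
# Hypothetical translation tables: ECHAM-style name -> native name.
# Only 'pr' -> 'precip' for CMORPH is backed by the notebook's output;
# the other entry is made up for illustration.
TRANSLATIONS = {
    'CMORPH': {'pr': 'precip'},
    'ICON-DWD': {'pr': 'tot_prec'},
}

def translate(var_name, model_type):
    """Map an ECHAM-style variable name to the dataset's native name."""
    return TRANSLATIONS.get(model_type, {}).get(var_name, var_name)

print(translate('pr', 'CMORPH'))  # -> precip
```

Unknown names fall through unchanged, so ECHAM data itself needs no table entry.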
Let’s open a CMORPH satellite based rainfall dataset.
# First define the path where the data is stored
run_dir = Path('/home/mpim/m300765/Data/CMORPH/netcdf/years/')
suffix = 'Cmorph' # Experiment name (in this case the filename suffix)
data_type = suffix.upper() # Now tell the module that it should convert variable names from
# CMORPH to ECHAM names
# Instantiate the RunDirectory object
Run = esm_analysis.RunDirectory(run_dir, model_type=data_type, overwrite=True, client=cluster)
If no client keyword is given, RunDirectory
tries to start a
local multiprocessing pool. But in our case we have started a cluster
and can directly connect to it:
Run.dask_client
NOTE: The overwrite
keyword argument is set to True for
demonstration purposes and will be explained in detail later.
At this time no files have been opened; let's see which files belonging
to the dataset have been found. The files
property will return a
pandas Series
with all filenames that can potentially be opened. We
know that the filenames follow a certain date pattern, which will later
allow us to group all filenames of a certain month together:
Run.files.head()
0 /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
1 /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
2 /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
3 /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
4 /home/mpim/m300765/Data/CMORPH/netcdf/years/19...
dtype: object
Re-Gridding the Input Data¶
Let's remap the content of this run. The RunDirectory
object also
offers a method for this. The remapping is applied by calling
remap
.
remap will automatically try to run jobs in parallel. First let's check how many parallel tasks are available:
print(f'Will run with {len(cluster.workers)} workers with {cluster.worker_cores} cores each in the background.')
Will run with 2 workers with 48 cores each in the background.
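When no cluster client is attached, a local multiprocessing pool presumably falls back to the cores of the current machine; that number can be inspected with the standard library (the fallback behavior itself is an assumption here):

```python
from multiprocessing import cpu_count

# Upper bound on the parallel tasks a local pool could use on this machine.
local_workers = cpu_count()
print(f'A local pool could run up to {local_workers} tasks in parallel.')
```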
Let’s define the output directory and the grid description file. If no output directory is given the remapping will create a sub-directory in the work folder.
# Define the target grid description and the output directory
griddes = '/home/mpim/m300385/precip/T63grid_TRMM.txt'
user = getpass.getuser()
out_dir = Path(f'/scratch/{user[0]}/{user}/tmp/CMORPH/echam_grid')
out_dir.mkdir(exist_ok=True, parents=True)
# Call the remap procedure
%time _ = Run.remap(griddes, out_dir=out_dir, method='remapcon')
CPU times: user 6min 5s, sys: 11.3 s, total: 6min 16s
Wall time: 34min 1s
Applying Custom Functions to the Run¶
The remapped data is stored in daily files. Suppose we want to merge
the data into monthly files. We can take advantage of the fact that the
filenames follow the convention PRODUCTNAME_YEAR_MONTH_DAY.nc
,
hence it is easy to use a regex
to group them by month.
search = lambda key: re.search(r'.*\d{4}_\d{2}_*', key, re.M).group(0)
groups = [list(g) for k, g in groupby(Run.files, key=search)]
print(f'There are {len(groups)} different months in the dataset')
There are 216 different months in the dataset
esm_analysis
comes with python bindings for cdo which can be accessed
via esm_analysis.cdo
. Let's define a function that merges the daily data
in CMORPH into monthly data:
def mergetime(infiles, outdir, prefix):
    """Apply a cdo mergetime."""
    # Get the month and year from the first filename
    re_match = rf'.*{prefix}_(?P<year>[^/]+)_(?P<month>[^/]+)_(?P<day>[^/]+)\.nc$'
    year, month = re.match(re_match, infiles[0]).group('year', 'month')
    outfile = Path(outdir) / f'{prefix}_{year}_{month}.nc'
    return esm_analysis.cdo.mergetime(' ' + ' '.join(infiles), output=outfile.as_posix())
# Define the out directory
out_dir = Path(f'/scratch/{user[0]}/{user}/tmp/CMORPH/echam_mon')
out_dir.mkdir(exist_ok=True, parents=True)
We have 216 months to merge; this operation can take quite some time
when done serially. The RunDirectory
object offers a static method
(apply_function
) that can apply any given function to a collection
in parallel. Let's apply the above defined mergetime
function in
parallel and measure the speed of the application.
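A minimal sketch of what such a parallel apply could look like, here with a standard-library thread pool (threads suffice because the heavy lifting is done by external cdo processes). This is an illustration under those assumptions, not the library's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def apply_function(func, collection, args=(), workers=8):
    """Apply func to every item of a collection in parallel.

    Each call receives one item plus the extra positional args;
    results are returned in the order of the input collection.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(func, item, *args) for item in collection]
        return [future.result() for future in futures]
```

With this sketch, `apply_function(mergetime, groups, args=(out_dir.as_posix(), 'Cmorph'))` would merge all months concurrently.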
%time _ = Run.apply_function(mergetime, groups, args=(out_dir.as_posix(), 'Cmorph'), label='Merge Time')
CPU times: user 945 ms, sys: 68 ms, total: 1.01 s
Wall time: 5.7 s
Merging all months has finished in just under 10 seconds. Let's load the new monthly dataset:
MonthRun = esm_analysis.RunDirectory(out_dir, model_type=data_type, overwrite=True, client=cluster)
MonthRun.files.iloc[-1]
'/scratch/m/m300765/tmp/CMORPH/echam_mon/Cmorph_2015_12.nc'
Up to now we did not load any dataset. Let's load the entire data by
creating a virtual dataset that opens all files and virtually merges
them along the time axis. This can be done by calling the load_data
method.
Loading only a subset can be specified by a collection of filenames or glob patterns. If we wanted to load only data between 1999 and 2015 we could do the following:
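Such glob patterns could be matched against the file list with the standard fnmatch module; a sketch of the idea, not necessarily how load_data implements the selection:

```python
from fnmatch import fnmatch

def select_files(files, patterns):
    """Return the files that match at least one glob pattern."""
    return [f for f in files if any(fnmatch(f, pat) for pat in patterns)]

files = ['Cmorph_1998_12.nc', 'Cmorph_1999_01.nc', 'Cmorph_2015_12.nc']
print(select_files(files, [f'*{year}*.nc' for year in range(1999, 2016)]))
# -> ['Cmorph_1999_01.nc', 'Cmorph_2015_12.nc']
```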
t1 = etime()
MonthRun.load_data([f'*{year}*.nc' for year in range(1999, 2016)])
print(f'Fresh load took {etime() - t1} seconds')
Fresh load took 12.279999999999973 seconds
The loaded dataset can be accessed with the help of the dataset
property:
MonthRun.dataset[MonthRun.variables['pr']]
<xarray.DataArray 'precip' (time: 49672, lat: 32, lon: 192)>
dask.array<shape=(49672, 32, 192), dtype=float32, chunksize=(248, 32, 192)>
Coordinates:
* time (time) datetime64[ns] 1999-01-01 ... 2015-12-31T21:00:00
* lon (lon) float64 0.0 1.875 3.75 5.625 7.5 ... 352.5 354.4 356.2 358.1
* lat (lat) float64 -28.91 -27.05 -25.18 -23.32 ... 25.18 27.05 28.91
Attributes:
standard_name: 3_hourly_accumulated_precipitaion
long_name: 3 hourly accumulated precipitaion
units: mm/3hr
code: 142
table: 128
short_naem: precip
fill_value: -999.0
Using Data-Caches for Faster Reloading¶
Loading the data can take a significant amount of time, depending on
the number of files in the run. This is where the overwrite
keyword
argument comes in. When creating an instance of the RunDirectory
object, meta data about the run is saved in a json file. Loading a
dataset will dump a serialized image of the loaded dataset into the run
directory, and the path to the dump is stored in the json file. The
next time the data is loaded this dump will be opened instead of the
netcdf files, which significantly speeds up the reading process. If the
user does not wish to use the stored meta data but wants a 'fresh' read
of the data from the files, the RunDirectory
object can be instantiated with the
overwrite
keyword argument set to True
.
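The caching idea can be sketched as follows: a json file next to the data records where the pickled dataset dump lives, and a reload prefers the dump over the raw files unless overwrite forces a fresh read. File names and layout here are assumptions for illustration, not the library's actual format:

```python
import json
import pickle
from pathlib import Path

def load_cached(run_dir, loader, overwrite=False):
    """Load a dataset, reusing a pickled dump in run_dir when present.

    loader is a callable doing the expensive read from the netcdf
    files; it only runs on a cache miss or when overwrite=True.
    """
    run_dir = Path(run_dir)
    meta_file = run_dir / '.run_info.json'  # hypothetical file name
    if not overwrite and meta_file.is_file():
        meta = json.loads(meta_file.read_text())
        with open(meta['dataset_dump'], 'rb') as dump:
            return pickle.load(dump)        # fast path: reload the dump
    dataset = loader()                      # slow path: read the files
    dump_file = run_dir / '.dataset.pkl'    # hypothetical file name
    with open(dump_file, 'wb') as dump:
        pickle.dump(dataset, dump)
    meta_file.write_text(json.dumps({'dataset_dump': str(dump_file)}))
    return dataset
```

The second call with the same run_dir skips the loader entirely, which mirrors the speed-up shown below.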
Let's do the same as we've done above but with the default behavior, no overwrite. You'll notice a significant speed-up for retrieving the dataset.
MonthRun = esm_analysis.RunDirectory(out_dir, model_type=data_type, client=cluster)
t1 = etime()
MonthRun.load_data()
print(f'Loading the serialized pickle took {etime() - t1} seconds')
Loading the serialized pickle took 0.43000000000006366 seconds
Interactive Data Visualisation¶
esm_analysis
comes with a very rudimentary plotting collection. For
instance, plotting on maps is not yet supported. Yet the 2D rainfall
field can be visualized by applying the profile_2d
method of the
ProfilePlotter
class:
from esm_analysis.plot import ProfilePlotter
from matplotlib import pyplot as plt
P = ProfilePlotter.profile_2d(MonthRun.dataset, MonthRun.variables['pr'],
                              figsize=(9, 6), data_dim='x', avg_dims=None,
                              vmin=0, vmax=20, apply_func=None,
                              cbar_args={'cbar_size': '7%', 'cbar_pad': '5%'})
As of now, this creates an imshow
plot along with IPython widgets
that can be used to change the color bar range, change the color map
and cycle through the time steps of the plot. This makes a first
exploration of the data very easy.
Since the dataset of MonthRun is an xarray dataset object, you can do anything with it that you would usually do with xarray datasets.
fig, ax = plt.subplots(1,1, figsize=(9, 3.2))
MonthRun.dataset[MonthRun.variables['pr']][0:8].sum(dim='time').plot(ax=ax)
<matplotlib.collections.QuadMesh at 0x2b54730c38d0>