Processing big model output data

This notebook serves as a stack of examples of how to make you of various python based data processing libraries to process large data from high resolution model output.

In this notebook we are going to process data from the Dyamond Winter project as an example. The data is available on mistral hence the notebook should be applied in a mistral computing environment to access the data.

Installing Libraries:

To be able to import all libraries that a necessary to run this notebook install the esm_analysis repository:

python3 -m pip install git+https://github.com/antarcticrainforest/esm_analysis.git

Import Libraries:

[1]:
from getpass import getuser # Libaray to copy things
from pathlib import Path # Object oriented libary to deal with paths
from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary Files/Dirs
from subprocess import run, PIPE

from cartopy import crs as ccrs # Cartography library
import dask # Distributed data libary
from dask_jobqueue import SLURMCluster # Setting up distributed memories via slurm
from distributed import Client, progress, wait # Libaray to orchestrate distributed resources
from hurry.filesize import size as filesize # Get human readable file sizes
from matplotlib import pyplot as plt # Standard Plotting library
from metpy import calc as metcalc # Calculate atmospheric variables
from metpy.units import units as metunits # Easy to use meteorological units
import numpy as np # Standard array library
import pandas as pd # Libary to work with labeled data frames and time series
import seaborn as sns # Makes plots more beautiful
import xarray as xr # Libary to work with labeled n-dimensional data and dask

2.0 Setup a distributed computing cluster where we can process the data:

The data we are going to process is in the order of TB. On a single machine using a single core this can not only be slow but also the fields we are trying to process won’t fit into a single computers memory. Therefore we’re setting up a distributed cluster using the dask_jobqueue library. Specifically we will involve the Slurm workload manager to to that. More information on the dask_jobqueue library can be found here: https://jobqueue.dask.org/en/latest/ .

To create the slum cluster we need some information, like the account that is going to be charged and the partition that is going to be used. In this example we are going to use the GPU partition but any other partition can be involved:

[2]:
# Set some user specific variables
account_name = 'mh0731' # Account that is going to be 'charged' fore the computation
partition = 'prepost' # Name of the partition we want to use
job_name = 'dyamondProc' # Job name that is submitted via sbatch
memory = "200GiB" # Max memory per node that is going to be used - this depends on the partition
cores = 42 # Max number of cores per that are reserved - also partition dependend
walltime = '12:00:00' # Walltime - also partition dependen
[3]:
scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir
# Create a temp directory where the output of distributed cluster will be written to, after this notebook
# is closed the temp directory will be closed
dask_scratch_dir = TemporaryDirectory(dir=scratch_dir, prefix='DyamondProc')
cluster = SLURMCluster(memory=memory,
                       cores=cores,
                       project=account_name,
                       walltime=walltime,
                       queue=partition,
                       local_directory=dask_scratch_dir.name,
                       job_extra=[f'-J {job_name}',
                                  f'-D {dask_scratch_dir.name}',
                                  f'--begin=now',
                                  f'--output={dask_scratch_dir.name}/LOG_cluster.%j.o',
                                  f'--output={dask_scratch_dir.name}/LOG_cluster.%j.o'
                                 ],
                       interface='ib0')

So far nothing has happened, lets order 10 nodes which will give us 420 cores and 2 TB distributed memory to work on:

[4]:
cluster.scale(10)
cluster

Now we have submitted two jobs that establish the distributed computing resources. After some queuing time, depending on how busy the computer is. The resources will be available. This can be seen when the above interfaces changes from

Workers 0/10

to

Workers 10

We can also check the status by calling the squeue command from bash:

[5]:
! squeue -u $USER
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          21778712   prepost dyamondP  m300765  R       2:29      1 m11543
          21778713   prepost dyamondP  m300765  R       2:29      1 m11555
          21778714   prepost dyamondP  m300765  R       2:29      1 m11556
          21778715   prepost dyamondP  m300765  R       2:29      1 m11557
          21778716   prepost dyamondP  m300765  R       2:29      1 m11558
          21778717   prepost dyamondP  m300765  R       2:29      1 m11559
          21778718   prepost dyamondP  m300765  R       2:29      1 m11513
          21778719   prepost dyamondP  m300765  R       2:29      1 m11514
          21778711   prepost dyamondP  m300765  R       2:57      1 m11549
          21778710   prepost dyamondP  m300765  R       2:59      1 m11548

Now that the computing resources are made available we have to connect a client to it. This client servers as an instance between the commands we are going to use and the cluster. Let’s create the client. This can be done by calling the Client instance with the cluster we have just created. This will tell dask the distributed library do to all calculations on the cluster.

[6]:
dask_client = Client(cluster)

If not workers are available yet not computation will be done. Once the workers become available computation can be executed on the available worker only. So in general it is a good idea to wait until as much as possible workers are available and yet more computing resources is can be utilized.

[7]:
# Blocking call to wait for at least 10 workers before continuing;
# This might take a while so grab a coffee or tea
dask_client.wait_for_workers(10)
dask_client
[7]:

Client

Cluster

  • Workers: 10
  • Cores: 420
  • Memory: 2.15 TB

3.0 Read 2D input data

Let’s define the paths and the variables we are going to read. In this example we want to compare two datasets that is:

  • dpp0015 : Coupled dyamond run with standard parameter configuration

  • dpp0017 : Coupled dyamond run with increased ocean albedo and increased inversion parameter Cₙ

In this example we will make use of six hourly data which is store in <exp>_2d_atm_ml_<timestep>.nc file patterns.

[8]:
# Define paths here
paths = {'dpp0015' : Path('/work/mh0287/k203123/GIT/icon-aes-dyw/experiments') / 'dpp0015',
         'dpp0018' : Path('/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments') / 'dpp0018'}
glob_pattern_2d = 'atm_2d_ml'
dpp_runs = list(paths.keys())

The runs can be read with the open_mfdataset from xarray. This methods takes quite a number of arguments. We will try opening all the <exp>_2d_atm_ml_<timestep>.nc files and merging them into one big dataset. It should be noted thet open_mfdataset doesn’t read any data until told. What it does, is just fetching meta data and creating a view to the data.

[9]:
datasets = {}
for exp in dpp_runs:
    print(f'Reading data from {exp}', end='\r')
    datasets[exp] = xr.open_mfdataset(
        str(paths[exp] / f'{exp}*{glob_pattern_2d}*.nc'),
        combine='by_coords',
        parallel=True,
        chunks={'time': 1}) # The chunking will get important when we read 3d data.
print(f'Done {50*" "}')
Done
[10]:
# dpp0017 has only data until 2st of February so we take a subset of both datasets:
for exp in dpp_runs:
    datasets[exp] = datasets[exp].sel({'time': slice('2020-01-21T00:00:00', '2020-02-01T18:00:00')})
[11]:
datasets['dpp0015'].time
[11]:
<xarray.DataArray 'time' (time: 48)>
array(['2020-01-21T00:00:00.000000000', '2020-01-21T06:00:00.000000000',
       '2020-01-21T12:00:00.000000000', '2020-01-21T18:00:00.000000000',
       '2020-01-22T00:00:00.000000000', '2020-01-22T06:00:00.000000000',
       '2020-01-22T12:00:00.000000000', '2020-01-22T18:00:00.000000000',
       '2020-01-23T00:00:00.000000000', '2020-01-23T06:00:00.000000000',
       '2020-01-23T12:00:00.000000000', '2020-01-23T18:00:00.000000000',
       '2020-01-24T00:00:00.000000000', '2020-01-24T06:00:00.000000000',
       '2020-01-24T12:00:00.000000000', '2020-01-24T18:00:00.000000000',
       '2020-01-25T00:00:00.000000000', '2020-01-25T06:00:00.000000000',
       '2020-01-25T12:00:00.000000000', '2020-01-25T18:00:00.000000000',
       '2020-01-26T00:00:00.000000000', '2020-01-26T06:00:00.000000000',
       '2020-01-26T12:00:00.000000000', '2020-01-26T18:00:00.000000000',
       '2020-01-27T00:00:00.000000000', '2020-01-27T06:00:00.000000000',
       '2020-01-27T12:00:00.000000000', '2020-01-27T18:00:00.000000000',
       '2020-01-28T00:00:00.000000000', '2020-01-28T06:00:00.000000000',
       '2020-01-28T12:00:00.000000000', '2020-01-28T18:00:00.000000000',
       '2020-01-29T00:00:00.000000000', '2020-01-29T06:00:00.000000000',
       '2020-01-29T12:00:00.000000000', '2020-01-29T18:00:00.000000000',
       '2020-01-30T00:00:00.000000000', '2020-01-30T06:00:00.000000000',
       '2020-01-30T12:00:00.000000000', '2020-01-30T18:00:00.000000000',
       '2020-01-31T00:00:00.000000000', '2020-01-31T06:00:00.000000000',
       '2020-01-31T12:00:00.000000000', '2020-01-31T18:00:00.000000000',
       '2020-02-01T00:00:00.000000000', '2020-02-01T06:00:00.000000000',
       '2020-02-01T12:00:00.000000000', '2020-02-01T18:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 ... 2020-02-01T18:00:00
Attributes:
    standard_name:  time
    axis:           T
[12]:
datasets['dpp0018'].time # This data is daily data only
[12]:
<xarray.DataArray 'time' (time: 12)>
array(['2020-01-21T00:00:00.000000000', '2020-01-22T00:00:00.000000000',
       '2020-01-23T00:00:00.000000000', '2020-01-24T00:00:00.000000000',
       '2020-01-25T00:00:00.000000000', '2020-01-26T00:00:00.000000000',
       '2020-01-27T00:00:00.000000000', '2020-01-28T00:00:00.000000000',
       '2020-01-29T00:00:00.000000000', '2020-01-30T00:00:00.000000000',
       '2020-01-31T00:00:00.000000000', '2020-02-01T00:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 2020-01-22 ... 2020-02-01
Attributes:
    standard_name:  time
    axis:           T
[13]:
# Merge both datasets in one dataet together to make it easier to work with them:
datasets = xr.concat(list(datasets.values()), dim='exp').assign_coords({'exp': list(datasets.keys())})
datasets
[13]:
<xarray.Dataset>
Dimensions:  (exp: 2, ncells: 20971520, time: 48)
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 ... 2020-02-01T18:00:00
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
Data variables:
    ps       (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    psl      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdt     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsut     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsutcs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlut     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlutcs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsds     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdscs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlds     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rldscs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsus     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsuscs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlus     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    ts       (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    sic      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    sit      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    clt      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prlr     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prls     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    pr       (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prw      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    cllvi    (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    clivi    (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qgvi     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qrvi     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qsvi     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfls     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfss     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    evspsbl  (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tauu     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tauv     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    sfcwind  (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    uas      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    vas      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tas      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
Attributes:
    CDI:                  Climate Data Interface version 1.8.3rc (http://mpim...
    Conventions:          CF-1.6
    number_of_grid_used:  15
    grid_file_uri:        http://icon-downloads.mpimet.mpg.de/grids/public/mp...
    uuidOfHGrid:          0f1e7d66-637e-11e8-913b-51232bb4d8f9
    title:                ICON simulation
    institution:          Max Planck Institute for Meteorology/Deutscher Wett...
    source:               git@gitlab.dkrz.de:icon/icon-aes.git@6b5726d38970a4...
    history:              /work/mh0287/k203123/GIT/icon-aes-dyw/bin/icon at 2...
    references:           see MPIM/DWD publications
    comment:              Sapphire Dyamond (k203123) on m11338 (Linux 2.6.32-...
[14]:
global_attrs = datasets.attrs
var_attrs = {varn: datasets[varn].attrs for varn in datasets.data_vars}
[15]:
def set_attrs(dataset):
    '''Set attributes to a dataset that might have lost its attributes.'''
    dataset.attrs = global_attrs
    first_attr = list(var_attrs.values())[0]
    for varn in dataset.data_vars:
        try:
            dataset[varn].attrs = var_attrs[varn]
        except KeyError:
            dataset[varn].attrs = first_attr
    return dataset

At this stage no data - except for meta data - has been read. Since meta data is present we can inspect the dataset a little further:

[16]:
datasets.sel({'exp': 'dpp0015'})['ts']
[16]:
<xarray.DataArray 'ts' (time: 48, ncells: 20971520)>
dask.array<getitem, shape=(48, 20971520), dtype=float32, chunksize=(1, 20971520), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 ... 2020-02-01T18:00:00
    exp      <U7 'dpp0015'
Dimensions without coordinates: ncells
Attributes:
    standard_name:                surface_temperature
    long_name:                    surface temperature
    units:                        K
    param:                        0.0.0
    CDI_grid_type:                unstructured
    number_of_grid_in_reference:  1
[17]:
datasets.sel({'exp': 'dpp0015'})['ts'].data
[17]:
Array Chunk
Bytes 4.03 GB 83.89 MB
Shape (48, 20971520) (1, 20971520)
Count 842 Tasks 48 Chunks
Type float32 numpy.ndarray
20971520 48

The data attribute returns either data array or if the data hasn’t been written yet a representation of what the will look like. It is also called a future, a very important concept for distributed computing. In our case the representation of the data is a dask array. Dask is a library that can split up the data into chunks and evenly spreads the data chunks across different cpu’s and computers. In our example we can see that the surface temperature dataset is split up into 44 chunks. Reading the array would take 935 tasks the total dataset would take 3.69 GB of memory. We can also ask xarray how much memory the whole dataset would consume:

[18]:
filesize(datasets.nbytes) # Use the filesize module to make the output more readable
[18]:
'270G'

This means that the total dataset (both experiments) would need 270 GB of memory. Way to much for a local computer but we do have 1.5 TB of distributed memory. Although this data would comfortably fit there we can reduce its size a little further by throwing out variables we don’t need and creating daily averages (the data is 6 hourly).

[19]:
# Reduce the data a little further, get only interesting variables
data_vars = ['clivi', 'cllvi', 'clt', 'hfls', 'hfss', 'pr', 'prw', 'ps', 'qgvi', 'qrvi',
             'qsvi', 'rlds', 'rlus', 'rlut', 'rsds', 'rsdscs', 'rsdt', 'rsus', 'rsuscs',
             'rsut', 'rsutcs', 'tas', 'ts', 'uas', 'vas']
datasets = datasets[data_vars]
# Create daily average
datasets = datasets.resample({'time': '1D'}).mean()
# We lost attributes set the saved once now
datasets.attrs = global_attrs
for varn in datasets.data_vars:
    datasets[varn].attrs = var_attrs[varn]
datasets
[19]:
<xarray.Dataset>
Dimensions:  (exp: 2, ncells: 20971520, time: 12)
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 2020-01-22 ... 2020-02-01
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
Data variables:
    clivi    (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    cllvi    (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    clt      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfls     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfss     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    pr       (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prw      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    ps       (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qgvi     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qrvi     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qsvi     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlds     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlus     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlut     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsds     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdscs   (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdt     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsus     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsuscs   (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsut     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsutcs   (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tas      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    ts       (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    uas      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    vas      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
Attributes:
    CDI:                  Climate Data Interface version 1.8.3rc (http://mpim...
    Conventions:          CF-1.6
    number_of_grid_used:  15
    grid_file_uri:        http://icon-downloads.mpimet.mpg.de/grids/public/mp...
    uuidOfHGrid:          0f1e7d66-637e-11e8-913b-51232bb4d8f9
    title:                ICON simulation
    institution:          Max Planck Institute for Meteorology/Deutscher Wett...
    source:               git@gitlab.dkrz.de:icon/icon-aes.git@6b5726d38970a4...
    history:              /work/mh0287/k203123/GIT/icon-aes-dyw/bin/icon at 2...
    references:           see MPIM/DWD publications
    comment:              Sapphire Dyamond (k203123) on m11338 (Linux 2.6.32-...
[20]:
datasets['tas']
[20]:
<xarray.DataArray 'tas' (time: 12, exp: 2, ncells: 20971520)>
dask.array<concatenate, shape=(12, 2, 20971520), dtype=float32, chunksize=(1, 1, 20971520), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 2020-01-22 ... 2020-02-01
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
Attributes:
    standard_name:                tas
    long_name:                    temperature in 2m
    units:                        K
    param:                        0.0.0
    CDI_grid_type:                unstructured
    number_of_grid_in_reference:  1
[21]:
filesize(datasets.nbytes)
[21]:
'46G'

This means we have significantly reduced the size that will by held in memory. But beware, this is the size that will be occupied in memory once the data has been read. The reading, chunking and averaging process will take much more memory. Although dask tries to optimize the memory consumption as much as possible.

So far no data has been read. We can trigger reading the data by using the persist method. Persist will start pushing the data to the distributed memory. There the netcdf-files will be read the the data will be copied into memory and the averaging will be done. If no distributed memory is available persist uses the local memory. To use the local memory in the first place one can use the compute method. Please also refer to https://distributed.readthedocs.io/en/latest/manage-computation.html#dask-collections-to-futures for more information.

But before we trigger computations on the cluster, we can create some additional datasets. That is net top of the atmosphere radiation budget and net surface energy:

[22]:
datasets['net_sw_toa'] = datasets['rsdt'] - datasets['rsut']
datasets['net_lw_toa'] = -datasets['rlut']
datasets['net_sw_surf'] = datasets['rsds'] - datasets['rsus']
datasets['net_lw_surf'] = datasets['rlds'] - datasets['rlus']
datasets['net_toa'] = datasets['net_sw_toa'] + datasets['net_lw_toa']
datasets['net_surf'] = datasets['net_sw_surf'] + datasets['net_lw_surf']
datasets['net_surf_energy'] = datasets['net_surf'] + datasets['hfss'] + datasets['hfls']
[23]:
# Push the data to the cluster and trigger all computations there
datasets = datasets.persist()

This will trigger the computation in the background, we don’t have to wait but can keep on analyzing the data. The progress class from dask provides the opportunity to get a progress bar that runs in the background to inform the user about the status.

[24]:
# Let's inspect the progress
progress(datasets, notebook=True)

We can see the tasks that are worked on in the background on the cluster. We can continue working with the data. The paradigm is that all calculations are collected and not executed until we explicitly instruct xarray / dask to trigger computations.

3.1 Sub setting the data

Suppose we wanted to work on say tropical areas only we would have to find a way of accessing the right grid triangles and omitting those once outside the tropics. Sub setting the data would be easy if the would be on a lat-lon grid. Then could simply do something like this:

data_tropics = datasets.sel({'lat': slice(-30, 30)})

Since we are working with unstructured grids things are a little more complex but can be done. Essentially we have two options:

  • Creating an index and getting the data via the index.

  • Masking the data that we don’t need.

Both options involve more information on the grid. Let’s read a land-see mask file:

[25]:
grid_path = Path('/work/mh0731/m300765/graupel_tuning')
lsm_file = grid_path / 'r02b09_land_frac.nc'
lsm = xr.open_dataset(lsm_file, chunks={'cell': -1}).load() # Load the file into local memory first
lons, lats = np.rad2deg(lsm.clon), np.rad2deg(lsm.clat) #Read lon, lat vector

The .load method loads the data into local memory. We can see the difference of the data attribute once the data is in the local memory instead of distributed memory or not loaded at all yet:

[26]:
lsm['sea'].data
[26]:
array([-0., -0., -0., ...,  1.,  1.,  1.])
[27]:
# Create sea and land masks and push the data to the distributed memory
s_mask = np.full(lsm['sea'].shape, np.nan) # Creat nan arrays
l_mask = np.full(lsm['sea'].shape, np.nan)
l_mask[lsm['land'].data == 1] = 1
s_mask[lsm['sea'].data == 1] = 1
s_mask
[27]:
array([nan, nan, nan, ...,  1.,  1.,  1.])
[28]:
# Create a dask array from the sea/land_mask and push it to the cluster
s_mask = dask.persist(dask.array.from_array(s_mask, chunks=-1))[0] # Push the data to the cluster
l_mask = dask.persist(dask.array.from_array(l_mask, chunks=-1))[0]
[29]:
s_mask
[29]:
Array Chunk
Bytes 167.77 MB 167.77 MB
Shape (20971520,) (20971520,)
Count 1 Tasks 1 Chunks
Type float64 numpy.ndarray
20971520 1
[30]:
l_mask
[30]:
Array Chunk
Bytes 167.77 MB 167.77 MB
Shape (20971520,) (20971520,)
Count 1 Tasks 1 Chunks
Type float64 numpy.ndarray
20971520 1

The most efficient way for unstructured grids is applying masks.The idea here is to mask everything outside the tropics. Let’s try indexing the data for tropical values and creating an index using np.where

[31]:
mask_tr = np.full(lats.shape, np.nan) # Create a mask and set the indices where lat is in [-30, 30] = 1
idx = np.where((lats >= -30) & (lats <= 30))[0]
mask_tr[idx] = 1
mask_tr = dask.persist(dask.array.from_array(mask_tr, chunks=-1))[0]
[32]:
mask_tr
[32]:
Array Chunk
Bytes 167.77 MB 167.77 MB
Shape (20971520,) (20971520,)
Count 1 Tasks 1 Chunks
Type float64 numpy.ndarray
20971520 1
[33]:
datasets = (datasets * mask_tr).persist()
[34]:
progress(datasets, notebook=True)

3.2 Plotting some maps

Lets to some plotting, first we plot maps, hence we create an average along the time axis:

[35]:
timmean = set_attrs(datasets.mean(dim='time').persist()) # Create mean and trigger computation
progress(timmean, notebook=True)

This computation is quite fast because we have already loaded the data into distributed memory. Alltogether this only took a little more than 1 minute. But the problem is that we cannot simply plot the data. To do so we are going to remap the data with cdo. For the remapping we need a grid file and a target grid description:

[36]:
# Define the grid describtion
griddes = '''#
# gridID 1
#
gridtype  = lonlat
gridsize  = 6480000
xsize     = 3600
ysize     = 1800
xname     = lon
xlongname = "longitude"
xunits    = "degrees_east"
yname     = lat
ylongname = "latitude"
yunits    = "degrees_north"
xfirst    = -179.95
xinc      = 0.1
yfirst    = -89.95
yinc      = 0.1
'''
# Write the grid description to a temporary file
griddes_file = NamedTemporaryFile(dir=scratch_dir, prefix='griddes_', suffix='.txt').name
with Path(griddes_file).open('w') as f: f.write(griddes)
[37]:
#Define the path to the grid-file
grid_file = grid_path / 'icon_grid_0015_R02B09_G.nc'

The best way to remap unstructured data on the resolution is a weighted remap. We do not have a weight file let’s create one first. For this we define a function that will call the cdo gendis command. To execute the function on the cluster we use the dask.delayed decorator, to tell the code it should be executed remotely.

[38]:
@dask.delayed
def gen_dis(dataset, grid_file, griddes, scratch_dir, global_attrs={}):
    '''Create a distance weights using cdo.'''
    if isinstance(dataset, xr.DataArray):
        # If a dataArray is given create a dataset
        dataset = xr.Dataset(data_vars={dataset.name: dataset})
        dataset.attrs = global_attrs
    enc, dataset = get_enc(dataset)
    weightfile = NamedTemporaryFile(dir=scratch_dir, prefix='weight_file', suffix='.nc').name
    with NamedTemporaryFile(dir=scratch_dir, prefix='input_', suffix='.nc') as tf:
        dataset.to_netcdf(tf.name,  encoding=enc, mode='w')
        cmd = ('cdo', '-O', f'gendis,{griddes}', f'-setgrid,{grid_file}', tf.name, weightfile)
        run_cmd(cmd)
        return weightfile

def run_cmd(cmd):
    '''Run a bash command.'''
    status = run(cmd, check=False, stderr=PIPE, stdout=PIPE)
    if status.returncode != 0:
        error = f'''{' '.join(cmd)}: {status.stderr.decode('utf-8')}'''
        raise RuntimeError(f'{error}')
    return status.stdout.decode('utf-8')

def get_enc(dataset, missing_val=-99.99e36):
    """Get encoding to save datasets."""
    enc = {}
    for varn in dataset.data_vars:
        enc[varn] = {'_FillValue': missing_val}
        dataset[varn].attrs = {**dataset[varn].attrs, **{'missing_value': missing_val}}
    return enc, dataset
[39]:
weightfile = gen_dis(timmean['tas'].isel({'exp':0}),
                     grid_file,
                     griddes_file,
                     scratch_dir,
                     global_attrs).compute()

We will remap the data on the the distributed cluster. For this we have to define a function that will remap the data is is executed on the cluster:

[40]:
def remap(dataset, grid_file, griddes, weightfile, tmpdir, attrs={}):
    """Perform a weighted remapping.

    Parameters
    ==========

    dataset : xarray.dataset
        The dataset the will be regriddes
    grid_file : Path, str
        Path to the grid file
    griddes : Path, str
        Path to the grid describtion file
    tmpdir : Path, str
        Directory for where temporary fils are stored

    Returns
    =======
    xarray.dataset : Remapped dataset
    """
    if isinstance(dataset, xr.DataArray):
        # If a dataArray is given create a dataset
        dataset = xr.Dataset(data_vars={dataset.name: dataset})
        dataset.attrs = attrs
    enc, dataset = get_enc(dataset)
    try:
        # cdo doesn't like strings as data; temporarly assign the exp coordinates to numbers (if exp present)
        exps = dataset.coords['exp'].values
        dataset = dataset.assign_coords({'exp': np.arange(len(exps))})
    except KeyError:
        exps = []
    with TemporaryDirectory(dir=tmpdir, prefix='cdo_') as indir:
            infile = Path(indir) / 'infile.nc'
            outfile = Path(indir) / 'outfile.nc'
            dataset.to_netcdf(infile, encoding=enc, mode='w')
            # Create the command to run
            cmd = ('cdo', '-O', f'remap,{griddes},{weightfile}',
                   f'-setgrid,{grid_file}',
                   str(infile), str(outfile))
            # Get the return value of the command
            run_cmd(cmd)
            try:
                # If there was a exp dimension put its values back into place
                return xr.open_dataset(outfile).load().assign_coords({'exp': exps})
            except KeyError:
                return xr.open_dataset(outfile).load()
[41]:
# Submit the remap function call to the cluster, lets do every variable in parallel
remap_futures = []
for var_name in datasets.data_vars:
    remap_futures.append(dask_client.submit(remap,
                                            timmean[var_name],
                                            grid_file,
                                            griddes_file,
                                            weightfile,
                                            scratch_dir,
                                            attrs=timmean.attrs))
progress(remap_futures, notebook=True)
[42]:
# Merge the results from the parllel computing back together into one dataset
dset_remap = xr.merge(dask_client.gather(remap_futures))
dset_remap
[42]:
<xarray.Dataset>
Dimensions:          (exp: 2, lat: 1800, lon: 3600)
Coordinates:
  * lon              (lon) float64 -179.9 -179.8 -179.8 ... 179.8 179.9 180.0
  * lat              (lat) float64 -89.95 -89.85 -89.75 ... 89.75 89.85 89.95
  * exp              (exp) <U7 'dpp0015' 'dpp0018'
Data variables:
    clivi            (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    cllvi            (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    clt              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    hfls             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    hfss             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    pr               (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    prw              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    ps               (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    qgvi             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    qrvi             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    qsvi             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rlds             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rlus             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rlut             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsds             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsdscs           (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsdt             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsus             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsuscs           (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsut             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsutcs           (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    tas              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    ts               (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    uas              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    vas              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_sw_toa       (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_lw_toa       (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_sw_surf      (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_lw_surf      (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_toa          (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_surf         (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_surf_energy  (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan

xarray has a powerful plot functionality that can be used to quickly plot and inspect the data:

[43]:
%matplotlib notebook
# Use the notebook plotting backend from matplotlib
(-dset_remap['rlut']).sel({'exp': 'dpp0015'}).plot.imshow(cmap='RdYlBu_r')
[43]:
<matplotlib.image.AxesImage at 0x2b5afb8510d0>

These plots are not very nice but we can make them much nicer by using cartopy to draw maps. Xarray supports subplots and fiddeling with the colorbar. Let’s create a nicer plot of net surface energy.

[44]:
plot_data = dset_remap['net_surf_energy'].sel({'lat': slice(-30, 30)})
# Create a diff (exp1 - exp2)
plot_exps = list(plot_data.coords['exp'].values)
diff_data = plot_data.diff(dim='exp').assign_coords({'exp': [f'{plot_exps[1]} - {plot_exps[0]}']})

#Let's rename the dimension in diff_data
diff_data = diff_data.rename({'exp': 'diff'})
[45]:
proj = ccrs.PlateCarree(central_longitude=50.) # Create ylindrical projections
plot = plot_data.plot(transform=ccrs.PlateCarree(),
                      row='exp',
                      figsize=(9,5),
                      cmap='RdYlBu_r',
                      vmin=-170,
                      vmax=170,
                      subplot_kws={'projection': proj},
                      cbar_kwargs={'label': 'Net Surface Energy [W/m$^2$]',
                                   'extend': 'both',
                                   'anchor': (0.5, -0.15),
                                   'fraction': 0.8,
                                   'aspect': 60,
                                   'orientation': 'horizontal'},

                     )
_ = [ax.coastlines() for ax in plot.axes.flat]
plot.fig.subplots_adjust(left=0.01, right=0.99, hspace=0, wspace=0, top=1, bottom=0.2)
plot
[45]:
<xarray.plot.facetgrid.FacetGrid at 0x2b5afee79890>

We could also plot the difference in a similar fashion:

[51]:
proj = ccrs.PlateCarree(central_longitude=50.) # Create ylindrical projections
fig = plt.figure(figsize=(9,4.5))
ax = fig.add_subplot(111, projection=proj)
plot = diff_data.plot(ax=ax,
                 transform=ccrs.PlateCarree(), # This is important
                 cmap='RdBu_r',
                 vmin=-80,
                 vmax=80,
                 cbar_kwargs={'label': 'Net Surface Energy [W/m$^2$]',
                              'extend': 'both',
                              'aspect': 60,
                              'orientation': 'horizontal'},

                     )

You probably have noticed the %matplotlib notebook statements at the beginning of each cell where we plot. This is a so called cell magic, specifically it tells the jupyter that we are going to plot with a certain graphical user interface backend. Here we use an interface that allows us interactively change the plot. You can use the controls at the bottom to zoom and pad. You can also change the plot itself:

[52]:
# Draw a thin high resolution coastline to the plot above
_ = ax.coastlines(resolution='10m', lw=0.5)
fig.subplots_adjust(left=0.01, right=0.99, hspace=0, wspace=0, top=1, bottom=0.3)

3.2 Plotting some time serie

Say we wanted to plot the time series for tropical ocean values only. This is easy to achieve we only need to multiply the dataset object with the sea mask and calculate a mean over the ncells dimension:

[53]:
datasets['net_surf_energy']
[53]:
<xarray.DataArray 'net_surf_energy' (time: 12, exp: 2, ncells: 20971520)>
dask.array<mul, shape=(12, 2, 20971520), dtype=float64, chunksize=(1, 1, 20971520), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 2020-01-22 ... 2020-02-01
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
[54]:
# Apply the sea mask, and create averge
fldmean = (datasets * s_mask).mean(dim='ncells')

Again nothing has been calculated yet, let’s trigger the computation in the background on the cluster:

[55]:
fldmean = fldmean.persist()
progress(fldmean, notebook=True)

The advantage is that the data is already loaded in the distributed memory. So any computations are fast. We can immediately plot the data:

[56]:
%matplotlib notebook
_ = fldmean['net_surf_energy'].plot.line(x='time')

Let’s make this plot a little more pretty. The Seaborn library provides a nice interface for that:

[57]:
# Tell seaborn to style the plots:
col_blind = sns.color_palette("colorblind", 10)
sns.set_style('ticks')
sns.set_palette(col_blind)
sns.set_context("notebook", font_scale=1., rc={"lines.linewidth": 2.5})

Let’s create a subplot with a time series and zonal averages of tropical ocean areas only. To do this we need a land-sea mask on a lat-lon grid which we have to create first.

[58]:
# Create an xarray dataset from the s_mask array
mask_sea = xr.DataArray(s_mask, name='sea', dims=('ncells',))
mask_sea
[58]:
<xarray.DataArray 'sea' (ncells: 20971520)>
dask.array<array, shape=(20971520,), dtype=float64, chunksize=(20971520,), chunktype=numpy.ndarray>
Dimensions without coordinates: ncells
[59]:
# submit the remap job and get immediately the results
mask_sea_lonlat = dask_client.submit(remap,
                                     mask_sea,
                                     grid_file,
                                     griddes_file,
                                     weightfile,
                                     scratch_dir,
                                     attrs=global_attrs).result()
mask_sea_lonlat
[59]:
<xarray.Dataset>
Dimensions:  (exp: 0, lat: 1800, lon: 3600)
Coordinates:
  * lon      (lon) float64 -179.9 -179.8 -179.8 -179.6 ... 179.8 179.9 180.0
  * lat      (lat) float64 -89.95 -89.85 -89.75 -89.65 ... 89.75 89.85 89.95
  * exp      (exp) float64
Data variables:
    sea      (lat, lon) float64 nan nan nan nan nan nan ... 1.0 1.0 1.0 1.0 1.0
Attributes:
    CDI:          Climate Data Interface version 1.9.8 (https://mpimet.mpg.de...
    Conventions:  CF-1.6
    history:      Sun Jun 14 00:51:01 2020: cdo -O remap,/scratch/m/m300765/g...
    source:       git@gitlab.dkrz.de:icon/icon-aes.git@6b5726d38970a46b3ff1ac...
    institution:  Max Planck Institute for Meteorology
    title:        ICON simulation
    references:   see MPIM/DWD publications
    comment:      Sapphire Dyamond (k203123) on m11338 (Linux 2.6.32-754.14.2...
    CDO:          Climate Data Operators version 1.9.8 (https://mpimet.mpg.de...
[60]:
zone_avg = (dset_remap*mask_sea_lonlat['sea'].data).sel({'lat':slice(-30, 30)}).mean(dim='lon')
[61]:
%matplotlib notebook
fig, axs = plt.subplots(1, 2,figsize=(9, 4.5), sharey=False)
_ = fldmean['net_surf_energy'].plot.line(x='time', ax=axs[0], add_legend=False)
axs[0].set_ylabel('Net Surface Energy [W/m$^2$]')
_ = zone_avg['net_surf_energy'].plot.line(x='lat', ax=axs[1])
axs[-1].set_ylabel('')
axs[-1].set_xlabel('Latitude [$^\circ$N]')
fig.subplots_adjust(left=0.1, right=0.99, wspace=0.2, top=.95, bottom=0.3)
sns.despine()

4.0 Working with 3D data

Working with 3D data can be a little more challenging because chances are high that the data wont fit into memory. So it is important to reduce the data as much as possible. But first lets open some 3D datasets and inspect them:

[62]:
# Define the file pattern for qc, qi, pressure and qv
glob_pattern_3d = ('atm_3d_1_ml',  'atm_3d_3_ml', 'atm_3d_4_ml', 'atm_3d_5_ml')
[63]:
datasets_3d = {exp: [] for exp in paths.keys()}
datasets_3d
# Open each file pattern for every experiment seperatly and merge the data into one dataset
for exp in datasets_3d.keys():
    for glob_pattern in glob_pattern_3d:
        datasets_3d[exp].append(
            xr.open_mfdataset(f'{paths[exp] / exp}*{glob_pattern}*.nc',
                              parallel=True,
                              combine='by_coords',
                             chunks={'time': 1, 'height': 5} # This is important
                             ).sel({'time': slice('2020-01-21T00:00:00', '2020-01-31T18:00:00')})
        )
    # Merge the datasets (each for one variable) and select the time where both experiments have data
    datasets_3d[exp] = xr.merge(datasets_3d[exp])

Note the chunks parameter. This tells xarray to try split up the data into chunks of 1 in time and 5 in height. When dealing with large datasets like here the chunk size is important because each data chunk will be distributed across the cluster memory. If the chunk size is to big (to little chunks) the worker nodes might run out of memory if on the other hand the there are to many chunks (to small chunk size) the cluster might die from communication overhead. In this case we have 77 levels so a chunk size of 5 could be a good choice. Read more about chunking on https://docs.dask.org/en/latest/array-chunks.html

[64]:
# Check the total size of the datasets
filesize(np.sum([dset.nbytes for dset in datasets_3d.values()]))
[64]:
'2T'

We have ‘only’ 2.15 TB of memory available so the will barely fit into memory. Since we’re just interested in tropical profiles we can reduce the number significantly averaging. The problem is during the data reduction process (averaging) much more memory will we consumed hence more memory. Let’s do the average operation one experiment and 4 days at a time. This will take a while but we wont choke the cluster.

[66]:
for exp, data in datasets_3d.items():
    # Split the data by day:
    days, daily_data = zip(*data.resample({'time': '4d'})) # Split data set into chunks of 4 days
    print(f'{exp}: Averaging chunk 1/{len(days)}', flush=True)
    dset = daily_data[0].mean(dim='time').persist() # Push the 1st day to the cluster, wait unitl finished
    progress(dset, notebook=False)
    print('\b')
    wait(dset)
    for nn, data in enumerate(daily_data[1:]):
        print(f'{exp}: Averaging chunk {nn+2}/{len(days)}', flush=True)
        tmp_data = data.mean(dim='time').persist()
        wait(tmp_data)
        dset += tmp_data
        del tmp_data
    datasets_3d[exp] = (dset / len(days)).persist()
    del dset # Delete unsused arrays (just in case)
    wait(datasets_3d[exp])
dpp0015: Averaging chunk 1/3
[########################################] | 100% Completed |  5min 36.9s
dpp0015: Averaging chunk 2/3
[########################################] | 100% Completed |  0.5s
dpp0015: Averaging chunk 3/3
[########################################] | 100% Completed |  0.3s
dpp0018: Averaging chunk 1/3
[########################################] | 100% Completed |  1min 58.4s
dpp0018: Averaging chunk 2/3
[########################################] | 100% Completed |  0.4s
dpp0018: Averaging chunk 3/3
[########################################] | 100% Completed |  0.3s

This operation will take some time, so grab another coffee or tea and hope the cluster won’t run out of memory.

Experiment dpp0016 has a file that describes the z coordinates lets load it for using it as the new height coordinate later:

[190]:
z_file = Path('/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_vgrid_ml.nc')
z_data = xr.open_mfdataset([z_file], combine='by_coords', chunks={'height_2': 5, 'height': 5})['zg']
z_data.data = (z_data.data - z_data.isel({'height_2': -1}).data + 25)
z_data = xr.Dataset(data_vars={'zg': z_data}).rename({'height_2':'height'}).persist()
_ = wait(z_data) # Wait until loaded
[191]:
height = np.round(z_data['zg'].isel({'ncells': 0}).values.round(0)[30:] / 10) * 10
for exp in datasets_3d.keys():
    datasets_3d[exp]['zg'] = z_data['zg']
[176]:
datasets_3d
[176]:
<xarray.Dataset>
Dimensions:  (exp: 2, height: 90, ncells: 20971520)
Coordinates:
    clon     (ncells) float32 dask.array<chunksize=(20971520,), meta=np.ndarray>
    clat     (ncells) float32 dask.array<chunksize=(20971520,), meta=np.ndarray>
  * height   (height) float64 1.0 2.0 3.0 4.0 5.0 ... 86.0 87.0 88.0 89.0 90.0
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
Data variables:
    pfull    (exp, height, ncells) float32 dask.array<chunksize=(1, 5, 20971520), meta=np.ndarray>
    ta       (exp, height, ncells) float32 dask.array<chunksize=(1, 5, 20971520), meta=np.ndarray>
    wap      (exp, height, ncells) float32 dask.array<chunksize=(1, 5, 20971520), meta=np.ndarray>
    cl       (exp, height, ncells) float32 dask.array<chunksize=(1, 5, 20971520), meta=np.ndarray>
    hus      (exp, height, ncells) float32 dask.array<chunksize=(1, 5, 20971520), meta=np.ndarray>
    clw      (exp, height, ncells) float32 dask.array<chunksize=(1, 5, 20971520), meta=np.ndarray>
    cli      (exp, height, ncells) float32 dask.array<chunksize=(1, 5, 20971520), meta=np.ndarray>
[80]:
# Merge the dataset together
datasets_3d = xr.concat([dset for dset in datasets_3d.values()], dim='exp').assign_coords({'exp': dpp_runs})
datasets_3d = datasets_3d.drop_vars({'height_bnds'}) # Get rid of height_bnds
[194]:
shape = datasets_3d.coords['exp'].shape[0], len(height), datasets_3d.coords['ncells'].shape[0]
shape
[194]:
(2, 60, 20971520)

Now we will use the stratify library to interpolate the data to constant z height.

[141]:
import stratify
[199]:
new_data = {}
for varn in datasets_3d.data_vars:
    # Do the interpolation on the cluster with dask.delayed
    tmp = dask.delayed(stratify.interpolate)(height, z_data['zg'].data, datasets_3d[varn].data, axis=1)
    # Create an xarray data array from the delayed object
    new_data[varn] = xr.DataArray(dask.array.from_delayed(tmp, shape=shape, dtype=np.float32),
                                  name=varn,
                                  dims=('exp', 'Z', 'ncells'),
                                  coords={'exp':datasets_3d.coords['exp'],
                                          'Z': height}).persist()
progress(new_data, notebook=True)
[203]:
new_data = xr.Dataset(data_vars=new_data).persist()

Let’s create some cloud-water/ice and relative humidity profiles for tropical ocean and land here we can also apply the masks.

[205]:
data  = {}
data['Land'] = (new_data * mask_tr * l_mask).mean(dim='ncells').persist()
wait(data) # Wait until done, not to choke clusters memory
data['Ocean'] = (new_data * mask_tr * s_mask).mean(dim='ncells').persist()
wait(data)
data['Land&Ocean'] = (new_data * mask_tr).mean(dim='ncells').persist()
progress(data, notebook=True)
[206]:
# Let's lumb the datasets along a new dimension together
data = xr.concat([dset for dset in data.values()], dim='surf').assign_coords({'surf':list(data.keys())})
data['Z'].attrs = {'standard_name': 'Z', 'units': 'km'}
data = data.load() # Load the data into local memory

We want calculate a relative humidity profile. The metpy packages offers a lot of calculation routines. You can check what type of calculation is available on their website https://unidata.github.io/MetPy/latest/api/generated/metpy.calc.html . Metpy is handy as it also takes care about units:

[207]:
# Calculate relative humidity
rh = metcalc.relative_humidity_from_specific_humidity(
    data['hus'].data * metunits('kg/kg'),
    data['ta'].data * metunits('K'),
    data['pfull'].data * metunits('Pa')
).magnitude
[225]:
data['rh'] = xr.DataArray(rh*100.,
                          name='rh',
                          coords=data['cl'].coords,
                          dims=data['cl'].dims,
                          attrs={'standard_name': 'RH',
                                 'units': '%'})
data['Z'].data = data['Z'].data / 1000.
[228]:
%matplotlib notebook
plot = data['rh'].isel({'Z':slice(10, 60)}).plot.line(y='Z',
                                                           col='surf',
                                                           figsize=(8.5,5),
                                                           sharex=False)
[229]:
%matplotlib notebook
clx = data['clw'] + data['cli']
clx.attrs = {'standard_name': 'cli + clw' , 'units': 'kg/kg'}
plot = clx.isel({'Z':slice(10, 60)}).plot.line(y='Z', col='surf', figsize=(8.5,5), sharex=False)

Conclusion

This notebook presented some techniques to analyze dyamond datasets in python. We basically utilized xarray a very powerful data processing library to work with multi dimensional data. xarray builds up on dask a library that makes distributed data processing easy.

Specifically we created a cluster of 10 workers with 420 cpu cores and nearly 2.5 TB of memory. This cluster was used to process the dyamond data across the workers. This setup allowed us to use the native grid with its high resolutions as much as possible. Only when plotting maps we had to involve cdo for remapping.

Altogether we have actively have utilized more than 400 GB of memory with peak usages of nearly 1 TB this would have never been possible with a single computer.

There are several pitfalls when it comes to distributed data processing. First it is essential to understand that computations are collected as much as possible rather than triggered immediately. This has the advantage that task streams can be optimized - this technique is called map reduce. Computations should be triggered only at the last moment. Another crucial part is data chunking this is especially true for 3D data. Choosing the right chunk size can be challenging at times but is important in order to keep the clusters memory intact.

The reader might have noticed that reading the data and especially plotting can be convoluted. Hence the next notebook will demonstrate some functionalities that have been added to this repository to make data processing and plotting a little more easy.

[ ]: