Processing big model output data

This notebook serves as a collection of examples of how to make use of various Python-based data processing libraries to process large data from high-resolution model output.

In this notebook we are going to process data from the DYAMOND Winter project as an example. The data is available on Mistral, hence the notebook should be run in a Mistral computing environment to access the data.

Installing Libraries:

To be able to import all libraries that are necessary to run this notebook, install the esm_analysis repository:

python3 -m pip install git+https://github.com/antarcticrainforest/esm_analysis.git

Import Libraries:

[1]:
from getpass import getuser # Library to get the current user name
from pathlib import Path # Object oriented library to deal with paths
from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary Files/Dirs
from subprocess import run, PIPE

from cartopy import crs as ccrs # Cartography library
import dask # Distributed data library
from dask_jobqueue import SLURMCluster # Setting up distributed memories via slurm
from distributed import Client, progress, wait # Library to orchestrate distributed resources
from hurry.filesize import size as filesize # Get human readable file sizes
from matplotlib import pyplot as plt # Standard plotting library
from metpy import calc as metcalc # Calculate atmospheric variables
from metpy.units import units as metunits # Easy to use meteorological units
import numpy as np # Standard array library
import pandas as pd # Library to work with labeled data frames and time series
import seaborn as sns # Makes plots more beautiful
import xarray as xr # Library to work with labeled n-dimensional data and dask

2.0 Set up a distributed computing cluster where we can process the data:

The data we are going to process is on the order of terabytes. On a single machine using a single core this is not only slow, the fields we are trying to process also won’t fit into a single computer’s memory. Therefore we’re setting up a distributed cluster using the dask_jobqueue library. Specifically we will use the Slurm workload manager to do that. More information on the dask_jobqueue library can be found here: https://jobqueue.dask.org/en/latest/ .

To create the Slurm cluster we need some information, like the account that is going to be charged and the partition that is going to be used. In this example we are going to use the prepost partition, but any other partition can be used:
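Under the hood, dask_jobqueue turns settings like these into an sbatch batch script. The following stdlib-only sketch (a hypothetical `sbatch_header` helper, not dask_jobqueue's actual code) illustrates how such parameters map to Slurm directives:

```python
def sbatch_header(account, partition, job_name, memory, cores, walltime):
    """Assemble a Slurm batch-script header from cluster settings (illustrative only)."""
    lines = [
        '#!/usr/bin/env bash',
        f'#SBATCH --account={account}',
        f'#SBATCH --partition={partition}',
        f'#SBATCH --job-name={job_name}',
        f'#SBATCH --mem={memory}',
        f'#SBATCH --cpus-per-task={cores}',
        f'#SBATCH --time={walltime}',
    ]
    return '\n'.join(lines)

header = sbatch_header('mh0731', 'prepost', 'dyamondProc', '200GiB', 42, '12:00:00')
```

dask_jobqueue additionally appends the command that starts a dask worker; you can inspect the real script it generates via `cluster.job_script()`.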

[2]:
# Set some user specific variables
account_name = 'mh0731' # Account that is going to be 'charged' for the computation
partition = 'prepost' # Name of the partition we want to use
job_name = 'dyamondProc' # Job name that is submitted via sbatch
memory = "200GiB" # Max memory per node that is going to be used - this depends on the partition
cores = 42 # Max number of cores per node that are reserved - also partition dependent
walltime = '12:00:00' # Walltime - also partition dependent
[3]:
scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir
# Create a temp directory where the output of the distributed cluster will be written to; after this notebook
# is closed the temp directory will be deleted
dask_scratch_dir = TemporaryDirectory(dir=scratch_dir, prefix='DyamondProc')
cluster = SLURMCluster(memory=memory,
                       cores=cores,
                       project=account_name,
                       walltime=walltime,
                       queue=partition,
                       local_directory=dask_scratch_dir.name,
                       job_extra=[f'-J {job_name}',
                                  f'-D {dask_scratch_dir.name}',
                                  f'--begin=now',
                                  f'--output={dask_scratch_dir.name}/LOG_cluster.%j.o'
                                 ],
                       interface='ib0')

So far nothing has happened. Let’s order 10 nodes, which will give us 420 cores and about 2 TB of distributed memory to work on:

[4]:
cluster.scale(10)
cluster

Now we have submitted ten jobs that establish the distributed computing resources. After some queuing time, depending on how busy the computer is, the resources will become available. This can be seen when the above interface changes from

Workers 0/10

to

Workers 10

We can also check the status by calling the squeue command from bash:

[5]:
! squeue -u $USER
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          21778712   prepost dyamondP  m300765  R       2:29      1 m11543
          21778713   prepost dyamondP  m300765  R       2:29      1 m11555
          21778714   prepost dyamondP  m300765  R       2:29      1 m11556
          21778715   prepost dyamondP  m300765  R       2:29      1 m11557
          21778716   prepost dyamondP  m300765  R       2:29      1 m11558
          21778717   prepost dyamondP  m300765  R       2:29      1 m11559
          21778718   prepost dyamondP  m300765  R       2:29      1 m11513
          21778719   prepost dyamondP  m300765  R       2:29      1 m11514
          21778711   prepost dyamondP  m300765  R       2:57      1 m11549
          21778710   prepost dyamondP  m300765  R       2:59      1 m11548

Now that the computing resources are available we have to connect a client to them. This client serves as an interface between the commands we are going to use and the cluster. Let’s create the client by calling the Client instance with the cluster we have just created. This will tell dask, the distributed library, to do all calculations on the cluster.
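The client/future pattern is not unique to dask; Python's standard library offers the same submit-now, fetch-later workflow. As a small stdlib-only analogy (a toy `heavy_task` stands in for a real computation):

```python
from concurrent.futures import ThreadPoolExecutor

def heavy_task(x):
    """Stand-in for an expensive computation."""
    return x ** 2

# The executor plays the role of the dask Client: work is submitted,
# a future is returned immediately, and results are fetched later.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(heavy_task, x) for x in range(10)]
    results = [f.result() for f in futures]  # blocks until each task is done
```

With dask the same submit/result pattern scales transparently from threads on one machine to workers scattered across Slurm nodes.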

[6]:
dask_client = Client(cluster)

If no workers are available yet, no computation will be done. Once workers become available, computations are executed on the available workers only. So in general it is a good idea to wait until as many workers as possible are available, so that the full computing resources can be utilized.

[7]:
# Blocking call to wait for at least 10 workers before continuing;
# This might take a while so grab a coffee or tea
dask_client.wait_for_workers(10)
dask_client
[7]:

Client

Cluster

  • Workers: 10
  • Cores: 420
  • Memory: 2.15 TB

3.0 Read 2D input data

Let’s define the paths and the variables we are going to read. In this example we want to compare two datasets:

  • dpp0015 : Coupled dyamond run with standard parameter configuration

  • dpp0018 : Coupled dyamond run with increased ocean albedo and increased inversion parameter Cₙ

In this example we will make use of six-hourly data which is stored in <exp>_2d_atm_ml_<timestep>.nc file patterns.

[8]:
# Define paths here
paths = {'dpp0015' : Path('/work/mh0287/k203123/GIT/icon-aes-dyw/experiments') / 'dpp0015',
         'dpp0018' : Path('/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments') / 'dpp0018'}
glob_pattern_2d = 'atm_2d_ml'
dpp_runs = list(paths.keys())

The runs can be read with open_mfdataset from xarray. This method takes quite a number of arguments. We will open all the <exp>_2d_atm_ml_<timestep>.nc files and merge them into one big dataset. It should be noted that open_mfdataset doesn’t read any data until told to. It just fetches metadata and creates a view of the data.
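The lazy behaviour of open_mfdataset can be illustrated with a toy stdlib-only sketch: operations are merely recorded until an explicit trigger, here a hypothetical `compute` method playing the role of dask's compute/persist:

```python
class LazyArray:
    """Record operations instead of executing them (illustrative sketch)."""
    def __init__(self, data):
        self.data = list(data)
        self.pending = []          # queued operations, nothing runs yet

    def map(self, func):
        self.pending.append(func)  # just remember the operation
        return self

    def compute(self):
        out = self.data
        for func in self.pending:  # only now is any work done
            out = [func(x) for x in out]
        return out

arr = LazyArray([1, 2, 3]).map(lambda x: x * 10).map(lambda x: x + 1)
# No computation has happened yet; arr.pending holds two deferred steps.
result = arr.compute()
```

xarray/dask work the same way at scale: building up the task graph is cheap, and the expensive reads and reductions only run when triggered.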

[9]:
datasets = {}
for exp in dpp_runs:
    print(f'Reading data from {exp}', end='\r')
    datasets[exp] = xr.open_mfdataset(
        str(paths[exp] / f'{exp}*{glob_pattern_2d}*.nc'),
        combine='by_coords',
        parallel=True,
        chunks={'time': 1}) # The chunking will get important when we read 3d data.
print(f'Done {50*" "}')
Done
[10]:
# dpp0018 has only data until the 1st of February so we take a subset of both datasets:
for exp in dpp_runs:
    datasets[exp] = datasets[exp].sel({'time': slice('2020-01-21T00:00:00', '2020-02-01T18:00:00')})
[11]:
datasets['dpp0015'].time
[11]:
<xarray.DataArray 'time' (time: 48)>
array(['2020-01-21T00:00:00.000000000', '2020-01-21T06:00:00.000000000',
       '2020-01-21T12:00:00.000000000', '2020-01-21T18:00:00.000000000',
       '2020-01-22T00:00:00.000000000', '2020-01-22T06:00:00.000000000',
       '2020-01-22T12:00:00.000000000', '2020-01-22T18:00:00.000000000',
       '2020-01-23T00:00:00.000000000', '2020-01-23T06:00:00.000000000',
       '2020-01-23T12:00:00.000000000', '2020-01-23T18:00:00.000000000',
       '2020-01-24T00:00:00.000000000', '2020-01-24T06:00:00.000000000',
       '2020-01-24T12:00:00.000000000', '2020-01-24T18:00:00.000000000',
       '2020-01-25T00:00:00.000000000', '2020-01-25T06:00:00.000000000',
       '2020-01-25T12:00:00.000000000', '2020-01-25T18:00:00.000000000',
       '2020-01-26T00:00:00.000000000', '2020-01-26T06:00:00.000000000',
       '2020-01-26T12:00:00.000000000', '2020-01-26T18:00:00.000000000',
       '2020-01-27T00:00:00.000000000', '2020-01-27T06:00:00.000000000',
       '2020-01-27T12:00:00.000000000', '2020-01-27T18:00:00.000000000',
       '2020-01-28T00:00:00.000000000', '2020-01-28T06:00:00.000000000',
       '2020-01-28T12:00:00.000000000', '2020-01-28T18:00:00.000000000',
       '2020-01-29T00:00:00.000000000', '2020-01-29T06:00:00.000000000',
       '2020-01-29T12:00:00.000000000', '2020-01-29T18:00:00.000000000',
       '2020-01-30T00:00:00.000000000', '2020-01-30T06:00:00.000000000',
       '2020-01-30T12:00:00.000000000', '2020-01-30T18:00:00.000000000',
       '2020-01-31T00:00:00.000000000', '2020-01-31T06:00:00.000000000',
       '2020-01-31T12:00:00.000000000', '2020-01-31T18:00:00.000000000',
       '2020-02-01T00:00:00.000000000', '2020-02-01T06:00:00.000000000',
       '2020-02-01T12:00:00.000000000', '2020-02-01T18:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 ... 2020-02-01T18:00:00
Attributes:
    standard_name:  time
    axis:           T
[12]:
datasets['dpp0018'].time # This data is daily data only
[12]:
<xarray.DataArray 'time' (time: 12)>
array(['2020-01-21T00:00:00.000000000', '2020-01-22T00:00:00.000000000',
       '2020-01-23T00:00:00.000000000', '2020-01-24T00:00:00.000000000',
       '2020-01-25T00:00:00.000000000', '2020-01-26T00:00:00.000000000',
       '2020-01-27T00:00:00.000000000', '2020-01-28T00:00:00.000000000',
       '2020-01-29T00:00:00.000000000', '2020-01-30T00:00:00.000000000',
       '2020-01-31T00:00:00.000000000', '2020-02-01T00:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 2020-01-22 ... 2020-02-01
Attributes:
    standard_name:  time
    axis:           T
[13]:
# Merge both datasets together into one dataset to make it easier to work with them:
datasets = xr.concat(list(datasets.values()), dim='exp').assign_coords({'exp': list(datasets.keys())})
datasets
[13]:
<xarray.Dataset>
Dimensions:  (exp: 2, ncells: 20971520, time: 48)
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 ... 2020-02-01T18:00:00
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
Data variables:
    ps       (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    psl      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdt     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsut     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsutcs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlut     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlutcs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsds     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdscs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlds     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rldscs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsus     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsuscs   (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlus     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    ts       (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    sic      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    sit      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    clt      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prlr     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prls     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    pr       (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prw      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    cllvi    (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    clivi    (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qgvi     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qrvi     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qsvi     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfls     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfss     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    evspsbl  (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tauu     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tauv     (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    sfcwind  (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    uas      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    vas      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tas      (exp, time, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
Attributes:
    CDI:                  Climate Data Interface version 1.8.3rc (http://mpim...
    Conventions:          CF-1.6
    number_of_grid_used:  15
    grid_file_uri:        http://icon-downloads.mpimet.mpg.de/grids/public/mp...
    uuidOfHGrid:          0f1e7d66-637e-11e8-913b-51232bb4d8f9
    title:                ICON simulation
    institution:          Max Planck Institute for Meteorology/Deutscher Wett...
    source:               git@gitlab.dkrz.de:icon/icon-aes.git@6b5726d38970a4...
    history:              /work/mh0287/k203123/GIT/icon-aes-dyw/bin/icon at 2...
    references:           see MPIM/DWD publications
    comment:              Sapphire Dyamond (k203123) on m11338 (Linux 2.6.32-...
[14]:
global_attrs = datasets.attrs
var_attrs = {varn: datasets[varn].attrs for varn in datasets.data_vars}
[15]:
def set_attrs(dataset):
    '''Set attributes to a dataset that might have lost its attributes.'''
    dataset.attrs = global_attrs
    first_attr = list(var_attrs.values())[0]
    for varn in dataset.data_vars:
        try:
            dataset[varn].attrs = var_attrs[varn]
        except KeyError:
            dataset[varn].attrs = first_attr
    return dataset

At this stage no data - except for metadata - has been read. Since the metadata is present we can inspect the dataset a little further:

[16]:
datasets.sel({'exp': 'dpp0015'})['ts']
[16]:
<xarray.DataArray 'ts' (time: 48, ncells: 20971520)>
dask.array<getitem, shape=(48, 20971520), dtype=float32, chunksize=(1, 20971520), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 ... 2020-02-01T18:00:00
    exp      <U7 'dpp0015'
Dimensions without coordinates: ncells
Attributes:
    standard_name:                surface_temperature
    long_name:                    surface temperature
    units:                        K
    param:                        0.0.0
    CDI_grid_type:                unstructured
    number_of_grid_in_reference:  1
[17]:
datasets.sel({'exp': 'dpp0015'})['ts'].data
[17]:
           Array           Chunk
Bytes      4.03 GB         83.89 MB
Shape      (48, 20971520)  (1, 20971520)
Count      842 Tasks       48 Chunks
Type       float32         numpy.ndarray

The data attribute returns either the data array or, if the data hasn’t been read yet, a representation of what the data will look like. This is also called a future, a very important concept for distributed computing. In our case the representation of the data is a dask array. Dask is a library that can split up the data into chunks and evenly spread the data chunks across different CPUs and computers. In our example we can see that the surface temperature array is split up into 48 chunks. Reading the array would take 842 tasks and the total array would take 4.03 GB of memory. We can also ask xarray how much memory the whole dataset would consume:
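The numbers in the repr above can be verified by hand: 48 time steps times 20,971,520 cells at 4 bytes per float32 value, chunked one time step at a time:

```python
n_time, n_cells, bytes_per_float32 = 48, 20971520, 4

total_bytes = n_time * n_cells * bytes_per_float32  # whole array
chunk_bytes = 1 * n_cells * bytes_per_float32       # one time step per chunk
n_chunks = total_bytes // chunk_bytes

total_gb = total_bytes / 1e9   # dask reports decimal GB
chunk_mb = chunk_bytes / 1e6   # and decimal MB
```

This kind of back-of-the-envelope arithmetic is useful for choosing chunk sizes: each chunk should comfortably fit into a single worker's memory.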

[18]:
filesize(datasets.nbytes) # Use the filesize module to make the output more readable
[18]:
'270G'

This means that the total dataset (both experiments) would need 270 GB of memory. Way too much for a local computer, but we do have about 2 TB of distributed memory. Although this data would comfortably fit there, we can reduce its size a little further by throwing out variables we don’t need and creating daily averages (the data is 6 hourly).
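What resample({'time': '1D'}).mean() does can be mimicked with the standard library: group the 6-hourly timestamps by calendar day and average each group (an illustrative sketch with made-up values, not the xarray implementation):

```python
from datetime import datetime, timedelta
from itertools import groupby

# Fake 6-hourly time series: two days, four values per day
start = datetime(2020, 1, 21)
times = [start + timedelta(hours=6 * i) for i in range(8)]
values = [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0, 40.0]

# Group by calendar day and average within each group
daily = {}
for day, group in groupby(zip(times, values), key=lambda tv: tv[0].date()):
    vals = [v for _, v in group]
    daily[day] = sum(vals) / len(vals)
```

Averaging four 6-hourly values into one daily value shrinks the time axis by a factor of four, which is where most of the memory reduction below comes from.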

[19]:
# Reduce the data a little further, get only interesting variables
data_vars = ['clivi', 'cllvi', 'clt', 'hfls', 'hfss', 'pr', 'prw', 'ps', 'qgvi', 'qrvi',
             'qsvi', 'rlds', 'rlus', 'rlut', 'rsds', 'rsdscs', 'rsdt', 'rsus', 'rsuscs',
             'rsut', 'rsutcs', 'tas', 'ts', 'uas', 'vas']
datasets = datasets[data_vars]
# Create daily average
datasets = datasets.resample({'time': '1D'}).mean()
# We lost the attributes; set the saved ones now
datasets.attrs = global_attrs
for varn in datasets.data_vars:
    datasets[varn].attrs = var_attrs[varn]
datasets
[19]:
<xarray.Dataset>
Dimensions:  (exp: 2, ncells: 20971520, time: 12)
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 2020-01-22 ... 2020-02-01
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
Data variables:
    clivi    (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    cllvi    (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    clt      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfls     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    hfss     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    pr       (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    prw      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    ps       (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qgvi     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qrvi     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    qsvi     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlds     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlus     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rlut     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsds     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdscs   (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsdt     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsus     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsuscs   (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsut     (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    rsutcs   (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    tas      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    ts       (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    uas      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
    vas      (time, exp, ncells) float32 dask.array<chunksize=(1, 1, 20971520), meta=np.ndarray>
Attributes:
    CDI:                  Climate Data Interface version 1.8.3rc (http://mpim...
    Conventions:          CF-1.6
    number_of_grid_used:  15
    grid_file_uri:        http://icon-downloads.mpimet.mpg.de/grids/public/mp...
    uuidOfHGrid:          0f1e7d66-637e-11e8-913b-51232bb4d8f9
    title:                ICON simulation
    institution:          Max Planck Institute for Meteorology/Deutscher Wett...
    source:               git@gitlab.dkrz.de:icon/icon-aes.git@6b5726d38970a4...
    history:              /work/mh0287/k203123/GIT/icon-aes-dyw/bin/icon at 2...
    references:           see MPIM/DWD publications
    comment:              Sapphire Dyamond (k203123) on m11338 (Linux 2.6.32-...
[20]:
datasets['tas']
[20]:
<xarray.DataArray 'tas' (time: 12, exp: 2, ncells: 20971520)>
dask.array<concatenate, shape=(12, 2, 20971520), dtype=float32, chunksize=(1, 1, 20971520), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-21 2020-01-22 ... 2020-02-01
  * exp      (exp) <U7 'dpp0015' 'dpp0018'
Dimensions without coordinates: ncells
Attributes:
    standard_name:                tas
    long_name:                    temperature in 2m
    units:                        K
    param:                        0.0.0
    CDI_grid_type:                unstructured
    number_of_grid_in_reference:  1
[21]:
filesize(datasets.nbytes)
[21]:
'46G'

This means we have significantly reduced the size that will be held in memory. But beware: this is the size that will be occupied in memory once the data has been read. The reading, chunking and averaging process will take much more memory, although dask tries to optimize the memory consumption as much as possible.

So far no data has been read. We can trigger reading the data by using the persist method. Persist will start pushing the data to the distributed memory. There the netCDF files will be read, the data will be copied into memory and the averaging will be done. If no distributed memory is available, persist uses the local memory. To use the local memory in the first place one can use the compute method. Please also refer to https://distributed.readthedocs.io/en/latest/manage-computation.html#dask-collections-to-futures for more information.

But before we trigger computations on the cluster, we can create some additional variables, namely the net top-of-the-atmosphere radiation budget and the net surface energy:

[22]:
datasets['net_sw_toa'] = datasets['rsdt'] - datasets['rsut']
datasets['net_lw_toa'] = -datasets['rlut']
datasets['net_sw_surf'] = datasets['rsds'] - datasets['rsus']
datasets['net_lw_surf'] = datasets['rlds'] - datasets['rlus']
datasets['net_toa'] = datasets['net_sw_toa'] + datasets['net_lw_toa']
datasets['net_surf'] = datasets['net_sw_surf'] + datasets['net_lw_surf']
datasets['net_surf_energy'] = datasets['net_surf'] + datasets['hfss'] + datasets['hfls']
[23]:
# Push the data to the cluster and trigger all computations there
datasets = datasets.persist()

This triggers the computation in the background; we don’t have to wait but can keep on analyzing the data. The progress function from distributed displays a progress bar that runs in the background to inform the user about the status.

[24]:
# Let's inspect the progress
progress(datasets, notebook=True)

We can see the tasks that are worked on in the background on the cluster. We can continue working with the data. The paradigm is that all calculations are collected and not executed until we explicitly instruct xarray / dask to trigger computations.

3.1 Subsetting the data

Suppose we wanted to work on, say, tropical areas only. We would have to find a way of accessing the right grid triangles and omitting the ones outside the tropics. Subsetting the data would be easy if it were on a lat-lon grid. Then we could simply do something like this:

data_tropics = datasets.sel({'lat': slice(-30, 30)})

Since we are working with unstructured grids things are a little more complex but can be done. Essentially we have two options:

  • Creating an index and getting the data via the index.

  • Masking the data that we don’t need.
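The difference between the two options can be shown with plain Python lists (a toy sketch; the real fields are dask arrays with ~21 million cells):

```python
lats   = [-60.0, -20.0, 0.0, 25.0, 80.0]
values = [  1.0,   2.0, 3.0,  4.0,  5.0]

# Option 1: build an index of the cells we want and extract them.
# The result is smaller, but the original cell ordering/geometry is lost.
idx = [i for i, lat in enumerate(lats) if -30 <= lat <= 30]
subset = [values[i] for i in idx]

# Option 2: mask the cells we don't want with NaN.
# The result keeps its original shape, which preserves the grid layout.
nan = float('nan')
masked = [v if -30 <= lat <= 30 else nan for lat, v in zip(lats, values)]
```

Masking keeps the array shape (and hence the chunking) unchanged, which is why it works so well with dask arrays on unstructured grids.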

Both options require more information on the grid. Let’s read a land-sea mask file:

[25]:
grid_path = Path('/work/mh0731/m300765/graupel_tuning')
lsm_file = grid_path / 'r02b09_land_frac.nc'
lsm = xr.open_dataset(lsm_file, chunks={'cell': -1}).load() # Load the file into local memory first
lons, lats = np.rad2deg(lsm.clon), np.rad2deg(lsm.clat) #Read lon, lat vector

The .load method loads the data into local memory. We can see the difference in the data attribute once the data is in local memory, compared to data that is in distributed memory or not loaded at all yet:

[26]:
lsm['sea'].data
[26]:
array([-0., -0., -0., ...,  1.,  1.,  1.])
[27]:
# Create sea and land masks and push the data to the distributed memory
s_mask = np.full(lsm['sea'].shape, np.nan) # Create NaN arrays
l_mask = np.full(lsm['sea'].shape, np.nan)
l_mask[lsm['land'].data == 1] = 1
s_mask[lsm['sea'].data == 1] = 1
s_mask
[27]:
array([nan, nan, nan, ...,  1.,  1.,  1.])
[28]:
# Create a dask array from the sea/land_mask and push it to the cluster
s_mask = dask.persist(dask.array.from_array(s_mask, chunks=-1))[0] # Push the data to the cluster
l_mask = dask.persist(dask.array.from_array(l_mask, chunks=-1))[0]
[29]:
s_mask
[29]:
           Array        Chunk
Bytes      167.77 MB    167.77 MB
Shape      (20971520,)  (20971520,)
Count      1 Tasks      1 Chunks
Type       float64      numpy.ndarray
[30]:
l_mask
[30]:
           Array        Chunk
Bytes      167.77 MB    167.77 MB
Shape      (20971520,)  (20971520,)
Count      1 Tasks      1 Chunks
Type       float64      numpy.ndarray

The most efficient way for unstructured grids is applying masks. The idea here is to mask everything outside the tropics. Let’s select the tropical values by creating an index using np.where:

[31]:
mask_tr = np.full(lats.shape, np.nan) # Create a mask and set the indices where lat is in [-30, 30] = 1
idx = np.where((lats >= -30) & (lats <= 30))[0]
mask_tr[idx] = 1
mask_tr = dask.persist(dask.array.from_array(mask_tr, chunks=-1))[0]
[32]:
mask_tr
[32]:
           Array        Chunk
Bytes      167.77 MB    167.77 MB
Shape      (20971520,)  (20971520,)
Count      1 Tasks      1 Chunks
Type       float64      numpy.ndarray
[33]:
datasets = (datasets * mask_tr).persist()
[34]:
progress(datasets, notebook=True)

3.2 Plotting some maps

Let’s do some plotting. First we plot maps, hence we create an average along the time axis:

[35]:
timmean = set_attrs(datasets.mean(dim='time').persist()) # Create mean and trigger computation
progress(timmean, notebook=True)

This computation is quite fast because we have already loaded the data into distributed memory. Altogether this only took a little more than a minute. But the problem is that we cannot simply plot the data. To do so we are going to remap the data with cdo. For the remapping we need a grid file and a target grid description:

[36]:
# Define the grid description
griddes = '''#
# gridID 1
#
gridtype  = lonlat
gridsize  = 6480000
xsize     = 3600
ysize     = 1800
xname     = lon
xlongname = "longitude"
xunits    = "degrees_east"
yname     = lat
ylongname = "latitude"
yunits    = "degrees_north"
xfirst    = -179.95
xinc      = 0.1
yfirst    = -89.95
yinc      = 0.1
'''
# Write the grid description to a temporary file
griddes_file = NamedTemporaryFile(dir=scratch_dir, prefix='griddes_', suffix='.txt').name
with Path(griddes_file).open('w') as f: f.write(griddes)
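The lonlat grid description above fully determines the target coordinates via lon[i] = xfirst + i * xinc (and likewise for lat). A quick stdlib check that the description is self-consistent:

```python
# Values taken from the grid description above
xsize, ysize = 3600, 1800
xfirst, xinc = -179.95, 0.1
yfirst, yinc = -89.95, 0.1

# Reconstruct the target coordinate axes
lon = [xfirst + i * xinc for i in range(xsize)]
lat = [yfirst + j * yinc for j in range(ysize)]
gridsize = xsize * ysize  # total number of target cells
```

This confirms gridsize = 3600 × 1800 = 6,480,000 and that the axes span -179.95…179.95 and -89.95…89.95, i.e. a global 0.1° grid with cell-centered coordinates.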
[37]:
#Define the path to the grid-file
grid_file = grid_path / 'icon_grid_0015_R02B09_G.nc'

The best way to remap unstructured data at this resolution is a weighted remapping. Since we do not have a weight file, let’s create one first. For this we define a function that will call the cdo gendis command. To execute the function on the cluster we use the dask.delayed decorator, to tell the code it should be executed remotely.
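cdo gendis precomputes distance-based interpolation weights between the source and target grids. The idea behind distance weighting can be sketched with a toy one-dimensional inverse-distance interpolation (an illustration of the concept, not cdo's actual algorithm):

```python
def idw(source_x, source_v, target_x, power=1.0):
    """Inverse-distance-weighted interpolation (toy sketch of distance weighting)."""
    result = []
    for tx in target_x:
        weights, vals = [], []
        for sx, sv in zip(source_x, source_v):
            d = abs(tx - sx)
            if d == 0:             # target coincides with a source point
                weights, vals = [1.0], [sv]
                break
            weights.append(1.0 / d ** power)
            vals.append(sv)
        total = sum(weights)
        result.append(sum(w * v for w, v in zip(weights, vals)) / total)
    return result

remapped = idw([0.0, 1.0], [10.0, 20.0], [0.0, 0.5, 1.0])
```

Because the weights depend only on the two grids, they can be computed once (the weight file) and reused for every variable and time step, which is exactly why gendis is run before the actual remapping.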

[38]:
@dask.delayed
def gen_dis(dataset, grid_file, griddes, scratch_dir, global_attrs={}):
    '''Create distance weights using cdo.'''
    if isinstance(dataset, xr.DataArray):
        # If a dataArray is given create a dataset
        dataset = xr.Dataset(data_vars={dataset.name: dataset})
        dataset.attrs = global_attrs
    enc, dataset = get_enc(dataset)
    weightfile = NamedTemporaryFile(dir=scratch_dir, prefix='weight_file', suffix='.nc').name
    with NamedTemporaryFile(dir=scratch_dir, prefix='input_', suffix='.nc') as tf:
        dataset.to_netcdf(tf.name,  encoding=enc, mode='w')
        cmd = ('cdo', '-O', f'gendis,{griddes}', f'-setgrid,{grid_file}', tf.name, weightfile)
        run_cmd(cmd)
        return weightfile

def run_cmd(cmd):
    '''Run a bash command.'''
    status = run(cmd, check=False, stderr=PIPE, stdout=PIPE)
    if status.returncode != 0:
        error = f'''{' '.join(cmd)}: {status.stderr.decode('utf-8')}'''
        raise RuntimeError(f'{error}')
    return status.stdout.decode('utf-8')

def get_enc(dataset, missing_val=-99.99e36):
    """Get encoding to save datasets."""
    enc = {}
    for varn in dataset.data_vars:
        enc[varn] = {'_FillValue': missing_val}
        dataset[varn].attrs = {**dataset[varn].attrs, **{'missing_value': missing_val}}
    return enc, dataset
[39]:
weightfile = gen_dis(timmean['tas'].isel({'exp':0}),
                     grid_file,
                     griddes_file,
                     scratch_dir,
                     global_attrs).compute()

We will remap the data on the distributed cluster. For this we have to define a function that remaps the data and is executed on the cluster:

[40]:
def remap(dataset, grid_file, griddes, weightfile, tmpdir, attrs={}):
    """Perform a weighted remapping.

    Parameters
    ==========

    dataset : xarray.Dataset
        The dataset that will be regridded
    grid_file : Path, str
        Path to the grid file
    griddes : Path, str
        Path to the grid description file
    weightfile : Path, str
        Path to the weight file created by cdo gendis
    tmpdir : Path, str
        Directory where temporary files are stored

    Returns
    =======
    xarray.Dataset : Remapped dataset
    """
    if isinstance(dataset, xr.DataArray):
        # If a dataArray is given create a dataset
        dataset = xr.Dataset(data_vars={dataset.name: dataset})
        dataset.attrs = attrs
    enc, dataset = get_enc(dataset)
    try:
        # cdo doesn't like strings as data; temporarily assign numbers to the exp coordinate (if present)
        exps = dataset.coords['exp'].values
        dataset = dataset.assign_coords({'exp': np.arange(len(exps))})
    except KeyError:
        exps = []
    with TemporaryDirectory(dir=tmpdir, prefix='cdo_') as indir:
            infile = Path(indir) / 'infile.nc'
            outfile = Path(indir) / 'outfile.nc'
            dataset.to_netcdf(infile, encoding=enc, mode='w')
            # Create the command to run
            cmd = ('cdo', '-O', f'remap,{griddes},{weightfile}',
                   f'-setgrid,{grid_file}',
                   str(infile), str(outfile))
            # Get the return value of the command
            run_cmd(cmd)
            try:
                # If there was a exp dimension put its values back into place
                return xr.open_dataset(outfile).load().assign_coords({'exp': exps})
            except KeyError:
                return xr.open_dataset(outfile).load()
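The remap function relies on a run_cmd helper that was defined earlier in the notebook. A minimal sketch, assuming it simply executes the command and fails loudly on a non-zero exit status, could look like this:

```python
from subprocess import run, PIPE

def run_cmd(cmd):
    """Run an external command, return its output and raise on failure (sketch)."""
    res = run(cmd, stdout=PIPE, stderr=PIPE, check=False)
    if res.returncode != 0:
        # Surface cdo's error message instead of silently returning garbage
        raise RuntimeError(res.stderr.decode())
    return res.stdout.decode()
```

Raising on failure matters here: if cdo aborts, the subsequent `xr.open_dataset(outfile)` would otherwise fail with a confusing "file not found" error on the workers.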
[41]:
# Submit the remap function calls to the cluster; let's process every variable in parallel
remap_futures = []
for var_name in timmean.data_vars:
    remap_futures.append(dask_client.submit(remap,
                                            timmean[var_name],
                                            grid_file,
                                            griddes_file,
                                            weightfile,
                                            scratch_dir,
                                            attrs=timmean.attrs))
progress(remap_futures, notebook=True)
[42]:
# Merge the results from the parallel computation back into one dataset
dset_remap = xr.merge(dask_client.gather(remap_futures))
dset_remap
[42]:
<xarray.Dataset>
Dimensions:          (exp: 2, lat: 1800, lon: 3600)
Coordinates:
  * lon              (lon) float64 -179.9 -179.8 -179.8 ... 179.8 179.9 180.0
  * lat              (lat) float64 -89.95 -89.85 -89.75 ... 89.75 89.85 89.95
  * exp              (exp) <U7 'dpp0015' 'dpp0018'
Data variables:
    clivi            (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    cllvi            (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    clt              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    hfls             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    hfss             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    pr               (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    prw              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    ps               (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    qgvi             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    qrvi             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    qsvi             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rlds             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rlus             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rlut             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsds             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsdscs           (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsdt             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsus             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsuscs           (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsut             (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    rsutcs           (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    tas              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    ts               (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    uas              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    vas              (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_sw_toa       (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_lw_toa       (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_sw_surf      (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_lw_surf      (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_toa          (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_surf         (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
    net_surf_energy  (exp, lat, lon) float64 nan nan nan nan ... nan nan nan nan
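
The gather/merge step above collapses one single-variable dataset per future into a single dataset. On toy data (hypothetical variable names), the pattern looks like:

```python
import numpy as np
import xarray as xr

# Each "future" returns a dataset holding one remapped variable
parts = [xr.Dataset({name: (('lat', 'lon'), np.full((2, 3), float(i)))})
         for i, name in enumerate(['tas', 'pr'])]

# xr.merge aligns the shared coordinates and combines the variables
merged = xr.merge(parts)
```

Because all parts were remapped to the same lat/lon grid, the merge is a cheap alignment rather than an interpolation.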

xarray has a powerful plot functionality that can be used to quickly plot and inspect the data:

[43]:
%matplotlib notebook
# Use the notebook plotting backend from matplotlib
(-dset_remap['rlut']).sel({'exp': 'dpp0015'}).plot.imshow(cmap='RdYlBu_r')
[43]:
<matplotlib.image.AxesImage at 0x2b5afb8510d0>

These plots are not very pretty, but we can improve them considerably by using cartopy to draw maps. xarray also supports subplots (faceting) and fine-tuning the colorbar. Let's create a nicer plot of the net surface energy.

[44]:
plot_data = dset_remap['net_surf_energy'].sel({'lat': slice(-30, 30)})
# Create a difference (second experiment minus the first)
plot_exps = list(plot_data.coords['exp'].values)
diff_data = plot_data.diff(dim='exp').assign_coords({'exp': [f'{plot_exps[1]} - {plot_exps[0]}']})

# Let's rename the dimension in diff_data
diff_data = diff_data.rename({'exp': 'diff'})
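The diff/assign_coords/rename chain above reduces the exp dimension of length two to a length-one difference dimension with a descriptive label. A minimal illustration with made-up values:

```python
import numpy as np
import xarray as xr

da = xr.DataArray(np.array([[1., 2.], [4., 6.]]),
                  dims=('exp', 'lat'),
                  coords={'exp': ['a', 'b']})
exps = list(da.coords['exp'].values)
# diff along 'exp' yields the second entry minus the first
diff = da.diff(dim='exp').assign_coords({'exp': [f'{exps[1]} - {exps[0]}']})
# Rename the dimension so it no longer looks like a single experiment
diff = diff.rename({'exp': 'diff'})
```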
[45]:
proj = ccrs.PlateCarree(central_longitude=50.) # Create a cylindrical projection
plot = plot_data.plot(transform=ccrs.PlateCarree(),
                      row='exp',
                      figsize=(9,5),
                      cmap='RdYlBu_r',
                      vmin=-170,
                      vmax=170,
                      subplot_kws={'projection': proj},
                      cbar_kwargs={'label': 'Net Surface Energy [W/m$^2$]',
                                   'extend': 'both',
                                   'anchor': (0.5, -0.15),
                                   'fraction': 0.8,
                                   'aspect': 60,
                                   'orientation': 'horizontal'},

                     )
_ = [ax.coastlines() for ax in plot.axes.flat]
plot.fig.subplots_adjust(left=0.01, right=0.99, hspace=0, wspace=0, top=1, bottom=0.2)
plot