Source code for esm_analysis.DistUtils

"""Collection of classes for creating batch clusters."""

from datetime import datetime
import json
from pathlib import Path
from subprocess import run, PIPE
from tempfile import TemporaryDirectory

__all__ = ('MPICluster',)

slurm_directive = """#!/bin/bash
# =====================================
# mistral batch job parameters
#SBATCH --account={account}
#SBATCH --job-name={name}
#SBATCH --partition={queue}
#SBATCH -D {workdir}
#SBATCH --output={workdir}/LOG_mpicluster.%j.o
#SBATCH --error={workdir}/LOG_mpicluster.%j.e
#SBATCH --exclusive
#SBATCH --time={walltime}
#SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --mem={memory}
#SBATCH -n {nworkers}
_script = """
rm -fr worker-*
rm -rf *.lock
rm -f $(ls LOG*.[oe]|grep -v $SLURM_JOB_ID)

ulimit -c 0


# Settings for OpenMPI and MXM (MellanoX Messaging)
# library
export OMPI_MCA_pml=cm
export OMPI_MCA_mtl=mxm
export OMPI_MCA_mtl_mxm_np=0
export MXM_RDMA_PORTS=mlx5_0:1
# Disable GHC algorithm for collective communication
export OMPI_MCA_coll=^ghc

{run_cmd} dask-mpi --no-nanny --{scheduler} --scheduler-file {scheduler_file}


class _Slurm:
    """Definitions to work with the slurm workload manager."""

    def _type(self):
        return 'slurm'

    def submit_cmd(self):
        """Slurm submit command."""
        return 'sbatch'

    def check_cmd(self):
        """Slurm check command."""
        return 'squeue'

    def cancel_cmd(self):
        """Slurm cancel comand."""
        return 'scancel'

    def run_cmd(self):
        """Slurm run comman."""
        return ('srun -l --cpu_bind=threads '
                '--distribution=block:cyclic --propagate=STACK')

    def cancel(self, job_id):
        """Close down a cluster with a given job_id."""
        if job_id is None:
        run([self.cancel_cmd, job_id], stdout=PIPE, check=True, shell=False)

    def check(self, job_id):
        """Check the status of a running cluster."""
        if job_id is None:
            return None, None, None
        res = run([self.check_cmd, '-j {}'.format(job_id)], check=True,
                  shell=False, stdout=PIPE).stdout.decode('utf-8').split('\n')
        if len(res) < 2:
            return None, None, None
        status = [line.split() for line in res]
        table = dict(zip(status[0], status[1][:len(status[0])]))

        status_l = dict(PD='Queueing', R='Running', F='Failed')
        return status_l[table['ST']], table['TIME'], table['NODES']

[docs]class MPICluster: """Create Cluster of distrbuted workers.""" def close(self): """Close down the running cluster.""" self._batch_system.cancel(self.job_id) self.job_id = None self._write_json() @property def status(self): """Check the status of the running cluster.""" status, _, _ = self._batch_system.check(self.job_id) try: return status[0].upper() except TypeError: return None def __repr__(self): """Print the status of the submitted jobs.""" status, time, nodes = self._batch_system.check(self.job_id) if status is None: return 'No cluster running' return '{}: time: {} nodes: {}'.format(status, time, nodes) def _repr_html_(self): """Print the status of the submitted jobs in html format.""" status, time, nodes = self._batch_system.check(self.job_id) colors = dict(Queueing='DodgerBlue', Fail='Tomato', Running='MediumSeaGreen') if status is None: return '<p>No cluster running<p>' color = colors[status] return """<p> <span style="color:{color};">{status}</span>: time: {time} nodes: {nodes}</p>""".format(color=color, status=status, time=time, nodes=nodes) @property def scheduler_file(self): """Return the schedule file.""" return Path(self.workdir) / 'scheduler.json' @property def script_path(self): """Return the path of the script that is/was submitted.""" return Path(self.workdir) / '' def _write_script(self): with open(str(self.script_path), 'w') as f: f.write(self.job_script) self.script_path.chmod(0o755) def _write_json(self): _json_data = dict(job_id=self.job_id, workdir=str(self.workdir), job_script=self.job_script, batch_system=self._batch_system._type, datetime=self.submit_time.isoformat()) with (self.workdir / 'cluster.json').open('w') as f: json.dump(_json_data, f, indent=3, sort_keys=True) @staticmethod def _load(workdir): try: with (workdir / 'cluster.json').open('r') as f: json_data = json.load(f) except FileNotFoundError: raise ValueError('Cluster has not been created.') json_data['datetime'] = datetime.strptime(json_data['datetime'], '%Y-%m-%dT%H:%M:%S.%f') json_data['workdir'] = Path(json_data['workdir']) return json_data
[docs] @classmethod def load(cls, workdir): """ Load the information of a running cluster. This method can be used to connect to an already running cluster. :: from esm_analysis import MPICluster cluster = MPICluster.load('/tmp/old_cluster') Parameters ---------- workdir : str Directory name where information of the previously created cluster is stored. The information on the work directory can be retrieved by calling the workdir property Returns ------- Instance of the MPICluster object: esm_analysis.MPICluster """ workdir = Path(workdir) _json_data = cls._load(workdir) lookup = dict(slurm=_Slurm) batch_system = lookup[_json_data['batch_system']]() script = _json_data['job_script'] job_id = _json_data['job_id'] if job_id is None: raise ValueError('Cluster was closed, submit a new one') submit_time = _json_data['datetime'] return cls(script, workdir, submit_time=submit_time, job_id=job_id, batch_system=batch_system)
def _submit(self): res = run([self._batch_system.submit_cmd, str(self.script_path)], cwd=str(self.workdir), stdout=PIPE, check=True, shell=False) job_id, _, _cluster = res.stdout.decode('utf-8').strip().partition(';') return job_id.split(" ")[-1] def __init__(self, script, workdir, submit_time=None, batch_system=None, job_id=None): """Create a cluster using a given submit script.""" self.job_script = script self.submit_time = submit_time self.job_id = job_id self._batch_system = batch_system self.workdir = Path(workdir) self.workdir.mkdir(parents=True, exist_ok=True) if self.submit_time is None: self._write_script() self.job_id = self._submit() self.submit_time = self._write_json()
[docs] @classmethod def slurm(cls, account, queue, *, slurm_extra=[''], memory='140G', workdir=None, walltime='01:00:00', cpus_per_task=48, name='dask_job', nworkers=1, job_extra=None): """ Create an MPI cluster using slurm. This method sets up a cluster with help of the workload manager slurm. :: from esm_analysis import MPICluster cluster = MPICluster.slurm('account', 'express', nworkers=10) The jobs will immediately be submitted to the workload manager upon creation of the instance. Parameters ---------- account: str Account name queue: str partition job should be submitted to walltime: str, optional (default: '01:00:00') lenth of the job name: str, optional (default: dask_job) name of the job workdir: str, optional (default: None) name of the workdirectory, if None is given, a temporary directory is used. cpus_per_task: int, optional (default: 48) number of cpus per node memory: str, optional (default: 140G) allocated memory per node nworkers: int, optional (default: 1) number of nodes used in the job job_extra: str, optional (default: None) additional commands that should be executed in the run sript slurm_extra: list, optional (default: None) additional slurm directives Returns ------- Instance of the MPICluster object: esm_analysis.MPICluster """ job_extra = job_extra or '' workdir = workdir or TemporaryDirectory().name workdir = Path(workdir) batch_system = _Slurm() slurm_extra = ['#SBATCH {}'.format(extr) for extr in slurm_extra if slurm_extra] scheduler_file = workdir / 'scheduler.json' script = slurm_directive.format( account=account, workdir=workdir, name=name, cpus_per_task=cpus_per_task, nworkers=nworkers+1, walltime=walltime, memory=memory, extra='\n'.join(slurm_extra), queue=queue) +\ _script.format(run_cmd=batch_system.run_cmd, job_extra=job_extra, scheduler='scheduler', scheduler_file=scheduler_file, rm_scheduler='rm -f {}'.format(scheduler_file) ) return cls(script, workdir, batch_system=batch_system)