bcftbx.JobRunner

Classes for starting, stopping and managing jobs.

Class BaseJobRunner is a template with methods that need to be implemented by subclasses. The subclasses implemented here are:

  • SimpleJobRunner: run jobs (e.g. scripts) on a local file system.

  • GEJobRunner : run jobs using Grid Engine (GE) i.e. qsub, qdel etc

  • SlurmRunner : run jobs using Slurm i.e. sbatch, scancel etc

A single job runner instance can be used to start and manage multiple processes.

Each job is started by invoking the run method of the runner. This returns an id string which is then used in calls to the various job monitoring and control methods (e.g. isRunning, terminate etc) to interact with the job.

The runner’s list method returns a list of running job ids.

Simple usage example:

>>> # Create a JobRunner instance
>>> runner = SimpleJobRunner()
>>> # Start a job using the runner and collect its id
>>> job_id = runner.run('Example',None,'myscript.sh',[])
>>> # Wait for job to complete
>>> import time
>>> while runner.isRunning(job_id):
...     time.sleep(10)
>>> # Get the names of the output files
>>> log,err = (runner.logFile(job_id),runner.errFile(job_id))
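
The wait loop above blocks for as long as the job runs. The following is a minimal sketch of a bounded wait that uses only the isRunning, terminate and exit_status methods described in this module; the helper name wait_for_job and its poll_interval and timeout parameters are hypothetical and are not part of bcftbx.

```python
import time


def wait_for_job(runner, job_id, poll_interval=10.0, timeout=None):
    """Block until the job stops, then return its exit status.

    Hypothetical helper: if 'timeout' seconds pass while the job is
    still running, the job is terminated and TimeoutError is raised.
    """
    start = time.monotonic()
    while runner.isRunning(job_id):
        if timeout is not None and time.monotonic() - start > timeout:
            # Give up: ask the runner to kill the job
            runner.terminate(job_id)
            raise TimeoutError("job %s exceeded %s seconds" % (job_id, timeout))
        time.sleep(poll_interval)
    return runner.exit_status(job_id)
```

With a real runner this might be called as wait_for_job(runner, job_id, poll_interval=10, timeout=3600).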

Processes run using a job runner inherit the environment where the runner is created and executed.

Additionally, runners set a BCFTBX_RUNNER_NSLOTS environment variable to the number of slots (aka CPUs/cores/threads) available to processes executed by the runner. For all runners this defaults to one (i.e. serial jobs); the nslots option can be used when instantiating SimpleJobRunner and SlurmRunner objects to specify more cores, for example:

>>> multicore_runner = SimpleJobRunner(nslots=4)
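
A script started by a runner can read the slot count back from its environment. The sketch below shows one way to do this; the function name available_slots is hypothetical, and falling back to 1 when the variable is unset or malformed (for example when the script is run outside a runner) is an assumption of this sketch.

```python
import os


def available_slots(environ=None):
    """Return the slot count advertised via BCFTBX_RUNNER_NSLOTS.

    Hypothetical helper: returns 1 if the variable is missing or is
    not a positive integer.
    """
    env = os.environ if environ is None else environ
    try:
        nslots = int(env.get("BCFTBX_RUNNER_NSLOTS", "1"))
    except ValueError:
        nslots = 1
    return max(1, nslots)


# Example: size a worker pool from the runner's setting
nworkers = available_slots()
```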

For GEJobRunner instances the number of cores is set by specifying the -pe argument as part of the ‘ge_extra_args’ option, for example:

>>> multicore_runner = GEJobRunner(ge_extra_args=('-pe','smp.pe','4'))

class bcftbx.JobRunner.BaseJobRunner

Base class for implementing job runners

This class can be used as a template for implementing custom job runners. The idea is that the runners wrap the specifics of interacting with an underlying job control system and thus provide a generic interface to be used by higher level classes.

A job runner needs to implement the following methods:

  • run : starts a job running

  • terminate : kills a running job

  • list : lists the running job ids

  • logFile : returns the name of the log file for a job

  • errFile : returns the name of the error file for a job

  • exit_status: returns the exit status for the command (or None if the job is still running)

Optionally it can also implement the methods:

  • errorState: indicates if running job is in an “error state”

  • isRunning : checks if a specific job is running

if the default implementations are not sufficient.
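
To illustrate the shape of this interface, here is a minimal sketch of a runner that starts jobs as local subprocesses. It is deliberately standalone: the class name SketchRunner and its log file naming are hypothetical, it does not import bcftbx, and a real custom runner would subclass BaseJobRunner instead. It is not how SimpleJobRunner is implemented.

```python
import os
import subprocess
import sys
import tempfile
import time


class SketchRunner:
    """Standalone illustration of the job runner interface (hypothetical)."""

    def __init__(self, log_dir=None):
        self._log_dir = log_dir
        self._jobs = {}   # job id -> (process, log path, error path)
        self._count = 0   # keeps log file names unique

    def run(self, name, working_dir, script, args):
        """Start a job; return its id, or None if it failed to start."""
        cwd = working_dir if working_dir is not None else os.getcwd()
        log_dir = self._log_dir if self._log_dir is not None else cwd
        self._count += 1
        log = os.path.join(log_dir, "%s.%d.out" % (name, self._count))
        err = os.path.join(log_dir, "%s.%d.err" % (name, self._count))
        cmd = [script] + [str(arg) for arg in args]
        try:
            # The child gets its own copies of the file descriptors,
            # so the parent's handles can be closed straight away
            with open(log, "w") as out, open(err, "w") as errors:
                proc = subprocess.Popen(cmd, cwd=cwd, stdout=out, stderr=errors)
        except OSError:
            return None
        job_id = str(proc.pid)
        self._jobs[job_id] = (proc, log, err)
        return job_id

    def isRunning(self, job_id):
        return self._jobs[job_id][0].poll() is None

    def list(self):
        return [job_id for job_id in self._jobs if self.isRunning(job_id)]

    def exit_status(self, job_id):
        # poll() returns None while the process is still running
        return self._jobs[job_id][0].poll()

    def logFile(self, job_id):
        # Unlike the documented interface, this sketch returns a full path
        return self._jobs[job_id][1]

    def errFile(self, job_id):
        return self._jobs[job_id][2]

    def errorState(self, job_id):
        # This sketch has no notion of an "error state"
        return False

    def terminate(self, job_id):
        proc = self._jobs[job_id][0]
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        return True


# Usage: run a short Python command and collect its output
with tempfile.TemporaryDirectory() as tmp:
    runner = SketchRunner(log_dir=tmp)
    job_id = runner.run("hello", tmp, sys.executable, ["-c", "print('hi')"])
    while runner.isRunning(job_id):
        time.sleep(0.1)
    status = runner.exit_status(job_id)
    with open(runner.logFile(job_id)) as fp:
        output = fp.read().strip()
```

Keeping the per-job state behind string ids, as here, is what lets higher level code treat local and cluster runners interchangeably.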

errFile(job_id)

Return name of error file relative to working directory

errorState(job_id)

Check if the job is in an error state

Return True if the job is deemed to be in an ‘error state’, False otherwise.

exit_status(job_id)

Return the exit status code for the command

Return the exit status code from the command that was run by the specified job, or None if the job hasn’t exited yet.

isRunning(job_id)

Check if a job is running

Returns True if job is still running, False if not

list()

Return a list of running job_ids

logFile(job_id)

Return name of log file relative to working directory

property log_dir

Return the current log directory setting

run(name, working_dir, script, args)

Start a job running

Parameters:
  • name – Name to give the job

  • working_dir – Directory to run the job in

  • script – Script file to run

  • args – List of arguments to supply to the script

Returns:

Returns a job id, or None if the job failed to start

set_log_dir(log_dir)

(Re)set the directory to write log files to

terminate(job_id)

Terminate a job

Returns True if termination was successful, False otherwise

class bcftbx.JobRunner.GEJobRunner(queue=None, log_dir=None, ge_extra_args=None, poll_interval=5, timeout=30)

Class implementing job runner for Grid Engine

GEJobRunner submits jobs to a Grid Engine cluster using the qsub command, determines the status of jobs using qstat and terminates them using qdel.

Additionally the runner can be configured for a specific GE queue on initialisation.

Each GEJobRunner instance creates a temporary directory which it uses for internal admin; this will be removed at program exit via ‘atexit’.

Parameters:
  • queue (str) – name of GE queue to use (set to ‘None’ to use default queue)

  • log_dir (str) – directory to write log files to (set to ‘None’ to use CWD)

  • ge_extra_args (list) – arbitrary additional arguments to supply to qsub

  • poll_interval (int) – time interval (in seconds) to use when polling Grid Engine e.g. to acquire qacct information (default: 5)

  • timeout (int) – maximum length of time (in seconds) to wait before giving up when polling Grid Engine (default: 30)

errFile(job_id)

Return the error file name for a job

The name should be ‘<name>.e<job_id>’

errorState(job_id)

Check if the job is in an error state

Return True if the job is deemed to be in an ‘error state’ (i.e. qstat returns the state as ‘E..’), False otherwise.

exit_status(job_id)

Return exit status from command run by a job

If the job is still running then returns ‘None’.

property ge_extra_args

Return the extra GE arguments

list()

Get list of job ids which are queued or running

logFile(job_id)

Return the log file name for a job

The name should be ‘<name>.o<job_id>’

name(job_id)

Return the name for a job

property nslots

Return the number of associated slots

This is extracted from the ‘ge_extra_args’ property, by looking for qsub options of the form ‘-pe XXXX N’ (e.g. ‘-pe smp.pe 8’), where ‘nslots’ will be N.
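
The rule described above can be sketched as follows. The function name nslots_from_ge_args is hypothetical and this is not the library's own code; returning 1 when no usable ‘-pe’ option is present follows the module-level statement that runners default to one slot.

```python
def nslots_from_ge_args(ge_extra_args):
    """Return N from a '-pe XXXX N' option, or 1 if there is none.

    Hypothetical helper illustrating the documented rule.
    """
    args = [str(arg) for arg in (ge_extra_args or [])]
    for i, arg in enumerate(args):
        # '-pe' must be followed by an environment name and a count
        if arg == "-pe" and i + 2 < len(args):
            try:
                return int(args[i + 2])
            except ValueError:
                # Not a plain integer (e.g. a slot range); keep looking
                continue
    return 1


nslots = nslots_from_ge_args(["-pe", "smp.pe", "8"])
```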

queue(job_id)

Fetch the job queue name

Returns the queue as reported by qstat, or None if not found.

run(name, working_dir, script, args)

Submit a script or command to the cluster via ‘qsub’

Parameters:
  • name – Name to give the job

  • working_dir – Directory to run the job in

  • script – Script file to run

  • args – List of arguments to supply to the script

Returns:

Job id for submitted job, or ‘None’ if job failed to start.

terminate(job_id)

Remove a job from the GE queue using ‘qdel’

class bcftbx.JobRunner.ResourceLock

Class for managing in-process locks on ‘resources’

A ‘resource’ is identified by an arbitrary string.

Example usage: create a new ResourceLock instance and check if a resource is locked:

>>> r = ResourceLock()
>>> r.is_locked("resource1")
False

Try to acquire the lock on the resource:

>>> lock = r.acquire("resource1")
>>> r.is_locked("resource1")
True

Release the lock on the resource:

>>> r.release(lock)
>>> r.is_locked("resource1")
False

acquire(resource_name, timeout=None)

Attempt to acquire the lock on a resource

Parameters:
  • resource_name (str) – name of the resource to acquire the lock for

  • timeout (float) – optional, specifies a timeout period after which failure to acquire the lock raises an exception.

Returns:

lock name.

Return type:

String

is_locked(resource_name)

Check if a resource is locked

Parameters:

resource_name (str) – name of the resource to check the lock for

Returns:

True if resource is locked, False if not.

Return type:

Boolean

release(lock)

Release a lock on a resource

Parameters:

lock (str) – lock to release.
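
Because acquire can raise on timeout and a held lock must always be released, callers will usually want a try/finally around the protected work. The sketch below demonstrates that pattern against a standalone stand-in class; SketchResourceLock is hypothetical and is not the bcftbx implementation, and raising TimeoutError is an assumption, since the exception type is not stated here.

```python
import threading
import time
import uuid


class SketchResourceLock:
    """Stand-in with the same three methods as ResourceLock (hypothetical)."""

    def __init__(self):
        self._guard = threading.Lock()
        self._held = {}  # resource name -> lock name

    def is_locked(self, resource_name):
        with self._guard:
            return resource_name in self._held

    def acquire(self, resource_name, timeout=None):
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            with self._guard:
                if resource_name not in self._held:
                    lock = "%s@%s" % (resource_name, uuid.uuid4().hex)
                    self._held[resource_name] = lock
                    return lock
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError("could not lock %r" % resource_name)
            time.sleep(0.01)

    def release(self, lock):
        with self._guard:
            for name, held in list(self._held.items()):
                if held == lock:
                    del self._held[name]
                    return


# Usage pattern: always release, even if the protected work fails
locks = SketchResourceLock()
lock = locks.acquire("resource1", timeout=1.0)
try:
    held_during_work = locks.is_locked("resource1")
finally:
    locks.release(lock)
```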

class bcftbx.JobRunner.SimpleJobRunner(log_dir=None, join_logs=False, nslots=1)

Class implementing job runner for local system

SimpleJobRunner starts jobs as processes on a local system; the status of jobs is determined using the Linux ps eu command, and jobs are terminated using kill -9.

Parameters:
  • log_dir – Directory to write log files to (set to ‘None’ to use CWD)

  • join_logs – Combine stderr and stdout into a single log file (by default stdout and stderr have their own log files)

  • nslots – Number of threads associated with this runner instance

errFile(job_id)

Return the error file name for a job

exit_status(job_id)

Return exit status from command run by a job

list()

Return a list of running job_ids

logFile(job_id)

Return the log file name for a job

name(job_id)

Return the name for a job

property nslots

Return the number of associated slots

run(name, working_dir, script, args)

Run a command and return the PID (=job id)

Parameters:
  • name – Name to give the job

  • working_dir – Directory to run the job in

  • script – Script file to run

  • args – List of arguments to supply to the script

Returns:

Job id for submitted job, or ‘None’ if job failed to start.

terminate(job_id)

Kill a running job using ‘kill -9’

class bcftbx.JobRunner.SlurmRunner(log_dir=None, nslots=None, partition=None, join_logs=None, slurm_extra_args=None, poll_interval=300, timeout=30, missing_job_timeout=600)

Class implementing job runner for Slurm

SlurmRunner submits jobs to a Slurm cluster using the sbatch command, determines the status of jobs using squeue, and terminates them using scancel.

Additionally the runner can be configured to target a specific partition and number of cores on initialisation.

Each SlurmRunner instance creates a temporary directory which it uses for internal admin; this will be removed at program exit via ‘atexit’.

Parameters:
  • log_dir (str) – path of directory to write log files to (set to ‘None’ to use cwd)

  • nslots (int) – number of threads assigned to the runner instance

  • partition (str) – name of Slurm partition to target (set to ‘None’ to use the default partition)

  • join_logs (bool) – if True then combine stderr and stdout into a single log file (default is to write stdout and stderr to separate log files)

  • slurm_extra_args (list) – arbitrary additional arguments to supply to ‘sbatch’ (e.g. ‘[“-n”, 8]’)

  • poll_interval (int) – time interval (in seconds) to use when polling Slurm using ‘squeue’ (default: 300)

  • timeout (int) – maximum length of time to wait (in seconds) before giving up when submitting jobs to Slurm and finalizing jobs (default: 30)

  • missing_job_timeout (int) – minimum time (in seconds) that a job needs to be flagged as “missing” before it’s removed from the runner (default: 600)

errFile(job_id)

Return the error file name for a job

The name should be ‘<name>.e<job_id>’

errorState(job_id)

Check if the job is in an error state

Return True if the job is deemed to be in an ‘error state’, False otherwise.

exit_status(job_id)

Return exit status from command run by a job

If the job is still running then returns ‘None’.

property join_logs

Return flag for whether to combine stdout and stderr logs

If not set then defaults to ‘False’.

list()

Get list of job ids which are queued or running

logFile(job_id)

Return the log file name for a job

The name should be ‘<name>.o<job_id>’

name(job_id)

Return the name for a specific job

Parameters:

job_id (int) – Job ID to get the name for

property nslots

Return the number of associated slots

If not set then defaults to 1.

property partition

Return the assigned partition

If not set then Slurm’s default partition is used.

run(name, working_dir, script, args)

Submit a script or command to the cluster via ‘sbatch’

Parameters:
  • name (str) – name to give the job

  • working_dir (str) – path to directory to run the job in

  • script (str) – path to command or script file to run

  • args (list) – list of arguments to supply to the script

Returns:

Job id for submitted job, or ‘None’ if job failed to start.

property slurm_extra_args

Return the extra Slurm arguments

terminate(job_id, exit_code=-1)

Remove a job from the Slurm queue using ‘scancel’

bcftbx.JobRunner.fetch_runner(definition)

Return job runner instance based on a definition string

Given a definition string, returns an appropriate runner instance.

Definitions are of the form:

RunnerName[(args)]

RunnerName can be ‘SimpleJobRunner’, ‘GEJobRunner’ or ‘SlurmRunner’. If ‘(args)’ are also supplied then:

  • for SimpleJobRunners, this can be a list of optional arguments separated by spaces:

    • ‘nslots=N’ (where N is an integer; sets a non-default number of slots)

    • ‘join_logs=BOOLEAN’ (where BOOLEAN can be ‘True’, ‘true’, ‘y’, ‘False’, ‘false’ or ‘n’; sets whether stdout and stderr should be written to the same file)

  • for GEJobRunners, this is a set of arbitrary ‘qsub’ options that will be used on job submission

  • for SlurmRunners, this can be a list of optional arguments separated by spaces:

    • ‘nslots=N’ (where N is an integer; sets a non-default number of slots)

    • ‘partition=STRING’ (where STRING is the name of the target Slurm partition)

    • ‘join_logs=BOOLEAN’ (where BOOLEAN can be ‘True’, ‘true’, ‘y’, ‘False’, ‘false’ or ‘n’; sets whether stdout and stderr should be written to the same file)

    • a string with arbitrary ‘sbatch’ options that will be included on job submission (note: ‘-J’, ‘-o’, ‘-e’ and ‘--export’ cannot be specified)
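
To make the ‘RunnerName[(args)]’ form concrete, the sketch below splits a definition string into the runner name and its space-separated argument tokens. The function split_definition is hypothetical and only illustrates the syntax described above; it is not the parser used by fetch_runner, and it does not create runner instances.

```python
import re


def split_definition(definition):
    """Split 'RunnerName[(args)]' into (name, list of argument tokens).

    Hypothetical helper, for illustration only.
    """
    match = re.match(r"^\s*(\w+)\s*(?:\((.*)\))?\s*$", definition)
    if match is None:
        raise ValueError("bad runner definition: %r" % definition)
    name, args = match.group(1), match.group(2)
    return name, (args.split() if args else [])


# Definitions of the three documented kinds
simple = split_definition("SimpleJobRunner(nslots=8 join_logs=y)")
ge = split_definition("GEJobRunner(-pe smp.pe 4)")
slurm = split_definition("SlurmRunner(nslots=4 partition=long)")
plain = split_definition("SimpleJobRunner")

# 'key=value' tokens can then be separated from other options
simple_options = dict(tok.split("=", 1) for tok in simple[1] if "=" in tok)
```

In this sketch the partition name ‘long’ and the parallel environment ‘smp.pe’ are placeholders; substitute the names used on your cluster.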