bsb.services package#

Submodules#

bsb.services.mpi module#

class bsb.services.mpi.MPIModule(module)[source]#

Bases: MockModule

Module provider of the MPI interface.

property COMM_WORLD#
class bsb.services.mpi.MPIService[source]#

Bases: object

abort(errorcode=1)[source]#
allgather(obj)[source]#
barrier()[source]#
bcast(obj, root=0)[source]#
gather(obj, root=0)[source]#
get_communicator()[source]#
get_rank()[source]#
get_size()[source]#

bsb.services.mpilock module#

class bsb.services.mpilock.Fence(access)[source]#

Bases: object

collect()[source]#
guard()[source]#
share(obj)[source]#
exception bsb.services.mpilock.FencedSignal[source]#

Bases: Exception

class bsb.services.mpilock.MPILockModule(module)[source]#

Bases: MockModule

sync(comm=None, master=0)[source]#
class bsb.services.mpilock.MockedWindowController(comm=None, master=0)[source]#

Bases: object

close()[source]#
property closed#
property master#
property rank#
read()[source]#
single_write(handle=None, rank=None)[source]#
write()[source]#

bsb.services.pool module#

Job pooling module.

Jobs derive from the base Job class and can be put on the queue of a JobPool. In order to submit themselves to the pool, Jobs serialize() themselves into a predefined set of variables:

job.serialize() -> (job_type, f, args, kwargs)
  • job_type should be a string that is a class name defined in this module (e.g. "PlacementJob").

  • f should be the function object that the job’s execute method should execute.

  • args and kwargs are the args to be passed to that f.

The Job.execute() handler can help interpret args and kwargs before running f. The execute handler has access to the scaffold on the MPI process, so it is best to serialize just the name of some part of the configuration rather than trying to pickle complex objects. For example, the PlacementJob uses the first args element to store the PlacementStrategy name, and then uses that name to retrieve the strategy from the scaffold:

@staticmethod
def execute(job_owner, f, args, kwargs):
    placement = job_owner.placement[args[0]]
    indicators = placement.get_indicators()
    return f(placement, *args[1:], indicators, **kwargs)

A job has a couple of display variables that can be set: _cname for the class name, _name for the job name and _c for the chunk. These are used to display what the workers are doing during parallel execution. This is an experimental API and subject to sudden change in the future.

class bsb.services.pool.ConnectivityJob(pool, strategy, pre_roi, post_roi, deps=None)[source]#

Dispatches the execution of a chunk of a connectivity strategy through a JobPool.

static execute(job_owner, args, kwargs)[source]#

Job handler

class bsb.services.pool.FunctionJob(pool, f, args, kwargs, deps=None, **context)[source]#
static execute(job_owner, args, kwargs)[source]#

Job handler

class bsb.services.pool.Job(pool, submission_context: SubmissionContext, args, kwargs, deps=None)[source]#

Dispatches the execution of a function through a JobPool.

cancel(reason: str | None = None)[source]#
change_status(status: JobStatus)[source]#
property context#
property description#
property error#
abstract static execute(job_owner, args, kwargs)[source]#

Job handler

property name#
on_completion(cb)[source]#
property result#
run(timeout=None)[source]#

Execute the job on the current process, in a thread, and return whether the job is still running.

serialize()[source]#

Convert the job to a (de)serializable representation.

set_exception(e: Exception)[source]#
set_result(value)[source]#
property status#
property submitter#
exception bsb.services.pool.JobErroredError(message, error)[source]#
class bsb.services.pool.JobPool(scaffold, fail_fast=False, workflow: Workflow | None = None)[source]#
add_listener(listener, max_wait=None)[source]#
add_notification(notification: PoolProgress)[source]#
change_status(status: PoolStatus)[source]#
execute(return_results=False)[source]#

Execute the jobs in the queue.

In serial execution, this runs all of the jobs in the queue in first-in, first-out (FIFO) order. In parallel execution, this enqueues all jobs into the MPIPool unless they have dependencies that need to complete first.

classmethod get_owner(id)[source]#
get_submissions_of(submitter)[source]#
classmethod get_tmp_folder(id)[source]#
is_main()[source]#
property jobs: list[Job]#
notify()[source]#
property owner#
property parallel#
ping()[source]#
queue(f, args=None, kwargs=None, deps=None, **context)[source]#
queue_connectivity(strategy, pre_roi, post_roi, deps=None)[source]#
queue_placement(strategy, chunk, deps=None)[source]#
raise_unhandled()[source]#
schedule(nodes, scheduler=None)[source]#
property scheduling#
property status#
property workflow#
class bsb.services.pool.JobStatus(value)[source]#

An enumeration.

ABORTED = 'aborted'#
CANCELLED = 'cancelled'#
FAILED = 'failed'#
PENDING = 'pending'#
QUEUED = 'queued'#
RUNNING = 'running'#
SUCCESS = 'success'#
class bsb.services.pool.PlacementJob(pool, strategy, chunk, deps=None)[source]#

Dispatches the execution of a chunk of a placement strategy through a JobPool.

static execute(job_owner, args, kwargs)[source]#

Job handler

class bsb.services.pool.PoolJobAddedProgress(pool: JobPool, job: Job)[source]#
property job#
class bsb.services.pool.PoolJobUpdateProgress(pool: JobPool, job: Job, old_status: JobStatus)[source]#
property job#
property old_status#
property status#
class bsb.services.pool.PoolProgress(pool: JobPool, reason: PoolProgressReason)[source]#

Class used to report pool progression to listeners.

property jobs#
property reason#
property status#
property workflow#
class bsb.services.pool.PoolProgressReason(value)[source]#

An enumeration.

JOB_ADDED = 2#
JOB_STATUS_CHANGE = 3#
MAX_TIMEOUT_PING = 4#
POOL_STATUS_CHANGE = 1#
class bsb.services.pool.PoolStatus(value)[source]#

An enumeration.

CLOSING = 'closing'#
EXECUTING = 'executing'#
SCHEDULING = 'scheduling'#
class bsb.services.pool.PoolStatusProgress(pool: JobPool, old_status: PoolStatus)[source]#
class bsb.services.pool.SubmissionContext(submitter, chunks=None, **kwargs)[source]#

Context information on who submitted a certain job.

property chunks#
property context#
property name#
property submitter#
class bsb.services.pool.Workflow(phases: list[str])[source]#
property finished#
next_phase()[source]#
property phase#
property phases#
exception bsb.services.pool.WorkflowError(_ExceptionGroup__message: str, _ExceptionGroup__exceptions: Sequence[_ExceptionT_co])[source]#
bsb.services.pool.dispatcher(pool_id, job_args)[source]#

Developer modules#

class bsb.services._util.ErrorModule(message)[source]#

Bases: object

class bsb.services._util.MockModule(module)[source]#

Bases: object

class bsb.services._util._ExceptionT_co#