bsb.services package#
Submodules#
bsb.services.mpi module#
- class bsb.services.mpi.MPIModule(module)[source]#
Bases: MockModule
Module provider of the MPI interface.
- property COMM_WORLD#
bsb.services.mpilock module#
- class bsb.services.mpilock.MPILockModule(module)[source]#
Bases: MockModule
bsb.services.pool module#
Job pooling module.
Jobs derive from the base Job class, which can be put on the queue of a JobPool. In order to submit themselves to the pool, Jobs will serialize() themselves into a predefined set of variables:
job.serialize() -> (job_type, f, args, kwargs)
- job_type should be a string that is a class name defined in this module (e.g. "PlacementJob").
- f should be the function object that the job's execute method should execute.
- args and kwargs are the arguments to be passed to that f.
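The serialized tuple described above can be illustrated with a minimal sketch. It does not use the bsb classes: ToyJob, add, and the JOB_TYPES registry are hypothetical names introduced only for this example, and the way the real pool resolves job_type back to a class may differ.

```python
from typing import Any, Callable


class ToyJob:
    """Hypothetical stand-in for a job; this is not the bsb Job class."""

    def __init__(self, f: Callable[..., Any], args: tuple, kwargs: dict):
        self.f = f
        self.args = args
        self.kwargs = kwargs

    def serialize(self):
        # job_type is the class name as a string, so the receiving side can
        # look the class up by name instead of shipping the job object itself.
        return (type(self).__name__, self.f, self.args, self.kwargs)


def add(a, b, scale=1):
    """Example payload function (hypothetical)."""
    return (a + b) * scale


# Hypothetical registry that maps class names back to classes.
JOB_TYPES = {"ToyJob": ToyJob}

job_type, f, args, kwargs = ToyJob(add, (1, 2), {"scale": 10}).serialize()
job_cls = JOB_TYPES[job_type]  # resolve the class from its name
result = f(*args, **kwargs)    # run the payload with the unpacked arguments
print(job_type, result)
```

Keeping job_type a plain string means that only the function and its arguments have to travel between processes.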
The Job.execute() handler can help interpret args and kwargs before running f. The execute handler has access to the scaffold on the MPI process, so it is best to serialize just the name of some part of the configuration rather than trying to pickle complex objects. For example, the PlacementJob uses the first args element to store the PlacementStrategy name and then retrieves the strategy from the scaffold:
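    @staticmethod
    def execute(job_owner, f, args, kwargs):
        # args[0] holds the name of the placement strategy; look it up on the owner.
        placement = job_owner.placement[args[0]]
        indicators = placement.get_indicators()
        # Pass the resolved strategy, the remaining args, and the indicators to f.
        return f(placement, *args[1:], indicators, **kwargs)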
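To see the name-based lookup in isolation, the handler above can be exercised against toy objects. This is a sketch under assumptions: ToyStrategy, ToyOwner, place, and the "granule_layer" name are hypothetical and stand in for the real scaffold and placement strategy, which are not shown here.

```python
class ToyStrategy:
    """Hypothetical placement strategy exposing only what the handler calls."""

    def __init__(self, name):
        self.name = name

    def get_indicators(self):
        # Placeholder data; real indicators are richer objects.
        return {"radius": 2.5}


class ToyOwner:
    """Hypothetical stand-in for the scaffold available on each process."""

    def __init__(self):
        self.placement = {"granule_layer": ToyStrategy("granule_layer")}


def execute(job_owner, f, args, kwargs):
    # Same body as the handler quoted above: resolve the strategy by name.
    placement = job_owner.placement[args[0]]
    indicators = placement.get_indicators()
    return f(placement, *args[1:], indicators, **kwargs)


def place(strategy, chunk, indicators, dry_run=False):
    """Hypothetical payload function; just reports what it received."""
    return (strategy.name, chunk, indicators["radius"], dry_run)


# Only a string and a tuple are serialized, not the strategy object.
outcome = execute(ToyOwner(), place, ("granule_layer", (0, 0, 0)), {"dry_run": True})
print(outcome)
```

Because each process resolves the name against its own copy of the owner, the strategy object never needs to be pickled.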
A job has a couple of display variables that can be set: _cname for the class name, _name for the job name, and _c for the chunk. These are used to display what the workers are doing during parallel execution. This is an experimental API and subject to sudden change in the future.
- class bsb.services.pool.ConnectivityJob(pool, strategy, pre_roi, post_roi, deps=None)[source]#
Dispatches the execution of a chunk of a connectivity strategy through a JobPool.
- class bsb.services.pool.Job(pool, submission_context: SubmissionContext, args, kwargs, deps=None)[source]#
Dispatches the execution of a function through a JobPool.
- property context#
- property description#
- property error#
- property name#
- property result#
- run(timeout=None)[source]#
Execute the job on the current process, in a thread, and return whether the job is still running.
- property status#
- property submitter#
- class bsb.services.pool.JobPool(scaffold, fail_fast=False, workflow: Workflow | None = None)[source]#
- add_notification(notification: PoolProgress)[source]#
- change_status(status: PoolStatus)[source]#
- execute(return_results=False)[source]#
Execute the jobs in the queue.
In serial execution, this runs all of the jobs in the queue in first-in, first-out order. In parallel execution, this enqueues all jobs into the MPIPool unless they have dependencies that need to complete first.
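The first-in, first-out behaviour with dependencies can be sketched without MPI. The code below is not the JobPool implementation: ToyJob and run_serial are hypothetical, and deferring a job until its dependencies finish is an assumption made for illustration, since the text above only states that dependent jobs wait in parallel execution.

```python
from collections import deque


class ToyJob:
    """Hypothetical job with a name, a payload, and optional dependencies."""

    def __init__(self, name, f, deps=None):
        self.name = name
        self.f = f
        self.deps = list(deps or [])
        self.done = False
        self.result = None


def run_serial(jobs):
    """Run jobs in FIFO order, deferring jobs whose deps are unfinished."""
    queue = deque(jobs)
    order = []
    stalled = 0  # consecutive deferrals since the last job that ran
    while queue:
        job = queue.popleft()
        if all(dep.done for dep in job.deps):
            job.result = job.f()
            job.done = True
            order.append(job.name)
            stalled = 0
        else:
            # Assumption: put the job back at the end and try again later.
            queue.append(job)
            stalled += 1
            if stalled > len(queue):
                raise RuntimeError("unsatisfiable dependencies")
    return order


c = ToyJob("c", lambda: 3)
a = ToyJob("a", lambda: 1)
b = ToyJob("b", lambda: 2, deps=[c])
order = run_serial([a, b, c])
print(order)
```

Here b is submitted before c but runs after it, because b lists c as a dependency.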
- property owner#
- property parallel#
- property scheduling#
- property status#
- property workflow#
- class bsb.services.pool.JobStatus(value)[source]#
An enumeration.
- ABORTED = 'aborted'#
- CANCELLED = 'cancelled'#
- FAILED = 'failed'#
- PENDING = 'pending'#
- QUEUED = 'queued'#
- RUNNING = 'running'#
- SUCCESS = 'success'#
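Since JobStatus members carry string values, a status can be looked up from its string form. The sketch below mirrors the listed members in a local enum so that it runs without bsb installed; it assumes the real class behaves like a standard Python Enum, which the signature JobStatus(value) suggests but the listing does not state.

```python
from enum import Enum


class JobStatus(Enum):
    """Local mirror of the members listed above, for illustration only."""

    ABORTED = "aborted"
    CANCELLED = "cancelled"
    FAILED = "failed"
    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"


# Look a member up by its string value, e.g. when reading a log or a message.
status = JobStatus("failed")
print(status.name, status.value)
```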
- class bsb.services.pool.PlacementJob(pool, strategy, chunk, deps=None)[source]#
Dispatches the execution of a chunk of a placement strategy through a JobPool.
- class bsb.services.pool.PoolJobUpdateProgress(pool: JobPool, job: Job, old_status: JobStatus)[source]#
- property job#
- property old_status#
- property status#
- class bsb.services.pool.PoolProgress(pool: JobPool, reason: PoolProgressReason)[source]#
Class used to report pool progression to listeners.
- property jobs#
- property reason#
- property status#
- property workflow#
- class bsb.services.pool.PoolProgressReason(value)[source]#
An enumeration.
- JOB_ADDED = 2#
- JOB_STATUS_CHANGE = 3#
- MAX_TIMEOUT_PING = 4#
- POOL_STATUS_CHANGE = 1#
- class bsb.services.pool.PoolStatus(value)[source]#
An enumeration.
- CLOSING = 'closing'#
- EXECUTING = 'executing'#
- SCHEDULING = 'scheduling'#
- class bsb.services.pool.PoolStatusProgress(pool: JobPool, old_status: PoolStatus)[source]#
- class bsb.services.pool.SubmissionContext(submitter, chunks=None, **kwargs)[source]#
Context information on who submitted a certain job.
- property chunks#
- property context#
- property name#
- property submitter#
- class bsb.services.pool.Workflow(phases: list[str])[source]#
- property finished#
- property phase#
- property phases#
- exception bsb.services.pool.WorkflowError(_ExceptionGroup__message: str, _ExceptionGroup__exceptions: Sequence[_ExceptionT_co])[source]#
Developer modules#
- class bsb.services._util._ExceptionT_co#