# Source code for bsb.storage.interfaces

import abc
import functools
import typing
from pathlib import Path

import numpy as np

from .. import config, plugins
from .._util import immutable, obj_str_insert
from ..trees import BoxTree
from ._chunks import Chunk

if typing.TYPE_CHECKING:
    from ..cell_types import CellType

@config.pluggable(key="engine", plugin_name="storage engine")
class StorageNode:
    """
    Pluggable configuration node of the storage. The concrete node class is
    looked up by the ``engine`` key among the discovered storage engine plugins.
    """

    root: typing.Any = config.slot()

    @classmethod
    def __plugins__(cls):
        """
        Return the mapping of plugin name to the ``StorageNode`` class that each
        discovered ``storage.engines`` plugin provides.

        The mapping is built on first use and stored on the class as ``_plugins``.
        """
        if not hasattr(cls, "_plugins"):
            cls._plugins = {
                name: plugin.StorageNode
                for name, plugin in plugins.discover("storage.engines").items()
            }
        return cls._plugins
class Interface(abc.ABC):
    """
    Base class of the storage interfaces. Holds a reference to the engine and
    lets subclasses declare an ``engine_key`` class keyword argument.
    """

    # Key stored by ``__init_subclass__``; inherited by subclasses that do not
    # pass ``engine_key`` themselves.
    _iface_engine_key = None

    def __init__(self, engine):
        """
        :param engine: The engine this interface object belongs to.
        """
        self._engine = engine

    def __init_subclass__(cls, **kwargs):
        """
        Store the ``engine_key`` class keyword argument, if given, as
        ``_iface_engine_key``. Other keyword arguments are ignored.
        """
        # Only change engine key if explicitly given.
        if "engine_key" in kwargs:
            cls._iface_engine_key = kwargs["engine_key"]
class NoopLock:
    """
    Context manager that does nothing on entry or exit. Exceptions raised
    inside the ``with`` block propagate, since ``__exit__`` returns ``None``.
    """

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
class Engine(Interface):
    """
    Engines perform the transactions that come from the storage object, and
    read/write data in a specific format. They can perform collective or
    individual actions.

    .. warning::

      Collective actions can only be performed from all nodes, or deadlocks
      occur. This means in particular that they may not be called from
      component code.
    """

    def __init__(self, root, comm):
        """
        :param root: Identifier of the storage object.
        :param comm: Communicator in charge of collective operations. Must
          provide a ``bcast(obj, root=...)`` method.
        """
        # NOTE(review): presumably an MPI-style broadcast, so that all ranks use
        # the root known to rank 0 - confirm against the communicator type.
        self._root = comm.bcast(root, root=0)
        self._comm = comm
        self._readonly = False

    # NOTE: defining ``__eq__`` without ``__hash__`` makes instances unhashable,
    # unless a subclass defines ``__hash__``.
    def __eq__(self, other):
        """
        Two engines are equal when both their format and their root are equal.
        Objects without ``_format`` or ``_root`` compare as ``None`` on that
        attribute.
        """
        eq_format = self._format == getattr(other, "_format", None)
        eq_root = self._root == getattr(other, "_root", None)
        return eq_format and eq_root

    @property
    def root(self):
        """
        The unique identifier for the storage. Usually pathlike, but can be
        anything.
        """
        return self._root

    @property
    def comm(self):
        """
        The communicator in charge of collective operations.
        """
        return self._comm

    def set_comm(self, comm):
        """
        :guilabel:`collective` Set a new communicator in charge of collective
        operations.
        """
        self._comm = comm

    @property
    def format(self):
        """
        Name of the type of engine. Automatically set through the plugin system.
        """
        return self._format

    @property
    @abc.abstractmethod
    def versions(self):
        """
        Must return a dictionary containing the version of the engine package,
        and bsb package, used to last write to this storage object.
        """
        pass

    @property
    @abc.abstractmethod
    def root_slug(self):
        """
        Must return a pathlike unique identifier for the root of the storage
        object.
        """
        pass

    @abc.abstractmethod
    def recognizes(self, root):
        """
        Must return whether the given argument is recognized as a valid storage
        object.
        """
        pass

    @classmethod
    def peek_exists(cls, root):
        """
        Must peek at the existence of the given root, without instantiating
        anything.

        The default implementation treats the root as a filesystem path, and
        returns ``False`` if it cannot be checked as one.
        """
        # Deliberately best-effort: any failure to interpret or check the root
        # as a path counts as "does not exist".
        try:
            return Path(root).exists()
        except Exception:
            return False

    @abc.abstractmethod
    def exists(self):
        """
        Must check existence of the storage object.
        """
        pass

    @abc.abstractmethod
    def create(self):
        """
        :guilabel:`collective` Must create the storage engine.
        """
        pass

    @abc.abstractmethod
    def move(self, new_root):
        """
        :guilabel:`collective` Must move the storage object to the new root.
        """
        pass

    @abc.abstractmethod
    def copy(self, new_root):
        """
        :guilabel:`collective` Must copy the storage object to the new root.
        """
        pass

    @abc.abstractmethod
    def remove(self):
        """
        :guilabel:`collective` Must remove the storage object.
        """
        pass

    @abc.abstractmethod
    def clear_placement(self):
        """
        :guilabel:`collective` Must clear existing placement data.
        """
        pass

    @abc.abstractmethod
    def clear_connectivity(self):
        """
        :guilabel:`collective` Must clear existing connectivity data.
        """
        pass

    @abc.abstractmethod
    def get_chunk_stats(self):
        """
        :guilabel:`readonly` Must return a dictionary with all chunk statistics.
        """
        pass

    def read_only(self):
        """
        A context manager that enters the engine into readonly mode. In
        readonly mode the engine does not perform any locking, write-operations
        or network synchronization, and errors out if a write operation is
        attempted.

        The readonly flag is set as soon as this method is called, not only
        when the returned context manager is entered.
        """
        self._readonly = True
        return ReadOnlyManager(self)

    def readwrite(self):
        """
        Leave readonly mode by resetting the readonly flag.
        """
        self._readonly = False
class ReadOnlyManager:
    """
    Context manager that sets the ``_readonly`` flag of an engine on entry, and
    clears it on exit.
    """

    def __init__(self, engine):
        """
        :param engine: Object whose ``_readonly`` attribute is toggled.
        """
        self._e = engine

    def __enter__(self):
        self._e._readonly = True

    def __exit__(self, *args):
        # The flag is always reset to False, whatever its value was before
        # the context was entered.
        self._e._readonly = False
class NetworkDescription(Interface):
    """
    Interface without members of its own; adds nothing to :class:`.Interface`.
    """

    pass
class FileStore(Interface, engine_key="files"):
    """
    Interface for the storage and retrieval of files essential to the network
    description.
    """

    @abc.abstractmethod
    def all(self):
        """
        Return all ids and associated metadata in the file store.
        """
        pass

    @abc.abstractmethod
    def store(self, content, id=None, meta=None, encoding=None, overwrite=False):
        """
        Store content in the file store. Should also store the current timestamp
        as `mtime` meta.

        :param content: Content to be stored
        :type content: str
        :param id: Optional specific id for the content to be stored under.
        :type id: str
        :param meta: Metadata for the content
        :type meta: dict
        :param encoding: Optional encoding
        :type encoding: str
        :param overwrite: Overwrite existing file
        :type overwrite: bool
        :returns: The id the content was stored under
        :rtype: str
        """
        pass

    @abc.abstractmethod
    def load(self, id):
        """
        Load the content of an object in the file store.

        :param id: id of the content to be loaded.
        :type id: str
        :returns: The content of the stored object
        :rtype: str
        :raises FileNotFoundError: The given id doesn't exist in the file store.
        """
        pass

    @abc.abstractmethod
    def remove(self, id):
        """
        Remove the content of an object in the file store.

        :param id: id of the content to be removed.
        :type id: str
        :raises FileNotFoundError: The given id doesn't exist in the file store.
        """
        pass

    @abc.abstractmethod
    def store_active_config(self, config):
        """
        Store configuration in the file store and mark it as the active
        configuration of the stored network.

        :param config: Configuration to be stored
        :type config: :class:`~.config.Configuration`
        :returns: The id the config was stored under
        :rtype: str
        """
        pass

    @abc.abstractmethod
    def load_active_config(self):
        """
        Load the active configuration stored in the file store.

        :returns: The active configuration
        :rtype: :class:`~.config.Configuration`
        :raises Exception: When there's no active configuration in the file store.
        """
        pass

    @abc.abstractmethod
    def has(self, id):
        """
        Must return whether the file store has a file with the given id.
        """
        pass

    @abc.abstractmethod
    def get_mtime(self, id):
        """
        Must return the last modified timestamp of file with the given id.
        """
        pass

    @abc.abstractmethod
    def get_encoding(self, id):
        """
        Must return the encoding of the file with the given id, or None if it is
        unspecified binary data.
        """
        pass

    @abc.abstractmethod
    def get_meta(self, id) -> typing.Mapping[str, typing.Any]:
        """
        Must return the metadata of the given id.
        """
        pass

    def get(self, id) -> "StoredFile":
        """
        Return a StoredFile wrapper

        :param id: id of the file to wrap.
        :raises FileNotFoundError: The given id doesn't exist in the file store.
        """
        if not self.has(id):
            raise FileNotFoundError(f"File with id '{id}' not found.")
        return StoredFile(self, id)

    def find_files(self, predicate):
        """
        Lazily yield a StoredFile wrapper for each stored file that matches.

        :param predicate: Callable that receives the id and metadata of each
          file in :meth:`.all`, and returns whether the file matches.
        :returns: Generator of StoredFile wrappers.
        """
        return (
            StoredFile(self, id_) for id_, m in self.all().items() if predicate(id_, m)
        )

    def find_file(self, predicate):
        """
        Return the first file matching the predicate, or ``None`` if no file
        matches. See :meth:`.find_files`.
        """
        return next(self.find_files(predicate), None)

    def find_id(self, id):
        """
        Return the file stored under the given id, or ``None`` if there is none.
        """
        return self.find_file(lambda id_, _: id_ == id)

    def find_meta(self, key, value):
        """
        Return the first file whose metadata has ``value`` under ``key``, or
        ``None`` if there is none. A missing key counts as the value ``None``.
        """
        return self.find_file(lambda _, meta: meta.get(key) == value)
class StoredFile:
    """
    Thin wrapper around a single id of a file store. Every access is delegated
    to the store at the time of the call; nothing is cached on the wrapper.
    """

    def __init__(self, store, id):
        """
        :param store: File store that holds the file. Must provide ``get_meta``,
          ``get_mtime`` and ``load``.
        :param id: id of the file in the store.
        """
        self.store = store
        self.id = id

    @property
    def meta(self):
        """
        Metadata of the file, as returned by the store's ``get_meta``.
        """
        return self.store.get_meta(self.id)

    @property
    def mtime(self):
        """
        Last modified timestamp of the file, as returned by the store's
        ``get_mtime``.
        """
        return self.store.get_mtime(self.id)

    def load(self):
        """
        Load the content of the file, as returned by the store's ``load``.
        """
        return self.store.load(self.id)
class PlacementSet(Interface):
    """
    Interface for the storage of placement data of a cell type.
    """

    @abc.abstractmethod
    def __init__(self, engine, cell_type):
        """
        :param engine: The engine that governs this PlacementSet.
        :param cell_type: The cell type whose data is stored in the placement
          set. Its name is used as the tag of the set.
        """
        self._engine = engine
        self._type = cell_type
        self._tag = cell_type.name

    @abc.abstractmethod
    def __len__(self):
        pass

    @obj_str_insert
    def __repr__(self):
        cell_type = self.cell_type
        # If the morphologies can't be loaded for whatever reason, fall back to
        # a representation with just the cell type name.
        try:
            ms = self.load_morphologies()
        except Exception:
            return f"cell type: '{cell_type.name}'"
        if not len(ms):
            mstr = "without morphologies"
        else:
            mstr = f"with {len(ms._loaders)} morphologies"
        return f"cell type: '{cell_type.name}', {len(self)} cells, {mstr}"

    @property
    def cell_type(self):
        """
        The associated cell type.

        :returns: The cell type
        :rtype: ~bsb.cell_types.CellType
        """
        return self._type

    @property
    def tag(self):
        """
        The unique identifier of the placement set.

        :returns: Unique identifier
        :rtype: str
        """
        return self._tag

    @classmethod
    @abc.abstractmethod
    def create(cls, engine, cell_type):
        """
        Create a placement set.

        :param engine: The engine that governs this PlacementSet.
        :type engine: ~bsb.storage.interfaces.Engine
        :param cell_type: The cell type whose data is stored in the placement set.
        :type cell_type: bsb.cell_types.CellType
        :returns: A placement set
        :rtype: ~bsb.storage.interfaces.PlacementSet
        """
        pass

    @staticmethod
    @abc.abstractmethod
    def exists(engine, cell_type):
        """
        Check existence of a placement set.

        :param engine: The engine that governs the existence check.
        :type engine: ~bsb.storage.interfaces.Engine
        :param cell_type: The cell type to look for.
        :type cell_type: bsb.cell_types.CellType
        :returns: Whether the placement set exists.
        :rtype: bool
        """
        pass

    @classmethod
    def require(cls, engine, type):
        """
        Return and create a placement set, if it didn't exist before.

        The default implementation uses the :meth:`.exists` and :meth:`.create`
        methods.

        :param engine: The engine that governs this PlacementSet.
        :type engine: ~bsb.storage.interfaces.Engine
        :param type: The cell type whose data is stored in the placement set.
        :type type: bsb.cell_types.CellType
        :returns: A placement set
        :rtype: ~bsb.storage.interfaces.PlacementSet
        """
        if not cls.exists(engine, type):
            cls.create(engine, type)
        return cls(engine, type)

    @abc.abstractmethod
    def clear(self, chunks=None):
        """
        Clear (some chunks of) the placement set.

        :param chunks: If given, the specific chunks to clear.
        :type chunks: List[~bsb.storage._chunks.Chunk]
        """
        pass

    @abc.abstractmethod
    def get_all_chunks(self):
        """
        Get all the chunks that exist in the placement set.

        :returns: List of existing chunks.
        :rtype: List[~bsb.storage._chunks.Chunk]
        """
        pass

    @abc.abstractmethod
    def load_ids(self):
        """
        Must be implemented by the engine. NOTE(review): presumably loads the
        ids of the cells in the set - no contract is documented.
        """
        pass

    @abc.abstractmethod
    def load_positions(self):
        """
        Return a dataset of cell positions.

        :returns: An (Nx3) dataset of positions.
        :rtype: numpy.ndarray
        """
        pass

    @abc.abstractmethod
    def load_rotations(self):
        """
        Load the rotation data of the placement set

        :returns: A rotation set
        :rtype: ~bsb.morphologies.RotationSet
        """
        pass

    @abc.abstractmethod
    def load_morphologies(self, allow_empty=False):
        """
        Return a :class:`~.morphologies.MorphologySet` associated to the cells.
        Raises an error if there is no morphology data, unless `allow_empty=True`.

        :param bool allow_empty: Silence missing morphology data error, and
          return an empty morphology set.
        :returns: Set of morphologies
        :rtype: :class:`~.morphologies.MorphologySet`
        """
        pass

    @abc.abstractmethod
    def load_additional(self, key=None):
        """
        Must be implemented by the engine. NOTE(review): presumably loads the
        additional datasets stored through :meth:`.append_additional`,
        optionally only the one under ``key`` - no contract is documented.
        """
        pass

    def count_morphologies(self):
        """
        Must return the number of different morphologies used in the set.

        The default implementation loads the morphology set, allowing it to be
        empty, and asks it for its count.
        """
        return self.load_morphologies(allow_empty=True).count_morphologies()

    @abc.abstractmethod
    def __iter__(self):
        pass

    @abc.abstractmethod
    def append_data(
        self,
        chunk,
        positions=None,
        morphologies=None,
        rotations=None,
        additional=None,
        count=None,
    ):
        """
        Append data to the placement set. If any of ``positions``,
        ``morphologies``, or ``rotations`` is given, the arguments to its left
        must also be given (e.g. passing morphologies, but no positions, is not
        allowed, passing just positions is allowed)

        :param chunk: The chunk to store data in.
        :type chunk: ~bsb.storage._chunks.Chunk
        :param positions: Cell positions
        :type positions: numpy.ndarray
        :param rotations: Cell rotations
        :type rotations: ~bsb.morphologies.RotationSet
        :param morphologies: Cell morphologies
        :type morphologies: ~bsb.morphologies.MorphologySet
        :param additional: Additional datasets with 1 value per cell, will be
          stored under its key in the dictionary
        :type additional: Dict[str, numpy.ndarray]
        :param count: Amount of entities to place. Excludes the use of any
          positional, rotational or morphological data.
        :type count: int
        """
        pass

    @abc.abstractmethod
    def append_additional(self, name, chunk, data):
        """
        Append arbitrary user data to the placement set. The length of the data
        must match that of the placement set, and must be storable by the engine.

        :param name: Name of the additional dataset.
        :param chunk: The chunk to store data in.
        :type chunk: ~bsb.storage._chunks.Chunk
        :param data: Arbitrary user data. You decide |:heart:|
        :type data: numpy.ndarray
        """
        pass

    @abc.abstractmethod
    def chunk_context(self, chunks):
        """
        Must be implemented by the engine. NOTE(review): presumably returns a
        context manager that scopes the set to the given chunks - no contract
        is documented.
        """
        pass

    @abc.abstractmethod
    def set_chunk_filter(self, chunks):
        """
        Should limit the scope of the placement set to the given chunks.

        :param chunks: List of chunks
        :type chunks: list[~bsb.storage._chunks.Chunk]
        """
        pass

    @abc.abstractmethod
    def set_label_filter(self, labels):
        """
        Should limit the scope of the placement set to the given labels.

        :param labels: List of labels
        :type labels: list[str]
        """
        pass

    @abc.abstractmethod
    def set_morphology_label_filter(self, morphology_labels):
        """
        Should limit the scope of the placement set to the given sub-cellular
        labels. The morphologies returned by :meth:`.load_morphologies` should
        return a filtered form of themselves if
        :meth:`~.morphologies.Morphology.as_filtered` is called on them.

        :param morphology_labels: List of labels
        :type morphology_labels: list[str]
        """
        pass

    @abc.abstractmethod
    def label(self, labels, cells):
        """
        Should label the cells with given labels.

        :param cells: Array of cells in this set to label.
        :type cells: numpy.ndarray
        :param labels: List of labels
        :type labels: list[str]
        """
        pass

    @abc.abstractmethod
    def get_labelled(self, labels):
        """
        Should return the cells labelled with given labels.

        :param labels: List of labels
        :type labels: list[str]
        """
        pass

    @abc.abstractmethod
    def get_label_mask(self, labels):
        """
        Should return a mask that fits the placement set for the cells with
        given labels.

        :param labels: List of labels
        :type labels: list[str]
        """
        pass

    @abc.abstractmethod
    def get_chunk_stats(self):
        """
        Should return how many cells were placed in each chunk.
        """
        pass

    def load_boxes(self, morpho_cache=None):
        """
        Load the cells as axis aligned bounding box rhomboids matching the
        extension, orientation and position in space. This function loads
        morphologies, unless a `morpho_cache` is given, then that is used.

        :param morpho_cache: If you've previously loaded morphologies with soft
          or hard caching enabled, you can pass the resulting morphology set
          here to reuse it. If afterwards you need the morphology set, you best
          call :meth:`.load_morphologies` first and reuse it here.
        :type morpho_cache: ~bsb.morphologies.MorphologySet
        :returns: An iterator with 6 coordinates per cell: 3 min and 3 max
          coords, the bounding box of that cell's translated and rotated
          morphology.
        :rtype: Iterator[Tuple[float, float, float, float, float, float]]
        :raises: DatasetNotFoundError if no morphologies are found.
        """
        if morpho_cache is None:
            mset = self.load_morphologies()
        else:
            mset = morpho_cache
        # All 8 combinations of picking index 0 or 1 for the x, y and z axis.
        expansion = [*zip([0] * 4 + [1] * 4, ([0] * 2 + [1] * 2) * 2, [0, 1] * 4)]

        def _box_of(m, o, r):
            # The 2 opposing corners stored in the morphology metadata.
            oo = (m["ldc"], m["mdc"])
            # Make the 8 corners of the box
            corners = np.array([[oo[x][0], oo[y][1], oo[z][2]] for x, y, z in expansion])
            # Rotate them
            rotbox = r.apply(corners)
            # Find outer box, by rotating and translating the starting box
            return np.concatenate(
                (np.min(rotbox, axis=0) + o, np.max(rotbox, axis=0) + o)
            )

        iters = (mset.iter_meta(), self.load_positions(), self.load_rotations())
        return map(_box_of, *iters)

    def load_box_tree(self, morpho_cache=None):
        """
        Load boxes, and form an RTree with them, for fast spatial lookup of
        rhomboid intersection.

        :param morpho_cache: See :meth:`.load_boxes`.
        :returns: A boxtree
        :rtype: bsb.trees.BoxTree
        """
        return BoxTree(list(self.load_boxes(morpho_cache=morpho_cache)))

    def _requires_morpho_mapping(self):
        # ``_morphology_labels`` is not set in this class; it is expected to be
        # provided by the implementing engine.
        return self._morphology_labels is not None and self.count_morphologies()

    def _morpho_backmap(self, locs):
        # Works on a copy: the caller's array is left untouched.
        locs = locs.copy()
        cols = locs[:, 1:]
        ign_b = cols[:, 0] == -1
        ign_p = cols[:, 1] == -1
        # Rows where exactly one of the 2 columns is -1 are invalid.
        semi = ign_b != ign_p
        if np.any(semi):
            raise ValueError(
                f"Invalid data at {np.nonzero(semi)[0]}. -1 needs to occur in "
                "either none or both columns to make point neuron connections."
            )
        to_map = ~ign_b
        if np.any(locs[to_map, 1:] < 0):
            raise ValueError(
                f"Invalid data at {np.nonzero(locs[to_map, 1:] < 0)[0]}, "
                "negative values are not valid morphology locations."
            )
        locs[to_map] = self.load_morphologies()._mapback(locs[to_map])
        return locs
class MorphologyRepository(Interface, engine_key="morphologies"):
    """
    Interface for the storage and retrieval of morphologies.
    """

    @abc.abstractmethod
    def all(self):
        """
        Fetch all of the stored morphologies.

        :returns: List of the stored morphologies.
        :rtype: List[~bsb.storage.interfaces.StoredMorphology]
        """
        pass

    @abc.abstractmethod
    def select(self, *selectors):
        """
        Select stored morphologies.

        :param selectors: Any number of morphology selectors.
        :type selectors: List[bsb.morphologies.selector.MorphologySelector]
        :returns: All stored morphologies that match at least one selector.
        :rtype: List[~bsb.storage.interfaces.StoredMorphology]
        """
        pass

    @abc.abstractmethod
    def save(self, name, morphology, overwrite=False):
        """
        Store a morphology

        :param name: Key to store the morphology under.
        :type name: str
        :param morphology: Morphology to store
        :type morphology: bsb.morphologies.Morphology
        :param overwrite: Overwrite any stored morphology that already exists
          under that name
        :type overwrite: bool
        :returns: The stored morphology
        :rtype: ~bsb.storage.interfaces.StoredMorphology
        """
        pass

    @abc.abstractmethod
    def has(self, name):
        """
        Check whether a morphology under the given name exists

        :param name: Key of the stored morphology.
        :type name: str
        :returns: Whether the key exists in the repo.
        :rtype: bool
        """
        pass

    def __contains__(self, item):
        # Enables ``name in repository``.
        return self.has(item)

    @abc.abstractmethod
    def preload(self, name):
        """
        Load a stored morphology as a morphology loader.

        :param name: Key of the stored morphology.
        :type name: str
        :returns: The stored morphology
        :rtype: ~bsb.storage.interfaces.StoredMorphology
        """
        pass

    @abc.abstractmethod
    def load(self, name):
        """
        Load a stored morphology as a constructed morphology object.

        :param name: Key of the stored morphology.
        :type name: str
        :returns: A morphology
        :rtype: ~bsb.morphologies.Morphology
        """
        pass

    @abc.abstractmethod
    def get_meta(self, name):
        """
        Get the metadata of a stored morphology.

        :param name: Key of the stored morphology.
        :type name: str
        :returns: Metadata dictionary
        :rtype: dict
        """
        pass

    @abc.abstractmethod
    def get_all_meta(self):
        """
        Get the metadata of all stored morphologies.

        :returns: Metadata dictionary
        :rtype: dict
        """
        pass

    @abc.abstractmethod
    def set_all_meta(self, all_meta):
        """
        Set the metadata of all stored morphologies.

        :param all_meta: Metadata dictionary.
        :type all_meta: dict
        """
        pass

    @abc.abstractmethod
    def update_all_meta(self, meta):
        """
        Update the metadata of stored morphologies with the provided key values

        :param meta: Metadata dictionary.
        :type meta: dict
        """
        pass

    def list(self):
        """
        List all the names of the morphologies in the repository.
        """
        return [loader.name for loader in self.all()]
class ConnectivitySet(Interface):
    """
    Stores the connections between 2 types of cell as ``local`` and ``global``
    locations. A location is a cell id, referring to the n-th cell in the chunk,
    a branch id, and a point id, to specify the location on the morphology.
    Local locations refer to cells on this chunk, while global locations can
    come from any chunk and is associated to a certain chunk id as well.

    Locations are either placement-context or chunk dependent: You may form
    connections between the n-th cells of a placement set (using
    :meth:`.connect`), or of the n-th cells of 2 chunks (using
    :meth:`.chunk_connect`).

    A cell has both incoming and outgoing connections; when speaking of incoming
    connections, the local locations are the postsynaptic cells, and when
    speaking of outgoing connections they are the presynaptic cells. Vice versa
    for the global connections.
    """

    # The following attributes must be set on each ConnectivitySet by the engine:
    tag: str
    pre_type_name: str
    post_type_name: str
    pre_type: "CellType"
    post_type: "CellType"

    @abc.abstractmethod
    def __len__(self):
        pass

    @classmethod
    @abc.abstractmethod
    def create(cls, engine, tag):
        """
        Must create the connectivity set.
        """
        pass

    @obj_str_insert
    def __repr__(self):
        cstr = f"with {len(self)} connections" if len(self) else "without connections"
        return f"'{self.tag}' {cstr}"

    @staticmethod
    @abc.abstractmethod
    def exists(engine, tag):
        """
        Must check the existence of the connectivity set
        """
        pass

    @classmethod
    def require(cls, engine, tag):
        """
        Must make sure the connectivity set exists. The default implementation
        uses the class's ``exists`` and ``create`` methods.

        Can be called on the class as well as on an instance.
        """
        if not cls.exists(engine, tag):
            cls.create(engine, tag)

    @abc.abstractmethod
    def clear(self, chunks=None):
        """
        Must clear (some chunks of) the connectivity set
        """
        pass

    @classmethod
    @abc.abstractmethod
    def get_tags(cls, engine):
        """
        Must return the tags of all existing connectivity sets.

        :param engine: Storage engine to inspect.
        """
        pass

    @abc.abstractmethod
    def connect(self, pre_set, post_set, src_locs, dest_locs):
        """
        Must connect the ``src_locs`` to the ``dest_locs``, interpreting the
        cell ids (first column of the locs) as the cell rank in the placement
        set.
        """
        pass

    @abc.abstractmethod
    def chunk_connect(self, src_chunk, dst_chunk, src_locs, dst_locs):
        """
        Must connect the ``src_locs`` to the ``dst_locs``, interpreting the
        cell ids (first column of the locs) as the cell rank in the chunk.
        """
        pass

    @abc.abstractmethod
    def get_local_chunks(self, direction):
        """
        Must list all the local chunks that contain data in the given
        ``direction`` (``"inc"`` or ``"out"``).
        """
        pass

    @abc.abstractmethod
    def get_global_chunks(self, direction, local_):
        """
        Must list all the global chunks that contain data coming from a
        ``local`` chunk in the given ``direction``
        """
        pass

    @abc.abstractmethod
    def nested_iter_connections(self, direction=None, local_=None, global_=None):
        """
        Must iterate over the connectivity data, leaving room for the end-user
        to set up nested for loops:

        .. code-block:: python

          for dir, itr in self.nested_iter_connections():
              for lchunk, itr in itr:
                  for gchunk, data in itr:
                      print(f"Nested {dir} block between {lchunk} and {gchunk}")

        If a keyword argument is given, that axis is not iterated over, and the
        amount of nested loops is reduced.
        """
        pass

    @abc.abstractmethod
    def flat_iter_connections(self, direction=None, local_=None, global_=None):
        """
        Must iterate over the connectivity data, yielding the direction, local
        chunk, global chunk, and data:

        .. code-block:: python

          for dir, lchunk, gchunk, data in self.flat_iter_connections():
              print(f"Flat {dir} block between {lchunk} and {gchunk}")

        If a keyword argument is given, that axis is not iterated over, and the
        value is fixed in each iteration.
        """
        pass

    @abc.abstractmethod
    def load_block_connections(self, direction, local_, global_):
        """
        Must load the connections from ``direction`` perspective between
        ``local_`` and ``global_``.

        :returns: The local and global connections locations
        :rtype: Tuple[numpy.ndarray, numpy.ndarray]
        """
        pass

    @abc.abstractmethod
    def load_local_connections(self, direction, local_):
        """
        Must load all the connections from ``direction`` perspective in
        ``local_``.

        :returns: The local connection locations, a vector of the global
          connection chunks (1 chunk id per connection), and the global
          connections locations. To identify a cell in the global connections,
          use the corresponding chunk id from the second return value.
        :rtype: Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]
        """
        pass

    def load_connections(self):
        """
        Loads connections as a :class:`.ConnectivityIterator`, set to the
        outgoing (``"out"``) direction.

        :returns: A connectivity set iterator, that will load data
        """
        return ConnectivityIterator(self, "out")
class ConnectivityIterator:
    """
    Lazy view on the connections of a connectivity set. The filtering methods
    (``from_``, ``to``, ``incoming``, ...) are wrapped in ``immutable``.
    NOTE(review): presumably they each return a modified copy made with
    ``__copy__`` and leave this iterator untouched - confirm against
    ``_util.immutable``.
    """

    def __init__(
        self, cs: ConnectivitySet, direction, lchunks=None, gchunks=None, scoped=True
    ):
        """
        :param cs: The connectivity set to read from.
        :param direction: ``"inc"`` or ``"out"``.
        :param lchunks: Local chunks to restrict to, or ``None`` for all.
        :param gchunks: Global chunks to restrict to, or ``None`` for all.
        :param scoped: Whether cell ids are offset relative to only the selected
          chunks (``True``), or relative to all chunks (``False``).
        """
        self._cs = cs
        self._dir = direction
        self._lchunks = lchunks
        self._scoped = scoped
        self._gchunks = gchunks

    def __copy__(self):
        lchunks = self._lchunks.copy() if self._lchunks is not None else None
        gchunks = self._gchunks.copy() if self._gchunks is not None else None
        # The scope flag must be carried over, or a copy of an unscoped
        # iterator would silently fall back to the scoped default.
        return ConnectivityIterator(
            self._cs, self._dir, lchunks, gchunks, self._scoped
        )

    def __len__(self):
        # Loads all of the connection data to count it.
        return len(self.all()[0])

    def __iter__(self):
        """
        Iterate over the individual connections, chunk block by chunk block.

        :returns: Yields pairs of a presynaptic location and a postsynaptic
          location, one pair per connection.
        :rtype: Tuple[numpy.ndarray, numpy.ndarray]
        """
        for _, pre_locs, _, post_locs in self.chunk_iter():
            yield from zip(pre_locs, post_locs)

    def chunk_iter(self):
        """
        Iterate over the connection data chunk by chunk.

        :returns: The presynaptic chunk, presynaptic locations, postsynaptic
          chunk, and postsynaptic locations.
        :rtype: Tuple[~bsb.storage._chunks.Chunk, numpy.ndarray,
          ~bsb.storage._chunks.Chunk, numpy.ndarray]
        """
        yield from (
            self._offset_block(*data)
            for data in self._cs.flat_iter_connections(
                self._dir, self._lchunks, self._gchunks
            )
        )

    @immutable()
    def as_globals(self):
        """
        Offset cell ids relative to all chunks, instead of the selected chunks.
        """
        self._scoped = False

    @immutable()
    def as_scoped(self):
        """
        Offset cell ids relative to only the selected chunks.
        """
        self._scoped = True

    @immutable()
    def outgoing(self):
        """
        Look at the connections from the outgoing perspective.
        """
        # Local and global swap roles only when the direction actually flips;
        # swapping otherwise would exchange the `from_` and `to` filters.
        if self._dir != "out":
            self._lchunks, self._gchunks = self._gchunks, self._lchunks
        self._dir = "out"

    @immutable()
    def incoming(self):
        """
        Look at the connections from the incoming perspective.
        """
        # See `outgoing`: only swap when the direction actually flips.
        if self._dir != "inc":
            self._lchunks, self._gchunks = self._gchunks, self._lchunks
        self._dir = "inc"

    @immutable()
    def to(self, chunks):
        """
        Restrict to connections whose postsynaptic side lies in the given
        chunk(s).

        :param chunks: A single chunk, or a collection of chunks.
        """
        # A single chunk is wrapped so that a collection is always stored.
        if isinstance(chunks, Chunk) and chunks.ndim == 1:
            chunks = [chunks]
        # Postsynaptic chunks are local when incoming, global when outgoing.
        if self._dir == "inc":
            self._lchunks = chunks
        else:
            self._gchunks = chunks

    @immutable()
    def from_(self, chunks):
        """
        Restrict to connections whose presynaptic side lies in the given
        chunk(s).

        :param chunks: A single chunk, or a collection of chunks.
        """
        if isinstance(chunks, Chunk) and chunks.ndim == 1:
            chunks = [chunks]
        # Presynaptic chunks are local when outgoing, global when incoming.
        if self._dir == "out":
            self._lchunks = chunks
        else:
            self._gchunks = chunks

    def all(self):
        """
        Load all of the selected connections into memory.

        :returns: The presynaptic and postsynaptic locations, as 2 integer
          arrays with 3 columns and 1 row per connection.
        :rtype: Tuple[numpy.ndarray, numpy.ndarray]
        """
        pre_blocks = []
        post_blocks = []
        lens = []
        for _, pre_block, _, post_block in self.chunk_iter():
            pre_blocks.append(pre_block)
            post_blocks.append(post_block)
            lens.append(len(pre_block))
        # Allocate once and fill block by block; this also yields empty (0, 3)
        # arrays when there are no blocks at all.
        pre_locs = np.empty((sum(lens), 3), dtype=int)
        post_locs = np.empty((sum(lens), 3), dtype=int)
        ptr = 0
        for len_, pre_block, post_block in zip(lens, pre_blocks, post_blocks):
            pre_locs[ptr : ptr + len_] = pre_block
            post_locs[ptr : ptr + len_] = post_block
            ptr += len_
        return pre_locs, post_locs

    def _offset_block(self, direction: str, lchunk, gchunk, data):
        loff = self._local_chunk_offsets()
        goff = self._global_chunk_offsets()
        llocs, glocs = data
        # Shift the cell ids (first column) by the offset of their chunk.
        # This modifies the arrays handed over by the connectivity set in place.
        llocs[:, 0] += loff[lchunk]
        glocs[:, 0] += goff[gchunk]
        # Always return presynaptic data first: local is presynaptic when
        # outgoing, postsynaptic when incoming.
        if direction == "out":
            return lchunk, llocs, gchunk, glocs
        else:
            return gchunk, glocs, lchunk, llocs

    # NOTE: `functools.cache` on a method keys on `self`, and keeps every
    # iterator it was called on alive for as long as the cache exists.
    @functools.cache
    def _local_chunk_offsets(self):
        source = self._cs.post_type if self._dir == "inc" else self._cs.pre_type
        return self._chunk_offsets(source, self._lchunks)

    @functools.cache
    def _global_chunk_offsets(self):
        source = self._cs.pre_type if self._dir == "inc" else self._cs.post_type
        return self._chunk_offsets(source, self._gchunks)

    def _chunk_offsets(self, source, chunks):
        stats = source.get_placement_set().get_chunk_stats()
        if self._scoped and chunks is not None:
            stats = {chunk: item for chunk, item in stats.items() if int(chunk) in chunks}
        # Running total of the cell counts, in order of chunk id: the offset of
        # a chunk is the number of cells in all chunks sorted before it.
        offsets = {}
        ctr = 0
        for chunk, len_ in sorted(
            stats.items(), key=lambda k: Chunk.from_id(int(k[0]), None).id
        ):
            offsets[Chunk.from_id(int(chunk), None)] = ctr
            ctr += len_
        return offsets
class StoredMorphology:
    """
    Handle on a morphology stored under a name, that is only loaded on demand.
    Stored morphologies are compared and hashed by name only.
    """

    def __init__(self, name, loader, meta):
        """
        :param name: Name the morphology is stored under.
        :param loader: Callable without arguments that returns the morphology.
        :param meta: Metadata dictionary of the morphology.
        """
        self.name = name
        self._loader = loader
        self._meta = meta

    def __eq__(self, other):
        # Objects without a name can't be compared; let Python fall back to its
        # default handling instead of raising an AttributeError.
        try:
            return self.name == other.name
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        return hash(self.name)

    def get_meta(self):
        """
        Return a shallow copy of the metadata.
        """
        return self._meta.copy()

    def load(self):
        """
        Call the loader and return the morphology. Not cached.
        """
        return self._loader()

    def cached_load(self, labels=None):
        """
        Load the morphology filtered by the given labels, caching the result.

        :param labels: Iterable of labels to filter by, or ``None``.
        :returns: The filtered form of the loaded morphology.
        """
        # The cache key must be hashable, so the labels are frozen to a tuple.
        if labels is not None:
            labels = tuple(labels)
        return self._cached_load(labels)

    # NOTE: the cache is keyed on `self`, which hashes and compares by name:
    # stored morphologies with the same name share their cache entries.
    @functools.cache
    def _cached_load(self, labels):
        return self.load().set_label_filter(labels).as_filtered()
class GeneratedMorphology(StoredMorphology):
    """
    Stored morphology whose loader returns an object that already exists in
    memory, instead of loading it from storage.
    """

    def __init__(self, name, generated, meta):
        """
        :param name: Name of the morphology.
        :param generated: The object that every ``load`` call returns.
        :param meta: Metadata dictionary of the morphology.
        """
        super().__init__(name, lambda: generated, meta)
# Names exported by ``from <module> import *``.
__all__ = [ "ConnectivityIterator", "ConnectivitySet", "Engine", "FileStore", "GeneratedMorphology", "Interface", "MorphologyRepository", "NetworkDescription", "NoopLock", "PlacementSet", "ReadOnlyManager", "StorageNode", "StoredFile", "StoredMorphology", ]