"""
This module imports all supported storage engines, objects that read and write data,
which are present as subfolders of the `engine` folder, and provides them
transparently to the user, as a part of the :class:`Storage <.storage.Storage>`
factory class. The module scans the :mod:`.storage.interfaces` module for any class
that inherits from :class:`Interface <.storage.interfaces.Interface>` to collect all
Feature Interfaces and then scans the ``storage.engines.*`` submodules for any class
that provides an implementation of those features.
These features, because they all follow the same interface can then be passed on to
consumers and can be used independent of the underlying storage engine, which is the
end goal of this module.
"""
import functools
import typing
from inspect import isclass
from typing import Type
from .. import plugins
from ..exceptions import UnknownStorageEngineError
from ..services import MPI
if typing.TYPE_CHECKING:
from .interfaces import (
ConnectivitySet,
FileStore,
MorphologyRepository,
PlacementSet,
)
@functools.cache
def get_storage_interfaces():
from . import interfaces
# Collect all classes that are a subclass of Interface except Interface itself and
# store them in a {class_name: class_object} dictionary
return {
interface.__name__: interface
for interface in interfaces.__dict__.values()
if isclass(interface)
and issubclass(interface, interfaces.Interface)
and interface is not interfaces.Interface
}
[docs]
@functools.cache
def discover_engines():
"""
Get a dictionary of all available storage engines.
"""
return plugins.discover("storage.engines")
@functools.cache
def get_engine_support(engine_name):
try:
engine_module = discover_engines()[engine_name]
except KeyError:
raise UnknownStorageEngineError(
f"Unknown storage engine '{engine_name}'"
) from None
engine_support = {
interface_name: NotSupported(interface_name)
for interface_name in get_storage_interfaces().keys()
}
engine_support["StorageNode"] = engine_module.StorageNode
# Search for interface support
for interface_name, interface in get_storage_interfaces().items():
for module_item in engine_module.__dict__.values():
# Look through module items for child class of interface
if (
isclass(module_item)
and module_item is not interface
and issubclass(module_item, interface)
):
engine_support[interface_name] = module_item
break
return engine_support
[docs]
@functools.cache
def get_engines():
return {
name: get_engine_support(name)["Engine"] for name in discover_engines().keys()
}
[docs]
def create_engine(name, root, comm):
# Create an engine from the engine's Engine interface.
return get_engine_support(name)["Engine"](root, comm)
[docs]
class NotSupported:
"""
Utility class that throws a ``NotSupported`` error when it is used. This is the
default "implementation" of every storage feature that isn't provided by an engine.
"""
_iface_engine_key = None
def __init__(self, operation):
self.operation = operation
def _unsupported_err(self):
# Throw an error detailing the lack of support of our engine for our feature.
raise NotImplementedError(
f"The storage engine does not support the {self.operation} feature"
)
def __call__(self, *args, **kwargs):
self._unsupported_err()
def __getattr__(self, attr):
self._unsupported_err()
[docs]
class Storage:
"""
Factory class that produces all of the features and shims the functionality of the
underlying engine.
"""
_PlacementSet: Type["PlacementSet"]
_ConnectivitySet: Type["ConnectivitySet"]
_MorphologyRepository: Type["MorphologyRepository"]
_FileStore: Type["FileStore"]
def __init__(self, engine, root, comm=None, main=0, missing_ok=True):
"""
Create a Storage provider based on a specific `engine` uniquely identified
by the root object.
:param engine: The name of the storage engine.
:type engine: str
:param root: An object that uniquely describes the storage, such as a filename
or path. The value to be provided depends on the engine. For the hdf5 engine
the filename has to be provided.
:type root: object
:param comm: MPI communicator that shares control over this Storage.
:type comm: mpi4py.MPI.Comm
:param main: Rank of the MPI process that executes single-node tasks.
"""
self._comm = comm or MPI
self._engine = create_engine(engine, root, self._comm)
self._features = [
fname for fname, supported in view_support()[engine].items() if supported
]
self._engine._format = engine
self._main = main
# Load the engine's interface onto the object, this allows the end user to create
# features, but it is not advised. Usually the Storage object
# itself provides factory methods that should be used instead.
for name, interface in get_engine_support(engine).items():
if name == "StorageNode":
continue
self.__dict__["_" + name] = interface
# Interfaces can define an autobinding key so that singletons are available
# on the engine under that key.
key = interface._iface_engine_key
if key is not None:
if self.supports(name):
self._engine.__dict__[key] = interface(self._engine)
else:
self._engine.__dict__[key] = NotSupported(self._engine.format, name)
# The storage should be created at the root as soon as we initialize because
# features might immediatly require the basic structure to be present.
self._preexisted = self.exists()
if not (missing_ok or self._preexisted):
raise FileNotFoundError(f"`{engine}` storage at '{root}' does not exist.")
if not self._preexisted:
self.create()
def __eq__(self, other):
return self._engine == getattr(other, "_engine", None)
@property
def preexisted(self):
return self._preexisted
[docs]
def is_main_process(self):
return self._comm.get_rank() == self._main
@property
def morphologies(self):
return self._engine.morphologies
@property
def files(self):
return self._engine.files
@property
def root(self):
return self._engine.root
@property
def root_slug(self):
return self._engine.root_slug
@property
def format(self):
return self._engine._format
[docs]
def exists(self):
"""
Check whether the storage exists at the root.
"""
return self._engine.exists()
[docs]
def create(self):
"""
Create the minimal requirements at the root for other features to function and
for the existence check to pass.
"""
return self._engine.create()
[docs]
def copy(self, new_root):
"""
Move the storage to a new root.
"""
self._engine.copy(new_root)
[docs]
def move(self, new_root):
"""
Move the storage to a new root.
"""
self._engine.move(new_root)
[docs]
def remove(self):
"""
Remove the storage and all data contained within. This is an irreversible
destructive action!
"""
self._engine.remove()
[docs]
def load(self):
"""
Load a scaffold from the storage.
:returns: :class:`Scaffold <.core.Scaffold>`
"""
from ..core import Scaffold
return Scaffold(storage=self)
[docs]
def load_active_config(self):
"""
Load the configuration object from the storage.
:returns: :class:`Configuration <.config.Configuration>`
"""
return self._engine.files.load_active_config()
[docs]
def store_active_config(self, config):
"""
Store a configuration object in the storage.
"""
return self._engine.files.store_active_config(config)
[docs]
def supports(self, feature):
return feature in self._features
[docs]
def assert_support(self, feature):
if not self.supports(feature):
raise NotImplementedError(
"The '{}' engine lacks support for the '{}' feature.".format(
self._engine._format, feature
)
)
[docs]
def get_placement_set(self, type, chunks=None, labels=None, morphology_labels=None):
"""
Return a PlacementSet for the given type.
:param type: Specific cell type.
:type type: ~bsb.cell_types.CellType
:param chunks: Optionally load a specific list of chunks.
:type chunks: list[tuple[float, float, float]]
:param labels: Labels to filter the placement set by.
:type labels: list[str]
:returns: ~bsb.storage.interfaces.PlacementSet
"""
ps = self._PlacementSet(self._engine, type)
if chunks is not None:
ps.set_chunk_filter(chunks)
ps.set_label_filter(labels)
ps.set_morphology_label_filter(morphology_labels)
return ps
[docs]
def require_placement_set(self, cell_type):
"""
Get a placement set.
:param cell_type: Connection cell_type
:type cell_type: ~bsb.cell_types.CellType
:returns: ~bsb.storage.interfaces.PlacementSet
"""
return self._PlacementSet.require(self._engine, cell_type)
[docs]
def get_connectivity_set(self, tag):
"""
Get a connection set.
:param tag: Connection tag
:type tag: str
:returns: ~bsb.storage.interfaces.ConnectivitySet
"""
return self._ConnectivitySet(self._engine, tag)
[docs]
def require_connectivity_set(self, tag, pre=None, post=None):
"""
Get a connection set.
:param tag: Connection tag
:type tag: str
:returns: ~bsb.storage.interfaces.ConnectivitySet
"""
return self._ConnectivitySet.require(self._engine, tag, pre, post)
[docs]
def get_connectivity_sets(self):
"""
Return a ConnectivitySet for the given type.
:param type: Specific cell type.
:type type: ~bsb.cell_types.CellType
:returns: ~bsb.storage.interfaces.ConnectivitySet
"""
return [
self._ConnectivitySet(self._engine, tag)
for tag in self._ConnectivitySet.get_tags(self._engine)
]
[docs]
def init(self, scaffold):
"""
Initialize the storage to be ready for use by the specified scaffold.
"""
self.store_active_config(scaffold.configuration)
if self.supports("PlacementSet"):
self.init_placement(scaffold)
[docs]
def init_placement(self, scaffold):
for cell_type in scaffold.get_cell_types():
self.require_placement_set(cell_type)
[docs]
def renew(self, scaffold):
"""
Remove and recreate an empty storage container for a scaffold.
"""
self.remove()
self.create()
self.init(scaffold)
[docs]
def clear_placement(self, scaffold=None):
self._engine.clear_placement()
if scaffold is not None:
self.init_placement(scaffold)
[docs]
def clear_connectivity(self):
self._engine.clear_connectivity()
[docs]
def read_only(self):
return self._engine.read_only()
[docs]
def get_chunk_stats(self):
return self._engine.get_chunk_stats()
[docs]
def open_storage(root):
engines = get_engines()
for name, engine in engines.items():
if engine.peek_exists(root) and engine.recognizes(root):
return Storage(name, root, missing_ok=False)
else:
for name, engine in engines.items():
if engine.peek_exists(root):
raise IOError(
f"Storage `{root}` not recognized as any installed format: "
+ ", ".join(f"'{n}'" for n in engines.keys())
)
else:
raise FileNotFoundError(f"Storage `{root}` does not exist.")
[docs]
def get_engine_node(engine_name):
try:
return get_engine_support(engine_name)["StorageNode"]
except KeyError:
raise RuntimeError(
f"Broken storage engine plugin '{engine_name}' is missing a StorageNode."
)
[docs]
def view_support(engine=None):
"""
Return which storage engines support which features.
"""
if engine is None:
return {
# Loop over all enginges
engine_name: {
# Loop over all features, check whether they're supported
feature_name: not isinstance(feature, NotSupported)
for feature_name, feature in get_engine_support(engine_name).items()
}
for engine_name in discover_engines()
}
else:
# Loop over all features for the specific engine
return {
feature_name: not isinstance(feature, NotSupported)
for feature_name, feature in get_engine_support(engine).items()
}
__all__ = [
"NotSupported",
"Storage",
"create_engine",
"discover_engines",
"get_engine_node",
"get_engines",
"open_storage",
"view_support",
]