Source code for bsb.models

import numpy as np, random
from .morphologies import Morphology as BaseMorphology, NilCompartment
from .helpers import (
    ConfigurableClass,
    dimensions,
    origin,
    SortableByAfter,
    continuity_list,
    expand_continuity_list,
    count_continuity_list,
    iterate_continuity_list,
)
from .exceptions import *


[docs]class CellType(SortableByAfter): """ A CellType represents a population of cells. """ def __init__(self, name, placement=None): self.name = name self.placement = placement self.relay = False
[docs] def validate(self): """ Check whether this CellType is valid to be used in the simulation. """ pass
def initialise(self, scaffoldInstance): self.scaffold = scaffoldInstance self.validate()
[docs] def set_morphology(self, morphology): """ Set the Morphology class for this cell type. :param morphology: Defines the geometrical constraints for the axon and dendrites of the cell type. :type morphology: Instance of a subclass of scaffold.morphologies.Morphology """ if not issubclass(type(morphology), BaseMorphology): raise ClassError( "Only subclasses of scaffold.morphologies.Morphology can be used as cell morphologies." ) self.morphology = morphology
[docs] def set_placement(self, placement): """ Set the placement strategy for this cell type. """ self.placement = placement
[docs] def place(self): """ Place this cell type. """ self.scaffold.place_cell_type(self)
@classmethod def get_ordered(cls, objects): return sorted(objects.values(), key=lambda x: x.placement.get_placement_count()) def has_after(self): return hasattr(self.placement, "after") def get_after(self): return None if not self.has_after() else self.placement.after def create_after(self): self.placement.after = [] def get_placed_count(self): return self.scaffold.statistics.cells_placed[self.name] def _get_cached_ids(self): if self.entity: dataset = self.scaffold.entities_by_type[self.name] else: dataset = self.scaffold.cells_by_type[self.name][:, 0] return np.array(dataset, dtype=int) def _ser_cached_ids(self): raw_ids = self._get_cached_ids() return continuity_list(raw_ids) def get_cells(self): if self.entity: return self.scaffold.get_entities_by_type(self.name) return self.scaffold.get_cells_by_type(self.name)
[docs] def list_all_morphologies(self): """ Return a list of all the morphology identifiers that can represent this cell type in the simulation volume. """ if not hasattr(self, "morphology") or not hasattr( self.morphology, "detailed_morphologies" ): return [] morphology_config = self.morphology.detailed_morphologies # TODO: More selection mechanisms like tags if "names" in morphology_config: m_names = morphology_config["names"] return m_names.copy() else: raise NotImplementedError( "Detailed morphologies can currently only be selected by name." )
def get_placement_set(self): return self.scaffold.get_placement_set(self)
[docs]class Layer(dimensions, origin): """ A Layer represents a compartment of the topology of the simulation volume that slices the volume in horizontally stacked portions. """ def __init__(self, name, origin, dimensions, scaling=True): # Name of the layer self.name = name # The XYZ coordinates of the point at the center of the bottom plane of the layer. self.origin = np.array(origin) # Dimensions in the XYZ axes. self.dimensions = np.array(dimensions) self.volumeOccupied = 0.0 # Should this layer scale when the simulation volume is resized? self.scaling = scaling @property def available_volume(self): return self.volume - self.volumeOccupied @property def thickness(self): return self.height def allocateVolume(volume): self.volumeOccupied += volume def initialise(self, scaffoldInstance): self.scaffold = scaffoldInstance
[docs] def scale_to_reference(self): """ Compute scaled layer volume To compute layer thickness, we scale the current layer to the combined volume of the reference layers. A ratio between the dimension can be specified to alter the shape of the layer. By default equal ratios are used and a cubic layer is obtained (given by `dimension_ratios`). The volume of the current layer (= X*Y*Z) is scaled with respect to the volume of reference layers by a factor `volume_scale`, so: X*Y*Z = volume_reference_layers / volume_scale [A] Supposing that the current layer dimensions (X,Y,Z) are each one depending on the dimension Y according to `dimension_ratios`, we obtain: X*Y*Z = (Y*dimension_ratios[0] * Y * (Y*dimension_ratios[2]) [B] X*Y*Z = (Y^3) * prod(dimension_ratios) [C] Therefore putting together [A] and [C]: (Y^3) * prod(dimension_ratios) = volume_reference_layers / volume_scale from which we derive the normalized_size Y, according to the following formula: Y = cubic_root((volume_reference_layers * volume_scale) / prod(dimension_ratios)) """ volume_reference_layers = np.sum( list(map(lambda layer: layer.volume, self.reference_layers)) ) # Compute volume: see docstring. normalized_size = pow( volume_reference_layers * self.volume_scale / np.prod(self.dimension_ratios), 1 / 3, ) # Apply the normalized size with their ratios to each dimension self.dimensions = np.multiply( np.repeat(normalized_size, 3), self.dimension_ratios )
class Resource: def __init__(self, handler, path): self._handler = handler self._path = path def get_dataset(self, selector=(), dtype=None): with self._handler.load("r") as f: if not self._path in f(): raise DatasetNotFoundError( "Dataset '{}' not found in '{}'.".format( self._path, self._handler.file ) ) d = f()[self._path][selector] if dtype: d = d.astype(dtype) return d @property def attributes(self): with self._handler.load("r") as f: return dict(f()[self._path].attrs) def get_attribute(self, name): attrs = self.attributes if name not in attrs: raise AttributeMissingError( "Attribute '{}' not found in '{}'".format(name, self._path) ) return attrs[name] def exists(self): with self._handler.load("r") as f: return self._path in f() def unmap(self, selector=(), mapping=lambda m, x: m[x], data=None): if data is None: data = self.get_dataset(selector) map = self.get_attribute("map") unmapped = [] for record in data: unmapped.append(mapping(map, record)) return np.array(unmapped) def unmap_one(self, data, mapping=None): if mapping is None: return self.unmap(data=[data]) else: return self.unmap(data=[data], mapping=mapping) def __iter__(self): return iter(self.get_dataset()) @property def shape(self): with self._handler.load("r") as f: return f()[self._path].shape class Connection: def __init__( self, from_id, to_id, from_compartment=None, to_compartment=None, from_morphology=None, to_morphology=None, ): self.from_id = from_id self.to_id = to_id if ( from_compartment is not None or to_compartment is not None or from_morphology is not None or to_morphology is not None ): if from_compartment < -1: raise RuntimeError("Invalid compartment data") elif from_compartment == -1: self.from_compartment = NilCompartment() else: self.from_compartment = from_morphology.compartments[from_compartment] if to_compartment < -1: raise RuntimeError("Invalid compartment data") elif to_compartment == -1: self.to_compartment = NilCompartment() else: self.to_compartment = to_morphology.compartments[to_compartment]
[docs]class ConnectivitySet(Resource): """ Connectivity sets store connections. """ def __init__(self, handler, tag): super().__init__(handler, "/cells/connections/" + tag) if not self.exists(): raise DatasetNotFoundError("ConnectivitySet '{}' does not exist".format(tag)) self.scaffold = handler.scaffold self.tag = tag self.compartment_set = Resource(handler, "/cells/connection_compartments/" + tag) self.morphology_set = Resource(handler, "/cells/connection_morphologies/" + tag)
[docs] def has_compartment_data(self): """ Check if compartment data exists for this connectivity set. """ return self.compartment_set.exists()
def is_orphan(self): return not bool(self.attributes["connection_types"]) @property def connections(self): """ Return a list of :class:`Intersections <.models.Connection>`. Connections contain pre- & postsynaptic identifiers. """ return [Connection(c[0], c[1]) for c in self.get_dataset()] @property def from_identifiers(self): """ Return a list with the presynaptic identifier of each connection. """ return self.get_dataset(dtype=int)[:, 0] @property def to_identifiers(self): """ Return a list with the postsynaptic identifier of each connection. """ return self.get_dataset(dtype=int)[:, 1] @property def intersections(self): """ Return a list of :class:`Intersections <.models.Connection>`. Intersections contain pre- & postsynaptic identifiers and the intersecting compartments. """ return self.get_intersections() def get_intersections(self): intersections = [] morphos = {-1: None} def _cache_morpho(id): # Keep a cache of the morphologies so that all morphologies with the same # id refer to the same object, and so that they aren't redundandly loaded. id = int(id) if not id in morphos: name = self.morphology_set.unmap_one(id)[0] if isinstance(name, bytes): name = name.decode("UTF-8") morphos[id] = self.scaffold.morphology_repository.get_morphology(name) cells = self.get_dataset() if self.has_compartment_data(): comp_data = self.compartment_set.get_dataset() morpho_data = self.morphology_set.get_dataset() else: comp_data = np.ones(cells.shape) * -1 morpho_data = np.ones(cells.shape) * -1 for cell_ids, comp_ids, morpho_ids in zip(cells, comp_data, morpho_data): from_morpho_id = int(morpho_ids[0]) to_morpho_id = int(morpho_ids[1]) # Load morphologies from the map if they're not in the cache yet _cache_morpho(from_morpho_id) _cache_morpho(to_morpho_id) # Append the intersection with a new connection intersections.append( Connection( *cell_ids, # zipped dataset: from id & to id *comp_ids, # zipped morphologyset: from comp & to comp morphos[from_morpho_id], # cached: 'from' TrueMorphology morphos[to_morpho_id] # cached: 'to' TrueMorphology ) ) return intersections def get_divergence_list(self): presynaptic_type = self.get_presynaptic_types()[0] placement_set = self.scaffold.get_placement_set(presynaptic_type) unique_connections = np.unique(self.get_dataset(), axis=0) _, divergence_list = np.unique(unique_connections[:, 0], return_counts=True) return np.concatenate( (divergence_list, np.zeros(len(placement_set) - len(divergence_list))) ) @property def divergence(self): divergence_list = self.get_divergence_list() if len(divergence_list) == 0: return 0 return np.mean(divergence_list) def get_convergence_list(self): postsynaptic_type = self.get_postsynaptic_types()[0] placement_set = self.scaffold.get_placement_set(postsynaptic_type) unique_connections = np.unique(self.get_dataset(), axis=0) _, convergence_list = np.unique(unique_connections[:, 1], return_counts=True) return np.concatenate( (convergence_list, np.zeros(len(placement_set) - len(convergence_list))) ) @property def convergence(self): convergence_list = self.get_convergence_list() if len(convergence_list) == 0: return 0 return np.mean(convergence_list) def __iter__(self): if self.compartment_set.exists(): return self.intersections else: return self.connections def __len__(self): return self.shape[0] @property def meta(self): """ Retrieve the metadata associated with this connectivity set. Returns ``None`` if the connectivity set does not exist. :return: Metadata :rtype: dict """ return self.attributes @property def connection_types(self): """ Return all the ConnectionStrategies that contributed to the creation of this connectivity set. """ # Get list of contributing types type_list = self.attributes["connection_types"] # Map contributing type names to contributing types return list(map(lambda name: self.scaffold.get_connection_type(name), type_list)) def _get_cell_types(self, key="from"): meta = self.meta if key + "_cell_types" in meta: cell_types = set() for name in meta[key + "_cell_types"]: cell_types.add(self.scaffold.get_cell_type(name)) return list(cell_types) cell_types = set() for connection_type in self.connection_types: cell_types |= set(connection_type.__dict__[key + "_cell_types"]) return list(cell_types)
[docs] def get_presynaptic_types(self): """ Return a list of the presynaptic cell types found in this set. """ return self._get_cell_types(key="from")
[docs] def get_postsynaptic_types(self): """ Return a list of the postsynaptic cell types found in this set. """ return self._get_cell_types(key="to")
[docs]class PlacementSet(Resource): """ Fetches placement data from storage. You can either access the parallel-array datasets ``.identifiers``, ``.positions`` and ``.rotations`` individually or create a collection of :class:`Cells <.models.Cell>` that each contain their own identifier, position and rotation. .. note:: Use :func:`.core.get_placement_set` to correctly obtain a PlacementSet. """ def __init__(self, handler, cell_type): root = "/cells/placement/" tag = cell_type.name super().__init__(handler, root + tag) if not self.exists(): raise DatasetNotFoundError("PlacementSet '{}' does not exist".format(tag)) self.type = cell_type self.tag = tag self._identifiers = Resource(handler, root + tag + "/identifiers") self._filter = f = _Filter() def id_source(): return np.array( expand_continuity_list(self._identifiers.get_dataset()), dtype=int ) self._filter.filter_source = id_source self.identifier_set = _FilteredIds(handler, root + tag + "/identifiers", f) self.positions_set = _FilteredResource(handler, root + tag + "/positions", f) self.rotation_set = _FilteredResource(handler, root + tag + "/rotations", f) @property def identifiers(self): """ Return a list of cell identifiers. """ return self.identifier_set.get_dataset() @property def positions(self): """ Return a dataset of cell positions. """ try: return self.positions_set.get_dataset() except DatasetNotFoundError: raise DatasetNotFoundError( "No position information for the '{}' placement set.".format(self.tag) ) @property def rotations(self): """ Return a dataset of cell rotations. :raises: DatasetNotFoundError when there is no rotation information for this cell type. """ try: return self.rotation_set.get_dataset() except DatasetNotFoundError: raise DatasetNotFoundError( "No rotation information for the '{}' placement set.".format(self.tag) ) @property def cells(self): """ Reorganize the available datasets into a collection of :class:`Cells <.models.Cell>` """ return [ Cell(id, self.type, position, rotation) for id, position, rotation in self ] def __iter__(self): id_iter = iterate_continuity_list(self._identifiers.get_dataset()) iterators = [iter(id_iter), self._none(), self._none()] if self.positions_set.exists(): iterators[1] = iter(self.positions) if self.rotation_set.exists(): iterators[2] = iter(self.rotations) return zip(*iterators) def __len__(self): return count_continuity_list(self._identifiers) def _none(self): """ Generate ``len(self)`` times ``None`` """ for i in range(len(self)): yield None def set_filter(self, filter): self._filter.active_filter = filter
class _Filter: """ To use, set an `active_filter` and `filter_source` function that return numpy arrays. `filter_source` should return a dataset of the same shape as the `data` being filtered. `active_filter` will then be called to create a boolean mask from `filter_source`, applied as a filter on the data being filtered. (This means that `filter_source` and `data` should be parallel arrays) """ active_filter = None filter_source = None def filter(self, data): if self.active_filter is None: return data return data[np.isin(self.filter_source(), self.active_filter())] class _FilteredResource(Resource): def __init__(self, handler, path, filter): super().__init__(handler, path) self._filter = filter def get_dataset(self, *args, **kwargs): return self._filter.filter(super().get_dataset(*args, **kwargs)) class _FilteredIds(_FilteredResource): def get_dataset(self, *args, **kwargs): data = np.array( expand_continuity_list(Resource.get_dataset(self, *args, **kwargs)), dtype=int ) return self._filter.filter(data) class Cell: def __init__(self, id, cell_type, position, rotation=None): self.id = int(id) self.type = cell_type self.position = position self.rotation = rotation @classmethod def from_repo_data(cls, cell_type, data): return cls(data[0], cell_type, data[2:5]) class MorphologySet: def __init__(self, scaffold, cell_type, placement_set, compartment_types=None, N=50): self.scaffold = scaffold self.cell_type = cell_type self._construct_map(cell_type, placement_set, compartment_types, N) self._placement_set = placement_set self._cells = placement_set.cells def __len__(self): return len(self._cells) def __iter__(self): return zip(self._cells, self._unmap_morphologies()) def __getitem__(self, id): return self._cells[id], self._unmap_morphology(id) def _unmap_morphology(self, id): return self._morphologies[self._morphology_index[id]] def _unmap_morphologies(self): return [self._morphologies[i] for i in self._morphology_index] def _construct_map(self, cell_type, placement_set, compartment_types=None, N=50): """ Associate to the placement_set an index map to only the morphologies in the MorphologyRepository needed for that placement set """ # Fetch a list of all available morphology names, whose index in the # list will be used as an identifier for the morphology in the cache # list of all morphologies morphology_names = self.scaffold.morphology_repository.list_morphologies( cell_type=cell_type ) if len(morphology_names) == 0: raise MorphologyRepositoryError("No morphologies found for " + cell_type.name) # Select a random morphology for each cell and store its index in a list random_morphologies = [ random.choice(range(len(morphology_names))) for _ in range(len(placement_set)) ] self._morphology_index = [] self._morphology_map = [] if placement_set.rotation_set.exists() or ( self.scaffold and hasattr(self.scaffold.rotations, cell_type.name) ): # Rotations? Get the rotated version of the randomly selected morphology and # check in `self._morphology_map` if it has been used before. rotations = ( placement_set.rotation_set.get_dataset() if placement_set.rotation_set.exists() else self.scaffold.rotations[cell_type.name] ) # Get the names of the rotated morphologies that need to be loaded for i in range(len(rotations)): rot_str = ( "__" + str(int(rotations[i][0])) + "_" + str(int(rotations[i][1])) ) mname = morphology_names[random_morphologies[i]] + rot_str if mname in self._morphology_map: self._morphology_index.append(self._morphology_map.index(mname)) else: self._morphology_index.append(len(self._morphology_map)) self._morphology_map.append(mname) else: # No rotations? Just use the randomly selected morphologies self._morphology_index = random_morphologies self._morphology_map = morphology_names # Function to load and voxelize a morphology def load_morpho(scaffold, morpho_ind, compartment_types=None): m = scaffold.morphology_repository.get_morphology( self._morphology_map[morpho_ind] ) m._set_index = morpho_ind m.voxelize(N, compartments=m.get_compartments(compartment_types)) return m # Load and voxelize only the unique morphologies present in the morphology map. self._morphologies = [ load_morpho(self.scaffold, i, compartment_types) for i in range(len(self._morphology_map)) ]