import abc as _abc
import contextlib as _cl
import datetime as _dt
import email.utils as _eml
import functools as _ft
import hashlib as _hl
import os as _os
import pathlib as _pl
import tempfile as _tf
import time as _t
import typing
import typing as _tp
import urllib.parse as _up
import urllib.request as _ur
import nrrd as _nrrd
import requests as _rq
from .. import config
from .._util import obj_str_insert
from ..config import types
from ..config._attrs import cfglist
from ..morphologies.parsers import MorphologyParser
from ..reporting import warn
if _tp.TYPE_CHECKING:
from ..core import Scaffold
from ..morphologies import Morphology
from ..storage.interfaces import FileStore
def _is_uri(url):
    # A source string counts as a URI when it parses with a nonempty scheme.
    return bool(_up.urlparse(url).scheme)
def _uri_to_path(uri):
    # Convert a file:// URI to a (relative) filesystem path, preserving any
    # UNC-style host component ("\\host\..." on Windows).
    parts = _up.urlparse(uri)
    sep = _os.path.sep
    unc_root = f"{sep}{sep}{parts.netloc}{sep}"
    local_part = _ur.url2pathname(_up.unquote(parts.path))
    joined = _os.path.normpath(_os.path.join(unc_root, local_part))
    return _os.path.relpath(joined)
class FileDependency:
    """
    A file required by the scaffold. Wraps a local path or URI, resolves the
    matching scheme handler, and can fetch/cache its content in a ``FileStore``.
    """

    def __init__(
        self,
        source: typing.Union[str, _os.PathLike],
        file_store: "FileStore" = None,
        ext: str = None,
        cache=True,
    ):
        """
        :param source: Path or URI of the file.
        :param file_store: Store used to cache the file's content.
        :param ext: File extension override; inferred from the path if absent.
        :param cache: Whether freshly fetched content is saved to the store.
        """
        self._given_source: str = str(source)
        if _is_uri(self._given_source):
            self._uri = self._given_source
        else:
            # Plain paths are normalized to absolute file:// URIs.
            path = _pl.Path(source).absolute()
            self._uri: str = path.as_uri()
            ext = ext or path.suffix[1:] or None
        self._scheme: "FileScheme" = _get_scheme(_up.urlparse(self._uri).scheme)
        self.file_store = file_store
        self.extension = ext
        self.cache = cache

    @property
    def uri(self):
        return self._uri

    def __hash__(self):
        return hash(self._uri)

    def __inv__(self):
        # Inverse of configuration casting: yield back the user-given source.
        return self._given_source

    @obj_str_insert
    def __str__(self):
        return f"'{self._uri}'"

    def get_content(self, check_store=True):
        """
        Return the file content, preferring the stored copy unless the scheme
        reports the source as newer.
        """
        if check_store:
            stored = self.get_stored_file()
            if stored is None or self._scheme.should_update(self, stored):
                content = self._get_content()
                if self.cache:
                    self.store_content(content, meta=self._scheme.get_meta(self))
                return content
            else:
                return stored.load()
        else:
            return self._get_content()

    @_cl.contextmanager
    def provide_locally(self):
        """
        Context manager yielding ``(path, encoding)`` for a local copy of the
        file, materializing a temporary file when the scheme has no local path.
        """
        try:
            path = self._scheme.get_local_path(self)
            if _os.path.exists(path):
                yield (path, None)
            else:
                raise FileNotFoundError()
        except (TypeError, FileNotFoundError):
            if self.file_store is None:
                raise FileNotFoundError(f"Can't find {self}")
            content = self.get_content()
            with _tf.TemporaryDirectory() as dirpath:
                name = "file"
                if self.extension:
                    name = f"{name}.{self.extension}"
                filepath = _os.path.join(dirpath, name)
                with open(filepath, "wb") as f:
                    f.write(content[0])
                yield (filepath, content[1])

    def provide_stream(self):
        """Return a context manager yielding a binary stream of the content."""
        return self._scheme.provide_stream(self)

    def get_stored_file(self):
        """Look up this dependency in the file store by its source string."""
        if not self.file_store:
            raise ValueError(
                "Can't check for file dependency in store before scaffold is ready."
            )
        return self.file_store.find_meta("source", self._given_source)

    def store_content(self, content, encoding=None, meta=None):
        """
        Save ``content`` (optionally a ``(content, encoding)`` tuple) to the
        file store, merged with the scheme's metadata.
        """
        if not self.file_store:
            raise ValueError(
                "Can't store file dependency in store before scaffold is ready."
            )
        if isinstance(content, tuple):
            content, encoding = content
        # Save the file under the same id if it already exists
        id_ = getattr(self.get_stored_file(), "id", None)
        if meta is None:
            meta = {}
        meta.update(self._scheme.get_meta(self))
        meta["source"] = self._given_source
        return self.file_store.store(
            content, meta=meta, encoding=encoding, id=id_, overwrite=True
        )

    def should_update(self):
        """Whether the stored copy is absent or outdated per the scheme."""
        if not self.file_store:
            raise ValueError(
                "Can't update file dependency in store before scaffold is ready."
            )
        stored = self.get_stored_file()
        return stored is None or self._scheme.should_update(self, stored)

    def _get_content(self):
        if not self._scheme.find(self):
            raise FileNotFoundError(f"Couldn't find {self._uri}")
        return self._scheme.get_content(self)

    def update(self, force=False):
        """Fetch and store the content if it is missing or outdated."""
        if force or self.should_update():
            self.get_content()
class UriScheme(_abc.ABC):
    """
    Abstract interface for URI scheme handlers. The default implementations
    treat the URI as a local filesystem path.
    """

    @_abc.abstractmethod
    def find(self, file: FileDependency):
        """Whether the file exists at its URI."""
        path = _uri_to_path(file.uri)
        return _os.path.exists(path)

    @_abc.abstractmethod
    def should_update(self, file: FileDependency, stored_file):
        """Whether the on-disk file is newer than the stored copy."""
        path = _uri_to_path(file.uri)
        try:
            file_mtime = _os.path.getmtime(path)
        except FileNotFoundError:
            # A missing source can never be newer than what is stored.
            return False
        try:
            stored_mtime = stored_file.mtime
        except Exception:
            # Unknown stored mtime: err on the side of refreshing.
            return True
        return file_mtime > stored_mtime

    @_abc.abstractmethod
    def get_content(self, file: FileDependency):
        """Return ``(bytes, encoding)`` of the file content."""
        with self.provide_stream(file) as (fp, encoding):
            fp: _tp.BinaryIO
            return (fp.read(), encoding)

    @_cl.contextmanager
    @_abc.abstractmethod
    def provide_stream(self, file):
        """Context manager yielding ``(binary stream, encoding)``."""
        path = _uri_to_path(file.uri)
        with open(path, "rb") as fp:
            yield (fp, None)

    @_abc.abstractmethod
    def get_local_path(self, file: FileDependency):
        """Local filesystem path of the file, if the scheme has one."""
        raise RuntimeError(f"{type(self)} has no local path representation")
class FileScheme(UriScheme):
    """
    Handler for ``file://`` URIs; concretizes the local-path defaults of
    :class:`UriScheme`.
    """

    def find(self, file: FileDependency):
        return super().find(file)

    def should_update(self, file: FileDependency, stored_file):
        return super().should_update(file, stored_file)

    def get_content(self, file: FileDependency):
        return super().get_content(file)

    def provide_stream(self, file: FileDependency):
        return super().provide_stream(file)

    def get_local_path(self, file: FileDependency):
        return _uri_to_path(file.uri)
class UrlScheme(UriScheme):
    """Handler for ``http(s)://`` URIs, fetched over the network."""

    def resolve_uri(self, file: FileDependency):
        """Map the dependency's URI to the URL to actually request."""
        return file.uri

    def find(self, file: FileDependency):
        with self.create_session() as session:
            response = session.head(self.resolve_uri(file))
            return response.status_code == 200

    def should_update(self, file: FileDependency, stored_file):
        """
        Decide staleness from HTTP caching headers (``ETag`` first, then
        ``Last-Modified``), falling back to a fixed expiration window.
        """
        mtime = stored_file.mtime
        headers = self.get_meta(file).get("headers", {})
        stored_headers = stored_file.meta.get("headers", {})
        if "ETag" in headers:
            new_etag = headers["ETag"]
            old_etag = stored_headers.get("ETag")
            # Check if we have the latest ETag
            return old_etag != new_etag
        elif "Last-Modified" in headers:
            parsed = _eml.parsedate(headers["Last-Modified"])
            if parsed is not None:
                their_mtime = _dt.datetime(*parsed[:6]).timestamp()
                return their_mtime > mtime
            # Malformed Last-Modified header: fall through to the default.
        # 100h default expiration
        return _t.time() > mtime + 360000

    def get_content(self, file: FileDependency):
        with self.create_session() as session:
            response = session.get(self.resolve_uri(file))
            return (response.content, response.encoding)

    def get_local_path(self, file: FileDependency):
        raise TypeError("URL schemes don't have a local path")

    @_cl.contextmanager
    def provide_stream(self, file):
        """Stream the response body without buffering it in memory."""
        with self.create_session() as session:
            response = session.get(self.resolve_uri(file), stream=True)
            response.raw.decode_content = True
            response.raw.auto_close = False
            yield (response.raw, response.encoding)

    def create_session(self):
        return _rq.Session()

    def get_base_url(self):
        raise NotImplementedError("Base UrlScheme has no fixed base URL.")
class NeuroMorphoScheme(UrlScheme):
    """
    Handler for ``nm://`` URIs: downloads SWC morphologies from
    neuromorpho.org.
    """

    _nm_url = "https://neuromorpho.org/"
    _meta = "api/neuron/name/"
    _files = "dableFiles/"

    def resolve_uri(self, file: FileDependency):
        # Resolve the neuron name to its archive's CNG .swc download URL.
        # NOTE(review): get_nm_meta is defined outside this excerpt.
        meta = self.get_nm_meta(file)
        return self._swc_url(meta["archive"], meta["neuron_name"])

    def get_base_url(self):
        return self._nm_url

    @classmethod
    def _swc_url(cls, archive, name):
        base_url = f"{cls._nm_url}{cls._files}{_up.quote(archive.lower())}"
        return f"{base_url}/CNG%20version/{name}.CNG.swc"

    def create_session(self):
        # Weak DH key on neuromorpho.org
        # https://stackoverflow.com/a/76217135/1016004
        from requests.adapters import HTTPAdapter
        from urllib3 import PoolManager
        from urllib3.util import create_urllib3_context

        class DHAdapter(HTTPAdapter):
            # Pool manager with DH ciphers excluded to tolerate the server's
            # weak Diffie-Hellman key.
            def init_poolmanager(self, connections, maxsize, block=False, **kwargs):
                ctx = create_urllib3_context(ciphers=":HIGH:!DH:!aNULL")
                self.poolmanager = PoolManager(
                    num_pools=connections,
                    maxsize=maxsize,
                    block=block,
                    ssl_context=ctx,
                    **kwargs,
                )

        session = _rq.Session()
        session.mount(self._nm_url, DHAdapter())
        return session
@_ft.cache
def _get_schemes() -> _tp.Mapping[str, typing.Type[UriScheme]]:
    # Cached registry mapping URI scheme names to handler classes:
    # plugin-discovered schemes plus the built-ins registered below.
    from ..plugins import discover

    schemes = discover("storage.schemes")
    schemes["file"] = FileScheme
    schemes["http"] = schemes["https"] = UrlScheme
    schemes["nm"] = NeuroMorphoScheme
    return schemes
def _get_scheme(scheme: str) -> UriScheme:
    # Instantiate the handler registered for the given URI scheme name.
    schemes = _get_schemes()
    try:
        return schemes[scheme]()
    except KeyError:
        raise KeyError(f"{scheme} is not a known file scheme.")
@config.node
class FileDependencyNode:
    """Configuration node wrapping a :class:`FileDependency`."""

    scaffold: "Scaffold"
    file: "FileDependency" = config.attr(type=FileDependency)

    def __init__(self, value=None, **kwargs):
        # Positional shortform: the node may be configured as a bare source string.
        if value is not None:
            self.file = value

    def __boot__(self):
        # Wire the scaffold's file store into the dependency; only the main
        # process performs the (potentially network-bound) update.
        self.file.file_store = self.scaffold.files
        if self.scaffold.is_main_process():
            self.file.update()

    def __inv__(self):
        if not isinstance(self, FileDependencyNode):
            return self
        if self._config_pos_init:
            # Configured from the positional shortform: invert to the source string.
            return self.file._given_source
        else:
            tree = self.__tree__()
            return tree

    def load_object(self):
        """Load the raw content of the file."""
        return self.file.get_content()

    def provide_locally(self):
        return self.file.provide_locally()

    def provide_stream(self):
        return self.file.provide_stream()

    def get_stored_file(self):
        return self.file.get_stored_file()
@config.node
class CodeDependencyNode(FileDependencyNode):
    """Configuration node that loads a Python module as a file dependency."""

    # Dotted module name, e.g. "pkg.mod"; may be given as shortform.
    module: str = config.attr(type=str, required=types.shortform())
    # Optional attribute of the module to return instead of the module itself.
    attr: str = config.attr(type=str)

    @config.property
    def file(self):
        if getattr(self, "scaffold", None) is not None:
            file_store = self.scaffold.files
        else:
            file_store = None
        # Map the dotted module name onto a relative .py file path.
        return FileDependency(
            self.module.replace(".", _os.sep) + ".py", file_store=file_store
        )

    def __init__(self, module=None, **kwargs):
        super().__init__(**kwargs)
        if module is not None:
            self.module = module

    def load_object(self):
        """
        Import the configured module and return it, or the configured
        attribute of it.
        """
        import importlib.util
        import sys

        sys.path.append(_os.getcwd())
        try:
            with self.file.provide_locally() as (path, encoding):
                spec = importlib.util.spec_from_file_location(self.module, path)
                module = importlib.util.module_from_spec(spec)
                sys.modules[self.module] = module
                spec.loader.exec_module(module)
                # Fix: modules are not subscriptable; fetch the attribute
                # with getattr instead of `module[self.attr]`.
                return module if self.attr is None else getattr(module, self.attr)
        finally:
            # Remove the last occurrence of cwd that we appended above,
            # leaving any pre-existing occurrences intact.
            tmp = list(reversed(sys.path))
            tmp.remove(_os.getcwd())
            sys.path = list(reversed(tmp))
class OperationCallable(typing.Protocol):
    """Callable signature of a pipeline operation: ``f(obj, **params) -> obj``."""

    def __call__(self, obj: object, **kwargs: typing.Any) -> object:
        pass
@config.node
class Operation:
    """A configurable pipeline step: a function plus its keyword parameters."""

    func: OperationCallable = config.attr(type=types.function_())
    # All remaining keys of the node are captured as keyword arguments for func.
    parameters: dict[str, typing.Any] = config.catch_all(type=types.any_())

    def __init__(self, value=None, /, **kwargs):
        # Positional shortform: the node may be configured as a bare function.
        if value is not None:
            self.func = value

    def __call__(self, obj):
        return self.func(obj, **self.parameters)
class FilePipelineMixin:
    """Mixin adding a configurable pipeline of operations."""

    pipeline: cfglist[Operation] = config.list(type=Operation)

    def pipe(self, input):
        """Thread ``input`` through each pipeline operation in order."""
        state = input
        for operation in self.pipeline:
            state = operation(state)
        return state
@config.node
class NrrdDependencyNode(FilePipelineMixin, FileDependencyNode):
    """
    Configuration dependency node to load NRRD files.
    """

    def get_data(self):
        """Read the NRRD voxel data (the header is discarded)."""
        with self.file.provide_locally() as (path, encoding):
            return _nrrd.read(path)[0]

    def load_object(self):
        """Load the voxel data and run it through the pipeline."""
        return self.pipe(self.get_data())
class MorphologyOperationCallable(OperationCallable):
    """
    Callable signature of a morphology pipeline operation:
    ``f(morphology, **params) -> morphology``.
    """

    def __call__(self, obj: "Morphology", **kwargs: typing.Any) -> "Morphology":
        pass
@config.node
class MorphologyOperation(Operation):
    """Pipeline operation whose function is resolved as a Morphology method."""

    func: MorphologyOperationCallable = config.attr(
        type=types.method_shortcut("bsb.morphologies.Morphology")
    )
@config.node
class MorphologyDependencyNode(FilePipelineMixin, FileDependencyNode):
    """
    Configuration dependency node to load morphology files.
    The content of these files will be stored in bsb.morphologies.Morphology instances.
    """

    pipeline: cfglist[MorphologyOperation] = config.list(type=MorphologyOperation)
    name: str = config.attr(type=str, default=None, required=False)
    """
    Name associated to the morphology. If not provided, the program will use the name of the file
    in which the morphology is stored.
    """
    parser: MorphologyParser = config.attr(type=MorphologyParser, default={})

    def store_content(self, content, *args, encoding=None, meta=None):
        """
        Store the file content with a content hash, marked stale so it is
        (re)parsed on the next load.
        """
        if meta is None:
            meta = {}
        meta["_hash"] = self._hash(content)
        meta["_stale"] = True
        stored = super().store_content(content, *args, encoding=encoding, meta=meta)
        return stored

    def load_object(self, parser=None, save=True) -> "Morphology":
        """
        Parse the stored morphology file when stale and optionally save the
        result under this node's morphology name; otherwise load the
        previously saved morphology.
        """
        if parser is None or self.__class__.parser.is_dirty(self):
            parser = self.parser
        self.file.update()
        stored = self.get_stored_file()
        meta = stored.meta
        if meta.get("_stale", True):
            content, meta = stored.load()
            if hasattr(parser, "parse_content"):
                morpho = parser.parse_content(
                    content.decode(meta.get("encoding", "utf8"))
                )
            else:
                morpho = parser.parse(self.file)
            morpho.meta = meta
            morpho = self.pipe(morpho)
            # NOTE(review): store_content writes the "_hash" key; "hash" here
            # matches store_object's morphology meta -- confirm the mismatch
            # with the file-store key is intentional.
            meta["hash"] = self._hash(content)
            meta["_stale"] = False
            morpho.meta = meta
            if save:
                self.scaffold.morphologies.save(
                    self.get_morphology_name(), morpho, overwrite=True
                )
            return morpho
        else:
            return self.scaffold.morphologies.load(self.get_morphology_name())

    def get_morphology_name(self):
        """
        Returns morphology name provided by the user or extract it from its file name.

        :returns: Morphology name
        :rtype: str
        """
        return self.name if self.name is not None else _pl.Path(self.file.uri).stem

    def store_object(self, morpho, hash_):
        """
        Save a morphology into the circuit file under the name of this instance morphology.

        :param hash_: Hash key to store as metadata with the morphology
        :type hash_: str
        :param morpho: Morphology to store
        :type morpho: bsb.morphologies.Morphology
        """
        self.scaffold.morphologies.save(
            self.get_morphology_name(), morpho, meta={"hash": hash_}
        )

    def _hash(self, content):
        # Cheap content fingerprint; explicitly not used for security.
        md5 = _hl.new("md5", usedforsecurity=False)
        if isinstance(content, str):
            md5.update(content.encode("utf-8"))
        else:
            md5.update(content)
        return md5.hexdigest()

    def queue(self, pool):
        """
        Add the loading of the current morphology to a job queue.

        :param pool: Queue of jobs.
        :type pool: bsb.services.pool.JobPool
        """

        def create_morphology(scaffold, i):
            scaffold.configuration.morphologies[i].load_object()

        pool.queue(
            create_morphology, (self._config_index,), submitter=self, uri=self.file.uri
        )
@config.node
class MorphologyPipelineNode(FilePipelineMixin):
    """Applies a shared parser and pipeline to several morphology files."""

    files: list[MorphologyDependencyNode] = config.list(
        type=MorphologyDependencyNode, required=True
    )
    pipeline: cfglist[MorphologyOperation] = config.list(type=MorphologyOperation)
    parser: MorphologyParser = config.attr(type=MorphologyParser, required=False)

    def queue(self, pool):
        """
        Add the loading of the current morphology to a job queue.

        :param pool: Queue of jobs.
        :type pool: bsb.services.pool.JobPool
        """

        def process_file(scaffold, i, j):
            node = scaffold.configuration.morphologies[i]
            file_node = node.files[j]
            parsed = file_node.load_object(parser=node.parser, save=False)
            piped = node.pipe(parsed)
            scaffold.morphologies.save(
                file_node.get_morphology_name(), piped, overwrite=True
            )

        for idx, file_node in enumerate(self.files):
            pool.queue(
                process_file,
                (self._config_index, idx),
                submitter=self,
                node=idx,
                uri=file_node.file.uri,
            )
# Public API of this module.
# NOTE(review): MorphologyPipelineNode, FilePipelineMixin, OperationCallable and
# MorphologyOperationCallable are defined with public names but not exported --
# confirm whether their omission from __all__ is intentional.
__all__ = [
    "CodeDependencyNode",
    "FileDependency",
    "FileDependencyNode",
    "FileScheme",
    "MorphologyDependencyNode",
    "MorphologyOperation",
    "NeuroMorphoScheme",
    "NrrdDependencyNode",
    "Operation",
    "UriScheme",
    "UrlScheme",
]