Source code for bsb.config._attrs

"""
    An attrs-inspired class annotation system, but my A stands for amateuristic.
"""

import builtins

import errr

from ..exceptions import (
    BootError,
    CastError,
    CfgReferenceError,
    NoReferenceAttributeSignal,
    RequirementError,
)
from ..services import MPI
from ._compile import _wrap_reserved
from ._hooks import run_hook
from ._make import (
    MISSING,
    _resolve_references,
    compile_class,
    compile_isc,
    compile_new,
    compile_postnew,
    make_copyable,
    make_dictable,
    make_get_node_name,
    make_tree,
    walk_nodes,
    wrap_root_postnew,
)


[docs] def root(root_cls): """ Decorate a class as a configuration root node. """ root_cls.attr_name = root_cls.node_name = r"{root}" root_cls._config_isroot = True return node(root_cls, root=True)
[docs] def node(node_cls, root=False, dynamic=False, pluggable=False): """ Decorate a class as a configuration node. """ # Recreate the class to set its metaclass a posteriori node_cls = compile_class(node_cls) node_cls._config_unset = [] # Inherit the parent's attributes, if any exist on the class already attrs = getattr(node_cls, "_config_attrs", {}).copy() for k, v in builtins.dict(node_cls.__dict__).items(): # Add our attributes if isinstance(v, ConfigurationAttribute): if v.unset: attrs.pop(k, None) delattr(node_cls, k) # Keep track of what this class wants to unset, in case of MRO traversal. node_cls._config_unset.append(k) else: attrs[k] = v node_cls._config_attrs = attrs node_cls.__post_new__ = compile_postnew(node_cls) node_cls._config_isroot = root if root: node_cls.__post_new__ = wrap_root_postnew(node_cls.__post_new__) node_cls._config_isbooted = False node_cls.__new__ = compile_new( node_cls, dynamic=dynamic, pluggable=pluggable, root=root ) if dynamic: node_cls.__init_subclass__ = compile_isc(node_cls, dynamic) make_get_node_name(node_cls, root=root) make_tree(node_cls) make_dictable(node_cls) make_copyable(node_cls) return node_cls
[docs] def dynamic( node_cls=None, attr_name="cls", classmap=None, auto_classmap=False, classmap_entry=None, **kwargs, ): """ Decorate a class to be castable to a dynamically configurable class using a class configuration attribute. *Example*: Register a required string attribute ``class`` (this is the default): .. code-block:: python @dynamic class Example: pass *Example*: Register a string attribute ``type`` with a default value 'pkg.DefaultClass' as dynamic attribute: .. code-block:: python @dynamic(attr_name='type', required=False, default='pkg.DefaultClass') class Example: pass :param attr_name: Name under which to register the class attribute in the node. :type attr_name: str :param kwargs: All keyword arguments are passed to the constructor of the :func:`attribute <.config.attr>`. """ if "required" not in kwargs and "default" not in kwargs: kwargs["required"] = True if "type" not in kwargs: kwargs["type"] = str class_attr = ConfigurationAttribute(**kwargs) dynamic_config = DynamicNodeConfiguration(classmap, auto_classmap, classmap_entry) if node_cls is None: # If node_cls is None, it means that no positional argument was given, which most # likely means that the @dynamic(...) syntax was used instead of the @dynamic. # This means we have to return an inner decorator instead of the decorated class def decorator(node_cls): return _dynamic(node_cls, class_attr, attr_name, dynamic_config) return decorator # Regular @dynamic syntax used, return decorated class return _dynamic(node_cls, class_attr, attr_name, dynamic_config)
class DynamicNodeConfiguration: def __init__(self, classmap=None, auto_classmap=False, entry=None): self.classmap = classmap self.auto_classmap = auto_classmap self.entry = entry def _dynamic(node_cls, class_attr, attr_name, config): # Set the dynamic attribute setattr(node_cls, attr_name, class_attr) node_cls._config_dynamic_attr = attr_name # Other than that compile the dynamic class like a regular node class node_cls = node(node_cls, dynamic=config) if config.auto_classmap or config.classmap: node_cls._config_dynamic_classmap = config.classmap or {} # This adds the parent class to its own classmap, which for subclasses happens in init # subclass if config.entry is not None: if not hasattr(node_cls, "_config_dynamic_classmap"): raise ValueError( f"Calling `@config.dynamic` with `entry='{config.entry}'`" + " requires `classmap` or `auto_classmap` to be set as well" + f" on '{node_cls.__name__}'." ) node_cls._config_dynamic_classmap[config.entry] = node_cls # Mark the class as its own dynamic root, (grand)child classes will all need to # inherit from this as an interface contract. node_cls._config_dynamic_root = node_cls return node_cls
[docs] def pluggable(key, plugin_name=None): """ Create a node whose configuration is defined by a plugin. *Example*: If you want to use the :guilabel:`attr` to chose from all the installed `dbbs_scaffold.my_plugin` plugins: .. code-block:: python @pluggable('attr', 'my_plugin') class PluginNode: pass This will then read :guilabel:`attr`, load the plugin and configure the node from the node class specified by the plugin. :param plugin_name: The name of the category of the plugin endpoint :type plugin_name: str """ def inner_decorator(node_cls): node_cls._config_plugin_name = plugin_name node_cls._config_plugin_key = key class_attr = ConfigurationAttribute(type=str, required=True) setattr(node_cls, key, class_attr) return node(node_cls, pluggable=True) return inner_decorator
[docs] def attr(**kwargs): """ Create a configuration attribute. Only works when used inside a class decorated with the :func:`node <.config.node>`, :func:`dynamic <.config.dynamic>`, :func:`root <.config.root>` or :func:`pluggable <.config.pluggable>` decorators. :param type: Type of the attribute's value. :type type: Callable :param required: Should an error be thrown if the attribute is not present? :type required: bool :param default: Default value. :type default: Any :param call_default: Should the default value be used (False) or called (True). Use this to prevent mutable default values. :type call_default: bool :param key: If set, the key of the parent is stored on this attribute. """ return ConfigurationAttribute(**kwargs)
[docs] def ref(reference, **kwargs): """ Create a configuration reference. Configuration references are attributes that transform their value into the value of another node or value in the document:: { "keys": { "a": 3, "b": 5 }, "simple_ref": "a" } With ``simple_ref = config.ref(lambda root, here: here["keys"])`` the value ``a`` will be looked up in the configuration object (after all values have been cast) at the location specified by the callable first argument. """ return ConfigurationReferenceAttribute(reference, **kwargs)
[docs] def reflist(reference, **kwargs): """ Create a configuration reference list. """ if "default" not in kwargs: kwargs["default"] = builtins.list kwargs["call_default"] = True return ConfigurationReferenceListAttribute(reference, **kwargs)
[docs] def slot(**kwargs): """ Create an attribute slot that is required to be overriden by child or plugin classes. """ return ConfigurationAttributeSlot(**kwargs)
[docs] def property(val=None, /, type=None, **kwargs): """ Create a configuration property attribute. You may provide a value or a callable. Call `setter` on the return value as you would with a regular property. """ if type is None: type = lambda v: v def decorator(val): prop = val if callable(val) else lambda s: val return ConfigurationProperty(prop, type=type, **kwargs) if val is None: return decorator else: return decorator(val)
[docs] def provide(value): """ Provide a value for a parent class' attribute. Can be a value or a callable, a readonly configuration property will be created from it either way. """ prop = property(value) def provided(self, instance, value): raise AttributeError(f"Can't set attribute, class provides the value '{value}'.") # Create a callable object that invokes `provided` when called, and whose `bool()` # returns `False`. Later in `_is_settable_attr`, we use this to trick the short # circuiting logic, so that this setter doesn't make the internal logic set and error # out on this attr. prop.setter( type("provision", (), {"__call__": provided, "__bool__": lambda s: False})() ) return prop
[docs] def list(**kwargs): """ Create a configuration attribute that holds a list of configuration values. Best used only for configuration nodes. Use an :func:`attr` in combination with a :func:`types.list <.config.types.list>` type for simple values. """ return ConfigurationListAttribute(**kwargs)
[docs] def dict(**kwargs): """ Create a configuration attribute that holds a key value pairs of configuration values. Best used only for configuration nodes. Use an :func:`attr` in combination with a :func:`types.dict <.config.types.dict>` type for simple values. """ return ConfigurationDictAttribute(**kwargs)
[docs] def catch_all(**kwargs): """ Catches any unknown key with a value that can be cast to the given type and collects them under the attribute name. """ return ConfigurationAttributeCatcher(**kwargs)
[docs] def unset(): """ Override and unset an inherited configuration attribute. """ return ConfigurationAttribute(unset=True)
[docs] def file(**kwargs): """ Create a file dependency attribute. """ from ..storage._files import FileDependencyNode kwargs.setdefault("type", FileDependencyNode) return attr(**kwargs)
def _setattr(instance, name, value): instance.__dict__["_" + name] = value def _getattr(instance, name): try: return instance.__dict__["_" + name] except KeyError as e: instance.__getattribute__(e.args[0]) def _hasattr(instance, name): return "_" + name in instance.__dict__ def _get_root(obj): parent = obj while hasattr(parent, "_config_parent") and parent._config_parent is not None: parent = parent._config_parent return parent def _strict_root(obj): root = _get_root(obj) return root if getattr(root, "_config_isroot", False) else None def _booted_root(obj): root = _strict_root(obj) return root if root and root._config_isfinished and root._config_isbooted else None def _is_booted(obj): return obj and obj._config_isbooted def _root_is_booted(obj): root = _strict_root(obj) return _is_booted(root) def _boot_nodes(top_node, scaffold): for node in walk_nodes(top_node): node.scaffold = scaffold # Boot attributes for attr in getattr(node, "_config_attrs", {}).values(): booted = {None} for cls in type(node).__mro__: cls_attr = getattr(cls, attr.attr_name, None) if (boot := getattr(cls_attr, "__boot__", None)) and boot not in booted: boot(node, scaffold) booted.add(boot) # Boot node hook try: run_hook(node, "boot") except Exception as e: errr.wrap(BootError, e, prepend=f"Failed to boot {node}:") # fixme: why is this here? Will deadlock in case of BootError on specific node only. MPI.barrier() def _unset_nodes(top_node): for node in walk_nodes(top_node): try: del node.scaffold except Exception: pass node._config_parent = None node._config_key = None if hasattr(node, "_config_index"): node._config_index = None run_hook(node, "unboot")
[docs] class ConfigurationAttribute: """ Base implementation of all the different configuration attributes. Call the factory function :func:`.attr` instead. """ def __init__( self, type=None, default=None, call_default=None, required=False, key=False, unset=False, hint=MISSING, ): if not callable(required): self.required = lambda s: required else: self.required = required self.key = key self.default = default self.call_default = call_default self.type = self._set_type(type, key) self.unset = unset self.hint = hint def __set_name__(self, owner, name): self.attr_name = name def __get__(self, instance, owner): if instance is None: return self return _getattr(instance, self.attr_name)
[docs] def fset(self, instance, value): return _setattr(instance, self.attr_name, value)
def __set__(self, instance, value): if _hasattr(instance, self.attr_name): ex_value = _getattr(instance, self.attr_name) _unset_nodes(ex_value) if value is None: # Don't try to cast None to a value of the attribute type. return self.fset(instance, None) try: value = self.type(value, _parent=instance, _key=self.attr_name) except ValueError: # This value error should only arise when users are manually setting # attributes in an already bootstrapped config tree. raise CastError( f"'{value}' is not convertible to {self.type.__name__}," f" for attribute '{self.attr_name}' of {instance}." ) from None except (RequirementError, CastError) as e: if not hasattr(e, "node") or not e.node: e.node, e.attr = instance, self.attr_name raise except Exception as e: raise CastError( f"Couldn't cast '{value}' into {self.type.__name__}: {e}", instance, self.attr_name, ) from e self.flag_dirty(instance) # The value was cast to its intented type and the new value can be set. self.fset(instance, value) root = _strict_root(instance) if _is_booted(root): _boot_nodes(value, root.scaffold) def _set_type(self, type, key): self._config_type = type # Determine type of the attribute if not type and self.default is not None: if self.should_call_default(): t = builtins.type(self.default()) else: t = builtins.type(self.default) else: from . import types t = type or (key and types.key()) or types.str() # This call wraps the type handler so that it accepts all reserved keyword args # like `_parent` and `_key` t = _wrap_reserved(t) return t
[docs] def get_type(self): return self._config_type
[docs] def get_hint(self): if self.hint is not MISSING: return self.hint if hasattr(self.type, "__hint__"): return self.type.__hint__() return MISSING
[docs] def get_node_name(self, instance): return instance.get_node_name() + "." + self.attr_name
[docs] def is_node_type(self): return hasattr(self._config_type, "_config_attrs")
[docs] def tree(self, instance): val = _getattr(instance, self.attr_name) return self.tree_of(val)
[docs] def tree_of(self, value): # Allow subnodes and other class values to convert themselves to their tree # representation if hasattr(value, "__tree__"): value = value.__tree__() # Check if the type handler specifies any inversion function to convert tree # values back to how they were found in the document. if hasattr(self.type, "__inv__") and value is not None: value = self.type.__inv__(value) return value
[docs] def flag_dirty(self, instance): instance._config_state[self.attr_name] = False if self.attr_name not in instance._config_attr_order: instance._config_attr_order.append(self.attr_name)
[docs] def is_dirty(self, instance): return not instance._config_state.get(self.attr_name, True)
[docs] def flag_pristine(self, instance): instance._config_state[self.attr_name] = True
[docs] def get_default(self): return self.default() if self.should_call_default() else self.default
[docs] def should_call_default(self): cdf = self.call_default return cdf or (cdf is None and callable(self.default))
[docs] class cfglist(builtins.list): """ Extension of the builtin list to manipulate lists of configuration nodes. """ def get_node_name(self): return self._config_parent.get_node_name() + "." + self._config_attr_name def append(self, item): item = self._preset(len(self), item) super().append(item) self._postset((item,)) return self[-1] def insert(self, index, item): item = self._preset(index, item) super().insert(index, item) self._postset((item,)) self._reindex(index) def pop(self, index=-1): ex_item = super().pop(index) _unset_nodes(ex_item) self._reindex(index) return ex_item def clear(self): for node in self: _unset_nodes(node) super().clear() def sort(self, **kwargs): super().sort(**kwargs) self._reindex(0) def reverse(self): super().reverse() self._reindex(0) def extend(self, items): items = self._fromiter(len(self), items) super().extend(items) self._postset(items) def __setitem__(self, index, item): if isinstance(index, int): ex_items = [self[index]] item = self._preset(index, item) items = [item] reindex_from = None else: ex_items = self[index] reindex_from = index.indices(len(self))[0] items = item = self._fromiter(reindex_from, item) for ex_item in ex_items: _unset_nodes(ex_item) # Don't be fooled, item can be a single value or a list, depending on the index. super().__setitem__(index, item) if reindex_from is not None: self._reindex(reindex_from) self._postset(items) def _reindex(self, start): for i in range(start, len(self)): self[i]._config_key = i self[i]._config_index = i def _fromiter(self, start, items): return tuple(self._preset(start + i, item) for i, item in enumerate(items)) def _preset(self, index, item): try: item = self._elem_type(item, _parent=self, _key=index) try: item._config_index = index except Exception as e: pass return item except (RequirementError, CastError) as e: e.args = ( f"Couldn't cast element {index} from '{item}'" + f" into a {self._elem_type.__name__}. " + e.msg, *e.args, ) if not e.node: e.node, e.attr = self, index raise except Exception as e: raise CastError( f"Couldn't cast element {index} from '{item}'" + f" into a {self._elem_type.__name__}: {e}" ) def _postset(self, items): root = _strict_root(self) if root is not None: for item in items: _resolve_references(root, item) if _is_booted(root): for item in items: _boot_nodes(item, root.scaffold) @builtins.property def _config_attr_name(self): return self._config_attr.attr_name
class ConfigurationListAttribute(ConfigurationAttribute): def __init__(self, *args, size=None, **kwargs): super().__init__(*args, **kwargs) self.size = size def __set__(self, instance, value, _key=None): _setattr(instance, self.attr_name, self.fill(value, _parent=instance)) def __populate__(self, instance, value, unique_list=False): cfglist = _getattr(instance, self.attr_name) if not unique_list or value not in cfglist: builtins.list.append(cfglist, value) def fill(self, value, _parent, _key=None): _cfglist = cfglist() _cfglist._config_parent = _parent _cfglist._config_attr = self _cfglist._elem_type = self.child_type if isinstance(value, builtins.dict): raise CastError(f"Dictionary `{value}` given where list is expected.") _cfglist.extend(value or builtins.list()) if self.size is not None and len(_cfglist) != self.size: raise CastError( f"Couldn't cast {value} into a {self.size}-element list," + f" obtained {len(_cfglist)} elements" ) return _cfglist def _set_type(self, type, key=None): self.child_type = super()._set_type(type, key=False) return self.fill def tree(self, instance): val = _getattr(instance, self.attr_name) return [self.tree_of(e) for e in val] def get_hint(self): if self.hint is not MISSING: return self.hint if hasattr(self.child_type, "__hint__"): return [self.child_type.__hint__(), self.child_type.__hint__()] return MISSING
[docs] class cfgdict(builtins.dict): """ Extension of the builtin dictionary to manipulate dicts of configuration nodes. """ def __getattr__(self, name): try: return self[name] except KeyError: raise AttributeError( self.get_node_name() + " object has no attribute '{}'".format(name) ) def __setitem__(self, key, value): if key in self: _unset_nodes(self[key]) try: value = self._elem_type(value, _parent=self, _key=key) except (RequirementError, CastError) as e: if not (hasattr(e, "node") and e.node): e.node, e.attr = self, key raise except Exception: import traceback raise CastError( "Couldn't cast {}.{} from '{}' into a {}".format( self.get_node_name(), key, value, self._elem_type.__name__ ) + "\n" + traceback.format_exc() ) else: super().__setitem__(key, value) root = _strict_root(value) if root is not None: _resolve_references(root, value) if _is_booted(root): _boot_nodes(value, root.scaffold) def add(self, key, *args, **kwargs): if key in self: raise KeyError( f"{self.get_node_name()} already contains '{key}'." + " Use `node[key] = value` if you want to overwrite it." ) self[key] = value = self._elem_type(*args, _parent=self, _key=key, **kwargs) return value def clear(self): for node in self.values(): _unset_nodes(node) super().clear() def pop(self, key): item = super().pop(key) _unset_nodes(item) return item def popitem(self): key, value = super().popitem() _unset_nodes(value) return key, value def setdefault(self, key, value): if key in self: return self[key] else: self[key] = value return self[key] def update(self, other): for ckey, value in other.items(): self[ckey] = value def __ior__(self, other): ex_values = tuple(self.values()) try: merge_f = super().__ior__ except AttributeError: # Patch for 3.8 merge_f = super().update merge_f(other) new_values = tuple(self.values()) for removed_node in (e for e in ex_values if e not in new_values): _unset_nodes(removed_node) for added_node in (a for a in new_values if a not in ex_values): _boot_nodes(added_node, self.scaffold) return self def copy(self): return cfgdictcopy(self) @builtins.property def _config_attr_name(self): return self._config_attr.attr_name def get_node_name(self): return self._config_parent.get_node_name() + "." + self._config_attr_name
class cfgdictcopy(builtins.dict): def __init__(self, other): super().__init__(other) self._elem_type = other._elem_type self._copied_from = other @builtins.property def _config_attr_name(self): return self._copied_from._config_attr_name def get_node_name(self): return self._copied_from.get_node_name() class ConfigurationDictAttribute(ConfigurationAttribute): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def __set__(self, instance, value, _key=None): _setattr( instance, self.attr_name, self.fill(value, _parent=instance, _key=_key or self.attr_name), ) def fill(self, value, _parent, _key=None): _cfgdict = cfgdict() _cfgdict._config_parent = _parent _cfgdict._config_key = _key _cfgdict._config_attr = self _cfgdict._elem_type = self.child_type _cfgdict.update(value or builtins.dict()) return _cfgdict def _set_type(self, type, key=None): self.child_type = super()._set_type(type, key=False) return self.fill def tree(self, instance): val = _getattr(instance, self.attr_name).items() return {k: self.tree_of(v) for k, v in val} def get_hint(self): if self.hint is not MISSING: return self.hint if hasattr(self.child_type, "__hint__"): return { "key1": self.child_type.__hint__(), "key2": self.child_type.__hint__(), } return MISSING class ConfigurationReferenceAttribute(ConfigurationAttribute): def __init__( self, reference, key=None, ref_type=None, populate=None, backref=None, pop_unique=True, **kwargs, ): self.ref_lambda = reference self.ref_key = key self.ref_type = ref_type self.populate = populate self.backref = backref self.pop_unique = pop_unique # No need to cast to any types: the reference we fetch will already have been cast if "type" in kwargs: # pragma: nocover del kwargs["type"] super().__init__(**kwargs) def get_ref_key(self): return self.ref_key or (self.attr_name + "_reference") def __set__(self, instance, value, key=None): if self.is_reference_value(value): _setattr(instance, self.attr_name, value) else: setattr(instance, self.get_ref_key(), value) if self.should_resolve_on_set(instance): if hasattr(instance, "_config_root"): # pragma: nocover raise CfgReferenceError( "Can't autoresolve references without a config root." ) _setattr( instance, self.attr_name, self.__ref__(instance, instance._config_root), ) def is_reference_value(self, value): if value is None: return True if self.ref_type is not None: return isinstance(value, self.ref_type) elif hasattr(self.ref_lambda, "is_ref"): return self.ref_lambda.is_ref(value) else: return not isinstance(value, str) def should_resolve_on_set(self, instance): return ( hasattr(instance, "_config_resolve_on_set") and instance._config_resolve_on_set ) def __ref__(self, instance, root): try: remote, remote_key = self._prepare_self(instance, root) except NoReferenceAttributeSignal: return None return self.resolve_reference(instance, remote, remote_key) def _prepare_self(self, instance, root): instance._config_root = root instance._config_resolve_on_set = True remote = self.ref_lambda(root, instance) local_attr = self.get_ref_key() if not hasattr(instance, local_attr): raise NoReferenceAttributeSignal() return remote, getattr(instance, local_attr) def resolve_reference(self, instance, remote, key): if key not in remote: raise CfgReferenceError( "Reference '{}' of {} does not exist in {}".format( key, self.get_node_name(instance), remote.get_node_name(), ) ) value = remote[key] if self.populate: self.populate_reference(instance, value) if self.backref: self.back_reference(instance, value) return value def populate_reference(self, instance, reference): # Remote descriptors can ask to handle populating itself by implementing a # __populate__ method. Here we check if the method exists and if so defer to it. if (pop_attr := getattr(reference.__class__, self.populate, None)) and ( pop_func := getattr(pop_attr, "__populate__", None) ): pop_func(reference, instance, unique_list=self.pop_unique) elif (population := getattr(reference, self.populate, None)) is not None: if not self.pop_unique or instance not in population: population.append(instance) else: setattr(reference, self.populate, [instance]) def back_reference(self, instance, reference): if (ref_attr := getattr(reference.__class__, self.backref, None)) and ( ref_func := getattr(ref_attr, "__backref__", None) ): ref_func(reference, instance) else: setattr(reference, self.backref, instance) def tree(self, instance): val = getattr(instance, self.get_ref_key(), None) if self.is_reference_value(val) and hasattr(val, "_config_key"): val = val._config_key return val class ConfigurationReferenceListAttribute(ConfigurationReferenceAttribute): def __set__(self, instance, value, key=None): if value is None: setattr(instance, self.get_ref_key(), []) _setattr(instance, self.attr_name, []) return try: remote_keys = builtins.list(iter(value)) except TypeError: raise CfgReferenceError( "Reference list '{}' of {} is not iterable.".format( value, self.get_node_name(instance) ) ) # Store the referring values to the references key. setattr(instance, self.get_ref_key(), remote_keys) if self.should_resolve_on_set(instance): remote = self.ref_lambda(instance._config_root, instance) refs = self.resolve_reference_list(instance, remote, remote_keys) _setattr(instance, self.attr_name, refs) def __get__(self, instance, owner): if instance is None: return self if self.should_resolve_on_set(instance): return super().__get__(instance, owner) else: return getattr(instance, self.get_ref_key()) def get_ref_key(self): return self.ref_key or (self.attr_name + "_references") def __ref__(self, instance, root): try: remote, remote_keys = self._prepare_self(instance, root) except NoReferenceAttributeSignal: # pragma: nocover return None return self.resolve_reference_list(instance, remote, remote_keys) def resolve_reference_list(self, instance, remote, remote_keys): refs = [] for remote_key in remote_keys: if not self.is_reference_value(remote_key): reference = self.resolve_reference(instance, remote, remote_key) else: reference = remote_key # Usually resolve_reference also populates, but since we have our ref # already we skip it and should call populate_reference ourselves. if self.populate: self.populate_reference(instance, reference) refs.append(reference) return refs def __populate__(self, instance, value, unique_list=False): has_refs = hasattr(instance, self.get_ref_key()) has_pop = hasattr(instance, self.attr_name) if has_pop: population = getattr(instance, self.attr_name) if has_refs: references = getattr(instance, self.get_ref_key()) is_new = (not has_pop or value not in population) and ( not has_refs or value not in references ) should_pop = has_pop and (not unique_list or is_new) should_ref = should_pop or not has_pop if should_pop: population.append(value) if should_ref: if not has_refs: setattr(instance, self.get_ref_key(), [value]) else: references.append(value) def tree(self, instance): val = getattr(instance, self.get_ref_key(), []) val = [v._config_key if self._tree_should_unreference(v) else v for v in val] return val def _tree_should_unreference(self, value): return self.is_reference_value(value) and hasattr(value, "_config_key") class ConfigurationAttributeSlot(ConfigurationAttribute): def __set__(self, instance, value): # pragma: nocover raise NotImplementedError( f"Configuration slot '{self.attr_name}' of {instance.get_node_name()} is" f" empty. The {instance.__class__._bsb_entry_point.module_name} plugin" f" provided by '{instance.__class__._bsb_entry_point.dist}' should fill the" " slot with a configuration attribute." ) class ConfigurationProperty(ConfigurationAttribute): def __init__(self, fget, *args, **kwargs): super().__init__(*args, **kwargs) self.fget = fget self.fset = None def setter(self, f): self.fset = f return self def __get__(self, instance, owner): if instance is None: return self return self.fget(instance) def __set__(self, instance, value): if self.fset is None: e = AttributeError(f"Can't set attribute '{self.attr_name}'") e.node = self raise e else: return super().__set__(instance, value) def _collect_kv(n, d, k, v): d[k] = v class ConfigurationAttributeCatcher(ConfigurationAttribute): def __init__( self, *args, type=str, initial=builtins.dict, catch=_collect_kv, contains=None, tree_cb=None, **kwargs, ): super().__init__(*args, type=type, default=initial, call_default=True, **kwargs) self.catch_callback = catch if contains is not None: self.contains = contains if tree_cb is not None: self.tree_callback = tree_cb def __set__(self, instance, value): _setattr(instance, self.attr_name, value) def get_caught(self, instance): if not hasattr(instance, f"_{self.attr_name}_caught"): setattr(instance, f"_{self.attr_name}_caught", {}) return getattr(instance, f"_{self.attr_name}_caught") def __catch__(self, node, key, value): # Try to cast to our type, if it fails it will be caught by whoever is asking us # to catch this and we don't catch this value. cast = self.type(value, _parent=node, _key=key) # If succesfully cast, catch this value by executing our catch callback. self.catch_callback(node, _getattr(node, self.attr_name), key, cast) self.get_caught(node)[key] = cast def tree(self, instance): # The default attr catcher collects what it catches in a dict. When we want to # build the config tree again these values should be placed back in their # original keys. We don't want to store our caught values in the config file. To # do so we use the `tree_callback` instead. return None def contains(self, instance, key): return key in self.get_caught(instance) def tree_callback(self, instance, key): # When building the config tree the values that were caught can't be found in the # attrs and the tree builder will check all catch-attr's `contains` methods and # calls the right tree_callback to fetch the value. value = _getattr(instance, self.attr_name)[key] if hasattr(value, "__tree__"): value = value.__tree__() return value