from controlpi.messagebus import MessageBus
-from typing import Union, Dict, List, Any
+from typing import Union, Dict, List, Any, Optional
JSONSchema = Union[bool, Dict[str, Union[None, str, int, float, bool,
Dict[str, Any], List[Any]]]]
# Could be more specific.
overridden by concrete plugins.
"""
- @property
- @classmethod
- @abstractmethod
- def CONF_SCHEMA(cls) -> JSONSchema:
- """JSON schema for configuration of plugin.
+ CONF_SCHEMA: JSONSchema = False
- Given configurations are validated against this schema in __init__.
- process_conf and run can assume a valid configuration in self.conf.
- """
- raise NotImplementedError
+ _validator: Optional[jsonschema.Draft7Validator] = None
def __init__(self, bus: MessageBus, name: str, conf: PluginConf) -> None:
# noqa: D107
- assert isinstance(bus, MessageBus)
self.bus = bus
- assert isinstance(name, str)
self.name = name
- jsonschema.Draft7Validator.check_schema(type(self).CONF_SCHEMA)
- validator = jsonschema.Draft7Validator(type(self).CONF_SCHEMA)
- assert isinstance(conf, dict)
+ if not type(self)._validator:
+ jsonschema.Draft7Validator.check_schema(type(self).CONF_SCHEMA)
+ type(self)._validator = \
+ jsonschema.Draft7Validator(type(self).CONF_SCHEMA)
+ validator = type(self)._validator
+ assert isinstance(validator, jsonschema.Draft7Validator)
valid = True
for error in validator.iter_errors(conf):
print(error)