From: Benjamin Braatz Date: Mon, 9 Feb 2026 12:04:03 +0000 (+0100) Subject: Linting with ruff and ty. X-Git-Tag: v0.3.9~1 X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=5f7138b3dedb62648769a9beb5d77f9ee7ba6c2a;p=graphit%2Fcontrolpi.git Linting with ruff and ty. --- diff --git a/controlpi/__init__.py b/controlpi/__init__.py index c190534..970e130 100644 --- a/controlpi/__init__.py +++ b/controlpi/__init__.py @@ -10,40 +10,41 @@ to run a ControlPi system based on a configuration file indefinitely. The test function is a utility function to test plugins with minimal boilerplate code. """ + import asyncio -import fastjsonschema # type: ignore +import fastjsonschema -from controlpi.messagebus import (MessageBus, BusException, - Message, MessageTemplate) +from controlpi.messagebus import MessageBus, Message, MessageTemplate from controlpi.pluginregistry import PluginRegistry from controlpi.baseplugin import BasePlugin, PluginConf, ConfException from typing import Dict, List, Coroutine, Any -CONF_SCHEMA = {'type': 'object', - 'patternProperties': {'.*': {'type': 'object'}}} +CONF_SCHEMA = {"type": "object", "patternProperties": {".*": {"type": "object"}}} -def _process_conf(message_bus: MessageBus, - conf: Dict[str, PluginConf]) -> List[Coroutine]: +def _process_conf( + message_bus: MessageBus, conf: Dict[str, PluginConf] +) -> List[Coroutine]: try: conf = fastjsonschema.validate(CONF_SCHEMA, conf) except fastjsonschema.JsonSchemaException as e: - print(f"Configuration not valid:\n{e.message}") + print(f"Configuration not valid:\n{e}") return [] - plugins = PluginRegistry('controlpi_plugins', BasePlugin) + plugins = PluginRegistry("controlpi_plugins", BasePlugin) coroutines = [message_bus.run()] for instance_name in conf: instance_conf = conf[instance_name] - if 'plugin' not in instance_conf: - print("No plugin implementation specified for instance" - f" '{instance_name}'.") + if "plugin" not in instance_conf: + print(f"No plugin implementation specified for instance '{instance_name}'.") continue - plugin_name = instance_conf['plugin'] + plugin_name = instance_conf["plugin"] if plugin_name not in plugins: - print(f"No implementation found for plugin '{plugin_name}'" - f" (specified for instance '{instance_name}').") + print( + f"No implementation found for plugin '{plugin_name}'" + f" (specified for instance '{instance_name}')." + ) continue plugin = plugins[plugin_name] try: @@ -95,9 +96,9 @@ async def run(conf: Dict[str, PluginConf]) -> None: pass -async def test(conf: Dict[str, PluginConf], - messages: List[Dict[str, Any]], - wait: float = 0.0) -> None: +async def test( + conf: Dict[str, PluginConf], messages: List[Dict[str, Any]], wait: float = 0.0 +) -> None: """Test configuration of ControlPi system. Setup message bus, process given configuration, run message bus and @@ -147,14 +148,21 @@ async def test(conf: Dict[str, PluginConf], message_bus = MessageBus() async def log(message): - if ('sender' in message and message['sender'] == '' and - 'event' in message and message['event'] == 'registered' and - 'client' in message and message['client'] == 'test()'): + if ( + "sender" in message + and message["sender"] == "" + and "event" in message + and message["event"] == "registered" + and "client" in message + and message["client"] == "test()" + ): # Do not log own registration of 'test()': return print(f"test(): {message}") - message_bus.register('test()', 'Test', - [MessageTemplate()], [([MessageTemplate()], log)]) + + message_bus.register( + "test()", "Test", [MessageTemplate()], [([MessageTemplate()], log)] + ) coroutines = _process_conf(message_bus, conf) background_tasks = set() @@ -165,7 +173,7 @@ async def test(conf: Dict[str, PluginConf], # Give the created task opportunity to run: await asyncio.sleep(0) for message in messages: - await message_bus.send(Message('test()', message)) + await message_bus.send(Message("test()", message)) # Give immediate reactions to messages opportunity to happen: await asyncio.sleep(0) await asyncio.sleep(wait) diff --git a/controlpi/__main__.py b/controlpi/__main__.py index f880312..7f0b7f5 100644 --- a/controlpi/__main__.py +++ b/controlpi/__main__.py @@ -4,12 +4,13 @@ The main coroutine configures and runs the system. The system can be started by: python -m controlpi """ + import signal import sys import os import json import asyncio -import pyinotify # type: ignore +import pyinotify from controlpi import run, PluginConf @@ -27,8 +28,7 @@ async def add_signal_handlers() -> None: """Add signal handlers to the running loop.""" loop = asyncio.get_running_loop() for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: - loop.add_signal_handler(sig, - lambda s=sig: asyncio.create_task(shutdown(s))) + loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s))) def read_configuration() -> Dict[str, PluginConf]: @@ -61,8 +61,7 @@ async def add_config_change_handler() -> pyinotify.AsyncioNotifier: """Add handler for configuration file.""" wm = pyinotify.WatchManager() loop = asyncio.get_running_loop() - notifier = pyinotify.AsyncioNotifier(wm, loop, - default_proc_fun=ConfigHandler()) + notifier = pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=ConfigHandler()) wm.add_watch(os.path.dirname(sys.argv[1]), pyinotify.ALL_EVENTS) return notifier @@ -75,5 +74,6 @@ async def main() -> None: await run(conf) notifier.stop() -if __name__ == '__main__': + +if __name__ == "__main__": asyncio.run(main()) diff --git a/controlpi/baseplugin.py b/controlpi/baseplugin.py index ce1b393..006c267 100644 --- a/controlpi/baseplugin.py +++ b/controlpi/baseplugin.py @@ -25,6 +25,7 @@ plugins: Plugins are configured and run based on the information in the global configuration. Here, we test this manually: +>>> import asyncio >>> async def test(): ... p = TestPlugin(MessageBus(), 'Test Instance', {'key': 'Something'}) ... await p.run() @@ -83,17 +84,19 @@ might register separate clients for all devices connected to the bus, or a network socket plugin might register separate clients for all connections to the socket (and unregister them when the connection is closed). """ -__pdoc__ = {'BasePlugin.CONF_SCHEMA': False} + +__pdoc__ = {"BasePlugin.CONF_SCHEMA": False} from abc import ABC, abstractmethod -import asyncio -import fastjsonschema # type: ignore +import fastjsonschema from controlpi.messagebus import MessageBus from typing import Union, Dict, List, Any, Optional, Callable -JSONSchema = Union[bool, Dict[str, Union[None, str, int, float, bool, - Dict[str, Any], List[Any]]]] + +JSONSchema = Union[ + bool, Dict[str, Union[None, str, int, float, bool, Dict[str, Any], List[Any]]] +] # Could be more specific. PluginConf = Dict[str, Any] # Could be more specific. @@ -117,6 +120,7 @@ class BasePlugin(ABC): Initialisation sets the instance variables bus to the given message bus, name to the given name, and conf to the given configuration: + >>> import asyncio >>> class TestPlugin(BasePlugin): ... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}}, ... 'required': ['key']} @@ -169,17 +173,15 @@ class BasePlugin(ABC): self.bus = bus self.name = name if not type(self)._validate: - type(self)._validate = \ - fastjsonschema.compile(type(self).CONF_SCHEMA) + type(self)._validate = fastjsonschema.compile(type(self).CONF_SCHEMA) self.conf = {} validate = type(self)._validate assert validate is not None try: self.conf = validate(conf) except fastjsonschema.JsonSchemaException as e: - print(e.message) - raise ConfException(f"Configuration for '{self.name}'" - " is not valid.") + print(e) + raise ConfException(f"Configuration for '{self.name}' is not valid.") self.process_conf() @abstractmethod diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py index 9b502cb..188f8de 100644 --- a/controlpi/messagebus.py +++ b/controlpi/messagebus.py @@ -85,13 +85,24 @@ Logger: {'sender': 'Client 1', 'k1': 'Test'} Logger: {'sender': '', 'target': 'Client 1'} Client 1: {'sender': '', 'target': 'Client 1'} """ + import asyncio import json -import fastjsonschema # type: ignore +import fastjsonschema import sys -from typing import (Union, Dict, List, Any, Callable, Coroutine, - Optional, Iterable, Tuple) +from typing import ( + Union, + Dict, + List, + Any, + Callable, + Coroutine, + Optional, + Iterable, + Tuple, +) + MessageValue = Union[None, str, int, float, bool, Dict[str, Any], List[Any]] # Should really be: # MessageValue = Union[None, str, int, float, bool, @@ -100,7 +111,7 @@ MessageValue = Union[None, str, int, float, bool, Dict[str, Any], List[Any]] # https://github.com/python/mypy/issues/731 JSONSchema = Union[bool, Dict[str, MessageValue]] # Could be even more specific. -MessageCallback = Callable[['Message'], Coroutine[Any, Any, None]] +MessageCallback = Callable[["Message"], Coroutine[Any, Any, None]] # Global cache of JSON schema validation functions: @@ -176,8 +187,9 @@ class Message(Dict[str, MessageValue]): {'sender': 'New sender', 'key': 'value'} """ - def __init__(self, sender: str, - init: Optional[Dict[str, MessageValue]] = None) -> None: + def __init__( + self, sender: str, init: Optional[Dict[str, MessageValue]] = None + ) -> None: """Initialise message. Message is initialised with given sender and possibly given @@ -190,12 +202,11 @@ class Message(Dict[str, MessageValue]): {'sender': 'Example sender', 'key 1': 'value 1'} """ if not isinstance(sender, str): - raise TypeError(f"'{sender}' is not a valid sender name" - " (not a string).") - self['sender'] = '' + raise TypeError(f"'{sender}' is not a valid sender name (not a string).") + self["sender"] = "" if init is not None: self.update(init) - self['sender'] = sender + self["sender"] = sender @staticmethod def check_value(value: MessageValue) -> bool: @@ -250,8 +261,12 @@ class Message(Dict[str, MessageValue]): """ if value is None: return True - elif (isinstance(value, str) or isinstance(value, int) or - isinstance(value, float) or isinstance(value, bool)): + elif ( + isinstance(value, str) + or isinstance(value, int) + or isinstance(value, float) + or isinstance(value, bool) + ): return True elif isinstance(value, dict): for key in value: @@ -287,8 +302,7 @@ class Message(Dict[str, MessageValue]): TypeError: '1j' is not a valid value in Message. """ if not isinstance(key, str): - raise TypeError(f"'{key}' is not a valid key in Message" - " (not a string).") + raise TypeError(f"'{key}' is not a valid key in Message (not a string).") if not self.check_value(value): raise TypeError(f"'{value}' is not a valid value in Message.") super().__setitem__(key, value) @@ -324,8 +338,7 @@ class Message(Dict[str, MessageValue]): """ if args: if len(args) > 1: - raise TypeError("update expected at most 1 argument," - f" got {len(args)}") + raise TypeError(f"update expected at most 1 argument, got {len(args)}") other = dict(args[0]) for key in other: self[key] = other[key] @@ -398,7 +411,7 @@ class MessageTemplate(Dict[str, JSONSchema]): self.update(init) @staticmethod - def from_message(message: Message) -> 'MessageTemplate': + def from_message(message: Message) -> "MessageTemplate": """Create template from message. Template witch constant schemas is created from message: @@ -424,25 +437,31 @@ class MessageTemplate(Dict[str, JSONSchema]): and the template for the registration can be constructed by this method. """ + def schema_from_value(value: MessageValue) -> JSONSchema: schema: JSONSchema = False if value is None: - schema = {'const': None} - elif (isinstance(value, str) or isinstance(value, int) or - isinstance(value, float) or isinstance(value, bool)): - schema = {'const': value} + schema = {"const": None} + elif ( + isinstance(value, str) + or isinstance(value, int) + or isinstance(value, float) + or isinstance(value, bool) + ): + schema = {"const": value} elif isinstance(value, dict): properties = {} for inner_key in value: inner_value: Message = value[inner_key] properties[inner_key] = schema_from_value(inner_value) - schema = {'type': 'object', - 'properties': properties} + schema = {"type": "object", "properties": properties} elif isinstance(value, list): - schema = {'type': 'array', - 'items': [schema_from_value(element) - for element in value]} + schema = { + "type": "array", + "items": [schema_from_value(element) for element in value], + } return schema + template = MessageTemplate() for key in message: template[key] = schema_from_value(message[key]) @@ -474,11 +493,14 @@ class MessageTemplate(Dict[str, JSONSchema]): >>> t['key'] = True """ if not isinstance(key, str): - raise TypeError(f"'{key}' is not a valid key in MessageTemplate" - " (not a string).") + raise TypeError( + f"'{key}' is not a valid key in MessageTemplate (not a string)." + ) if not register_schema(value): - raise TypeError(f"'{value}' is not a valid value in" - " MessageTemplate (not a valid JSON schema).") + raise TypeError( + f"'{value}' is not a valid value in" + " MessageTemplate (not a valid JSON schema)." + ) super().__setitem__(key, value) def update(self, *args, **kwargs) -> None: @@ -532,16 +554,14 @@ class MessageTemplate(Dict[str, JSONSchema]): """ if args: if len(args) > 1: - raise TypeError("update expected at most 1 argument," - f" got {len(args)}") + raise TypeError(f"update expected at most 1 argument, got {len(args)}") other = dict(args[0]) for key in other: self[key] = other[key] for key in kwargs: self[key] = kwargs[key] - def setdefault(self, key: str, - value: Optional[JSONSchema] = None) -> JSONSchema: + def setdefault(self, key: str, value: Optional[JSONSchema] = None) -> JSONSchema: """Override setdefault to use validity checks. >>> t = MessageTemplate() @@ -777,8 +797,12 @@ class TemplateRegistry: # First key is the message key, second key is the JSON schema string self._templates: Dict[str, List[MessageTemplate]] = {} - def insert(self, template: MessageTemplate, client: str, - callback: Optional[MessageCallback] = None) -> None: + def insert( + self, + template: MessageTemplate, + client: str, + callback: Optional[MessageCallback] = None, + ) -> None: """Register a client for a template. >>> r = TemplateRegistry() @@ -799,26 +823,30 @@ class TemplateRegistry: self._callbacks[client].append(callback) else: key, schema = next(iter(template.items())) - reduced_template = MessageTemplate({k: template[k] - for k in template - if k != key}) - if (isinstance(schema, dict) and len(schema) == 1 and - 'const' in schema and isinstance(schema['const'], str)): - value = schema['const'] + reduced_template = MessageTemplate( + {k: template[k] for k in template if k != key} + ) + if ( + isinstance(schema, dict) + and len(schema) == 1 + and "const" in schema + and isinstance(schema["const"], str) + ): + value = schema["const"] if key not in self._constants: self._constants[key] = {} if value not in self._constants[key]: self._constants[key][value] = TemplateRegistry() - self._constants[key][value].insert(reduced_template, - client, callback) + self._constants[key][value].insert(reduced_template, client, callback) else: schema_string = json.dumps(schema) if key not in self._schemas: self._schemas[key] = {} if schema_string not in self._schemas[key]: self._schemas[key][schema_string] = TemplateRegistry() - self._schemas[key][schema_string].insert(reduced_template, - client, callback) + self._schemas[key][schema_string].insert( + reduced_template, client, callback + ) def delete(self, client: str) -> bool: """Unregister a client from all templates. @@ -857,8 +885,7 @@ class TemplateRegistry: if not new_schemas[key]: del new_schemas[key] self._schemas = new_schemas - if (self._clients or self._callbacks or - self._constants or self._schemas): + if self._clients or self._callbacks or self._constants or self._schemas: return True return False @@ -886,8 +913,11 @@ class TemplateRegistry: if client in self._clients: return True for key in self._constants: - if (key in message and isinstance(message[key], str) and - message[key] in self._constants[key]): + if ( + key in message + and isinstance(message[key], str) + and message[key] in self._constants[key] + ): value = message[key] assert isinstance(value, str) child = self._constants[key][value] @@ -921,8 +951,11 @@ class TemplateRegistry: if client not in result: result.append(client) for key in self._constants: - if (key in message and isinstance(message[key], str) and - message[key] in self._constants[key]): + if ( + key in message + and isinstance(message[key], str) + and message[key] in self._constants[key] + ): value = message[key] assert isinstance(value, str) child = self._constants[key][value] @@ -947,8 +980,11 @@ class TemplateRegistry: if callback not in result: result.append(callback) for key in self._constants: - if (key in message and isinstance(message[key], str) and - message[key] in self._constants[key]): + if ( + key in message + and isinstance(message[key], str) + and message[key] in self._constants[key] + ): value = message[key] assert isinstance(value, str) child = self._constants[key][value] @@ -1120,10 +1156,13 @@ class MessageBus: self._send_reg: TemplateRegistry = TemplateRegistry() self._recv_reg: TemplateRegistry = TemplateRegistry() - def register(self, client: str, plugin: str, - sends: Iterable[MessageTemplate], - receives: Iterable[Tuple[Iterable[MessageTemplate], - MessageCallback]]) -> None: + def register( + self, + client: str, + plugin: str, + sends: Iterable[MessageTemplate], + receives: Iterable[Tuple[Iterable[MessageTemplate], MessageCallback]], + ) -> None: """Register a client at the message bus. >>> async def callback(message): @@ -1152,20 +1191,19 @@ class MessageBus: if not client: raise BusException("Client name is not allowed to be empty.") if client in self._plugins: - raise BusException(f"Client '{client}' already registered" - " at message bus.") - event = Message('') - event['event'] = 'registered' - event['client'] = client + raise BusException(f"Client '{client}' already registered at message bus.") + event = Message("") + event["event"] = "registered" + event["client"] = client self._plugins[client] = plugin - event['plugin'] = plugin + event["plugin"] = plugin for template in sends: self._send_reg.insert(template, client) - event['sends'] = self._send_reg.get_templates(client) - for (templates, callback) in receives: + event["sends"] = self._send_reg.get_templates(client) + for templates, callback in receives: for template in templates: self._recv_reg.insert(template, client, callback) - event['receives'] = self._recv_reg.get_templates(client) + event["receives"] = self._recv_reg.get_templates(client) self._queue.put_nowait(event) def unregister(self, client: str) -> None: @@ -1185,9 +1223,9 @@ class MessageBus: """ if client not in self._plugins: return - event = Message('') - event['event'] = 'unregistered' - event['client'] = client + event = Message("") + event["event"] = "unregistered" + event["client"] = client del self._plugins[client] self._send_reg.delete(client) self._recv_reg.delete(client) @@ -1209,37 +1247,32 @@ class MessageBus: background_tasks = set() while True: message = await self._queue.get() - if ('target' in message and - message['target'] == '' and - 'command' in message): - if message['command'] == 'get clients': + if "target" in message and message["target"] == "" and "command" in message: + if message["command"] == "get clients": for client in self._plugins: - answer = Message('') - answer['client'] = client - answer['plugin'] = self._plugins[client] - answer['sends'] = (self._send_reg - .get_templates(client)) - answer['receives'] = (self._recv_reg - .get_templates(client)) + answer = Message("") + answer["client"] = client + answer["plugin"] = self._plugins[client] + answer["sends"] = self._send_reg.get_templates(client) + answer["receives"] = self._recv_reg.get_templates(client) await self._queue.put(answer) - elif message['command'] == 'push conf': + elif message["command"] == "push conf": conf = {} try: with open(sys.argv[1]) as conf_file: conf = json.load(conf_file) - except (IndexError, FileNotFoundError, - json.decoder.JSONDecodeError): + except ( + IndexError, + FileNotFoundError, + json.decoder.JSONDecodeError, + ): pass - if conf == message['conf']: - await (self._queue - .put(Message('', - {'event': 'conf unchanged'}))) + if conf == message["conf"]: + await self._queue.put(Message("", {"event": "conf unchanged"})) else: - await (self._queue - .put(Message('', - {'event': 'conf changed'}))) - with open(sys.argv[1], 'w') as conf_file: - json.dump(message['conf'], conf_file) + await self._queue.put(Message("", {"event": "conf changed"})) + with open(sys.argv[1], "w") as conf_file: + json.dump(message["conf"], conf_file) for callback in self._recv_reg.get_callbacks(message): task = asyncio.create_task(callback(message)) background_tasks.add(task) @@ -1284,12 +1317,13 @@ class MessageBus: Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'} Got: {'sender': 'Client 2', 'target': 'Client 1'} """ - assert isinstance(message['sender'], str) - sender = message['sender'] + assert isinstance(message["sender"], str) + sender = message["sender"] if sender: if not self._send_reg.check(sender, message): - raise BusException(f"Message '{message}' not allowed for" - f" sender '{sender}'.") + raise BusException( + f"Message '{message}' not allowed for sender '{sender}'." + ) await self._queue.put(message) def send_nowait(self, message: Message) -> None: @@ -1330,10 +1364,11 @@ class MessageBus: Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'} Got: {'sender': 'Client 2', 'target': 'Client 1'} """ - assert isinstance(message['sender'], str) - sender = message['sender'] + assert isinstance(message["sender"], str) + sender = message["sender"] if sender: if not self._send_reg.check(sender, message): - raise BusException(f"Message '{message}' not allowed for" - f" sender '{sender}'.") + raise BusException( + f"Message '{message}' not allowed for sender '{sender}'." + ) self._queue.put_nowait(message) diff --git a/controlpi/pluginregistry.py b/controlpi/pluginregistry.py index 6fc35cb..076017a 100644 --- a/controlpi/pluginregistry.py +++ b/controlpi/pluginregistry.py @@ -34,6 +34,7 @@ Plugin2: >>> p1 = registry['Plugin1'] >>> i1 = p1() """ + import importlib import pkgutil import collections.abc @@ -86,7 +87,7 @@ class PluginRegistry(collections.abc.Mapping): Plugin2: """ ns_mod = importlib.import_module(namespace_package) - ns_path = ns_mod.__path__ # type: ignore # mypy issue #1422 + ns_path = ns_mod.__path__ ns_name = ns_mod.__name__ for _, mod_name, _ in pkgutil.iter_modules(ns_path): importlib.import_module(f"{ns_name}.{mod_name}") @@ -97,8 +98,8 @@ class PluginRegistry(collections.abc.Mapping): result.append(subcls) result.extend(all_subclasses(subcls)) return result - self._plugins = {cls.__name__: cls - for cls in all_subclasses(base_class)} + + self._plugins = {cls.__name__: cls for cls in all_subclasses(base_class)} def __len__(self) -> int: """Get number of registered plugins. diff --git a/controlpi_plugins/state.py b/controlpi_plugins/state.py index 23328ae..f43f1fb 100644 --- a/controlpi_plugins/state.py +++ b/controlpi_plugins/state.py @@ -79,6 +79,7 @@ test(): {'sender': 'Test State', 'event': 'changed', 'state': False} test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} test(): {'sender': 'Test State 4', 'event': 'changed', 'state': True} """ + from controlpi import BasePlugin, Message, MessageTemplate from typing import Dict, List @@ -125,36 +126,54 @@ class State(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self.state: bool = False - self.bus.register(self.name, 'State', - [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get state'}})], - self._get_state), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'set state'}, - 'new state': - {'type': 'boolean'}})], - self._set_state)]) + self.bus.register( + self.name, + "State", + [ + MessageTemplate( + {"event": {"const": "changed"}, "state": {"type": "boolean"}} + ), + MessageTemplate({"state": {"type": "boolean"}}), + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get state"}, + } + ) + ], + self._get_state, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "set state"}, + "new state": {"type": "boolean"}, + } + ) + ], + self._set_state, + ), + ], + ) async def _get_state(self, message: Message) -> None: - await self.bus.send(Message(self.name, {'state': self.state})) + await self.bus.send(Message(self.name, {"state": self.state})) async def _set_state(self, message: Message) -> None: - if self.state != message['new state']: - assert isinstance(message['new state'], bool) - self.state = message['new state'] - await self.bus.send(Message(self.name, - {'event': 'changed', - 'state': self.state})) + if self.state != message["new state"]: + assert isinstance(message["new state"], bool) + self.state = message["new state"] + await self.bus.send( + Message(self.name, {"event": "changed", "state": self.state}) + ) else: - await self.bus.send(Message(self.name, - {'state': self.state})) + await self.bus.send(Message(self.name, {"state": self.state})) async def run(self) -> None: """Run no code proactively.""" @@ -204,8 +223,10 @@ class StateAlias(BasePlugin): test(): {'sender': 'Test StateAlias', 'state': True} """ - CONF_SCHEMA = {'properties': {'alias for': {'type': 'string'}}, - 'required': ['alias for']} + CONF_SCHEMA = { + "properties": {"alias for": {"type": "string"}}, + "required": ["alias for"], + } """Schema for StateAlias plugin configuration. Required configuration key: @@ -215,55 +236,90 @@ class StateAlias(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - self.bus.register(self.name, 'StateAlias', - [MessageTemplate({'target': - {'const': self.conf['alias for']}, - 'command': - {'const': 'get state'}}), - MessageTemplate({'target': - {'const': self.conf['alias for']}, - 'command': - {'const': 'set state'}, - 'new state': - {'type': 'boolean'}}), - MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get state'}})], - self._get_state), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'set state'}, - 'new state': - {'type': 'boolean'}})], - self._set_state), - ([MessageTemplate({'sender': - {'const': - self.conf['alias for']}, - 'state': - {'type': 'boolean'}})], - self._translate)]) + self.bus.register( + self.name, + "StateAlias", + [ + MessageTemplate( + { + "target": {"const": self.conf["alias for"]}, + "command": {"const": "get state"}, + } + ), + MessageTemplate( + { + "target": {"const": self.conf["alias for"]}, + "command": {"const": "set state"}, + "new state": {"type": "boolean"}, + } + ), + MessageTemplate( + {"event": {"const": "changed"}, "state": {"type": "boolean"}} + ), + MessageTemplate({"state": {"type": "boolean"}}), + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get state"}, + } + ) + ], + self._get_state, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "set state"}, + "new state": {"type": "boolean"}, + } + ) + ], + self._set_state, + ), + ( + [ + MessageTemplate( + { + "sender": {"const": self.conf["alias for"]}, + "state": {"type": "boolean"}, + } + ) + ], + self._translate, + ), + ], + ) async def _get_state(self, message: Message) -> None: - await self.bus.send(Message(self.name, - {'target': self.conf['alias for'], - 'command': 'get state'})) + await self.bus.send( + Message( + self.name, {"target": self.conf["alias for"], "command": "get state"} + ) + ) async def _set_state(self, message: Message) -> None: - await self.bus.send(Message(self.name, - {'target': self.conf['alias for'], - 'command': 'set state', - 'new state': message['new state']})) + await self.bus.send( + Message( + self.name, + { + "target": self.conf["alias for"], + "command": "set state", + "new state": message["new state"], + }, + ) + ) async def _translate(self, message: Message) -> None: alias_message = Message(self.name) - if 'event' in message and message['event'] == 'changed': - alias_message['event'] = 'changed' - alias_message['state'] = message['state'] + if "event" in message and message["event"] == "changed": + alias_message["event"] = "changed" + alias_message["state"] = message["state"] await self.bus.send(alias_message) async def run(self) -> None: @@ -315,9 +371,10 @@ class AndState(BasePlugin): 'states': ['Test State 1', 'Test State 2']} """ - CONF_SCHEMA = {'properties': {'states': {'type': 'array', - 'items': {'type': 'string'}}}, - 'required': ['states']} + CONF_SCHEMA = { + "properties": {"states": {"type": "array", "items": {"type": "string"}}}, + "required": ["states"], + } """Schema for AndState plugin configuration. Required configuration key: @@ -329,48 +386,70 @@ class AndState(BasePlugin): """Register plugin as bus client.""" updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} - for state in self.conf['states']: - updates.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + for state in self.conf["states"]: + updates.append( + MessageTemplate( + {"sender": {"const": state}, "state": {"type": "boolean"}} + ) + ) self.states[state] = False self.state: bool = all(self.states.values()) - self.bus.register(self.name, 'AndState', - [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}}), - MessageTemplate({'states': {'type': 'array', - 'items': { - 'type': 'string' - }}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get state'}})], - self._get_state), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get sources'}})], - self._get_sources), - (updates, self._update)]) + self.bus.register( + self.name, + "AndState", + [ + MessageTemplate( + {"event": {"const": "changed"}, "state": {"type": "boolean"}} + ), + MessageTemplate({"state": {"type": "boolean"}}), + MessageTemplate( + {"states": {"type": "array", "items": {"type": "string"}}} + ), + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get state"}, + } + ) + ], + self._get_state, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get sources"}, + } + ) + ], + self._get_sources, + ), + (updates, self._update), + ], + ) async def _get_state(self, message: Message) -> None: - await self.bus.send(Message(self.name, {'state': self.state})) + await self.bus.send(Message(self.name, {"state": self.state})) async def _get_sources(self, message: Message) -> None: source_states = list(self.states.keys()) - await self.bus.send(Message(self.name, {'states': source_states})) + await self.bus.send(Message(self.name, {"states": source_states})) async def _update(self, message: Message) -> None: - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] + assert isinstance(message["sender"], str) + assert isinstance(message["state"], bool) + self.states[message["sender"]] = message["state"] new_state = all(self.states.values()) if self.state != new_state: self.state = new_state - await self.bus.send(Message(self.name, - {'event': 'changed', - 'state': self.state})) + await self.bus.send( + Message(self.name, {"event": "changed", "state": self.state}) + ) async def run(self) -> None: """Run no code proactively.""" @@ -420,9 +499,10 @@ class OrState(BasePlugin): 'states': ['Test State 1', 'Test State 2']} """ - CONF_SCHEMA = {'properties': {'states': {'type': 'array', - 'items': {'type': 'string'}}}, - 'required': ['states']} + CONF_SCHEMA = { + "properties": {"states": {"type": "array", "items": {"type": "string"}}}, + "required": ["states"], + } """Schema for OrState plugin configuration. Required configuration key: @@ -434,48 +514,70 @@ class OrState(BasePlugin): """Register plugin as bus client.""" updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} - for state in self.conf['states']: - updates.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + for state in self.conf["states"]: + updates.append( + MessageTemplate( + {"sender": {"const": state}, "state": {"type": "boolean"}} + ) + ) self.states[state] = False self.state: bool = any(self.states.values()) - self.bus.register(self.name, 'OrState', - [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}}), - MessageTemplate({'states': {'type': 'array', - 'items': { - 'type': 'string' - }}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get state'}})], - self._get_state), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get sources'}})], - self._get_sources), - (updates, self._update)]) + self.bus.register( + self.name, + "OrState", + [ + MessageTemplate( + {"event": {"const": "changed"}, "state": {"type": "boolean"}} + ), + MessageTemplate({"state": {"type": "boolean"}}), + MessageTemplate( + {"states": {"type": "array", "items": {"type": "string"}}} + ), + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get state"}, + } + ) + ], + self._get_state, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get sources"}, + } + ) + ], + self._get_sources, + ), + (updates, self._update), + ], + ) async def _get_state(self, message: Message) -> None: - await self.bus.send(Message(self.name, {'state': self.state})) + await self.bus.send(Message(self.name, {"state": self.state})) async def _get_sources(self, message: Message) -> None: source_states = list(self.states.keys()) - await self.bus.send(Message(self.name, {'states': source_states})) + await self.bus.send(Message(self.name, {"states": source_states})) async def _update(self, message: Message) -> None: - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] + assert isinstance(message["sender"], str) + assert isinstance(message["state"], bool) + self.states[message["sender"]] = message["state"] new_state = any(self.states.values()) if self.state != new_state: self.state = new_state - await self.bus.send(Message(self.name, - {'event': 'changed', - 'state': self.state})) + await self.bus.send( + Message(self.name, {"event": "changed", "state": self.state}) + ) async def run(self) -> None: """Run no code proactively.""" @@ -539,11 +641,13 @@ class AndSet(BasePlugin): test(): {'sender': 'Test State 3', 'event': 'changed', 'state': False} """ - CONF_SCHEMA = {'properties': {'input states': {'type': 'array', - 'items': {'type': - 'string'}}, - 'output state': {'type': 'string'}}, - 'required': ['input states', 'output state']} + CONF_SCHEMA = { + "properties": { + "input states": {"type": "array", "items": {"type": "string"}}, + "output state": {"type": "string"}, + }, + "required": ["input states", "output state"], + } """Schema for AndSet plugin configuration. Required configuration keys: @@ -556,57 +660,89 @@ class AndSet(BasePlugin): """Register plugin as bus client.""" updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} - for state in self.conf['input states']: - updates.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + for state in self.conf["input states"]: + updates.append( + MessageTemplate( + {"sender": {"const": state}, "state": {"type": "boolean"}} + ) + ) self.states[state] = False self.state: bool = all(self.states.values()) - self.bus.register(self.name, 'AndSet', - [MessageTemplate({'target': - {'const': - self.conf['output state']}, - 'command': - {'const': 'set state'}, - 'new state': - {'type': 'boolean'}}), - MessageTemplate({'states': {'type': 'array', - 'items': { - 'type': 'string' - }}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get state'}})], - self._get_state), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get sources'}})], - self._get_sources), - (updates, self._update)]) + self.bus.register( + self.name, + "AndSet", + [ + MessageTemplate( + { + "target": {"const": self.conf["output state"]}, + "command": {"const": "set state"}, + "new state": {"type": "boolean"}, + } + ), + MessageTemplate( + {"states": {"type": "array", "items": {"type": "string"}}} + ), + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get state"}, + } + ) + ], + self._get_state, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get sources"}, + } + ) + ], + self._get_sources, + ), + (updates, self._update), + ], + ) async def _get_state(self, message: Message) -> None: - await self.bus.send(Message(self.name, - {'target': self.conf['output state'], - 'command': 'set state', - 'new state': self.state})) + await self.bus.send( + Message( + self.name, + { + "target": self.conf["output state"], + "command": "set state", + "new state": self.state, + }, + ) + ) async def _get_sources(self, message: Message) -> None: source_states = list(self.states.keys()) - await self.bus.send(Message(self.name, {'states': source_states})) + await self.bus.send(Message(self.name, {"states": source_states})) async def _update(self, message: Message) -> None: - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] + assert isinstance(message["sender"], str) + assert isinstance(message["state"], bool) + self.states[message["sender"]] = message["state"] new_state = all(self.states.values()) if self.state != new_state: self.state = new_state - await self.bus.send(Message(self.name, - {'target': - self.conf['output state'], - 'command': 'set state', - 'new state': self.state})) + await self.bus.send( + Message( + self.name, + { + "target": self.conf["output state"], + "command": "set state", + "new state": self.state, + }, + ) + ) async def run(self) -> None: """Run no code proactively.""" @@ -661,11 +797,13 @@ class OrSet(BasePlugin): 'states': ['Test State 1', 'Test State 2']} """ - CONF_SCHEMA = {'properties': {'input states': {'type': 'array', - 'items': {'type': - 'string'}}, - 'output state': {'type': 'string'}}, - 'required': ['input states', 'output state']} + CONF_SCHEMA = { + "properties": { + "input states": {"type": "array", "items": {"type": "string"}}, + "output state": {"type": "string"}, + }, + "required": ["input states", "output state"], + } """Schema for OrSet plugin configuration. Required configuration keys: @@ -678,57 +816,89 @@ class OrSet(BasePlugin): """Register plugin as bus client.""" updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} - for state in self.conf['input states']: - updates.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + for state in self.conf["input states"]: + updates.append( + MessageTemplate( + {"sender": {"const": state}, "state": {"type": "boolean"}} + ) + ) self.states[state] = False self.state: bool = any(self.states.values()) - self.bus.register(self.name, 'AndSet', - [MessageTemplate({'target': - {'const': - self.conf['output state']}, - 'command': - {'const': 'set state'}, - 'new state': - {'type': 'boolean'}}), - MessageTemplate({'states': {'type': 'array', - 'items': { - 'type': 'string' - }}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get state'}})], - self._get_state), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get sources'}})], - self._get_sources), - (updates, self._update)]) + self.bus.register( + self.name, + "AndSet", + [ + MessageTemplate( + { + "target": {"const": self.conf["output state"]}, + "command": {"const": "set state"}, + "new state": {"type": "boolean"}, + } + ), + MessageTemplate( + {"states": {"type": "array", "items": {"type": "string"}}} + ), + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get state"}, + } + ) + ], + self._get_state, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get sources"}, + } + ) + ], + self._get_sources, + ), + (updates, self._update), + ], + ) async def _get_state(self, message: Message) -> None: - await self.bus.send(Message(self.name, - {'target': self.conf['output state'], - 'command': 'set state', - 'new state': self.state})) + await self.bus.send( + Message( + self.name, + { + "target": self.conf["output state"], + "command": "set state", + "new state": self.state, + }, + ) + ) async def _get_sources(self, message: Message) -> None: source_states = list(self.states.keys()) - await self.bus.send(Message(self.name, {'states': source_states})) + await self.bus.send(Message(self.name, {"states": source_states})) async def _update(self, message: Message) -> None: - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] + assert isinstance(message["sender"], str) + assert isinstance(message["state"], bool) + self.states[message["sender"]] = message["state"] new_state = any(self.states.values()) if self.state != new_state: self.state = new_state - await self.bus.send(Message(self.name, - {'target': - self.conf['output state'], - 'command': 'set state', - 'new state': self.state})) + await self.bus.send( + Message( + self.name, + { + "target": self.conf["output state"], + "command": "set state", + "new state": self.state, + }, + ) + ) async def run(self) -> None: """Run no code proactively.""" diff --git a/controlpi_plugins/util.py b/controlpi_plugins/util.py index b0078c9..4390ee0 100644 --- a/controlpi_plugins/util.py +++ b/controlpi_plugins/util.py @@ -37,6 +37,7 @@ test(): {'sender': 'Test Alias', 'id': 'translated', Test Log: {'sender': 'Test Alias', 'id': 'translated', 'content': 'Test Message'} """ + import asyncio from datetime import datetime @@ -87,9 +88,10 @@ class Log(BasePlugin): Configuration for 'Test Log' is not valid. """ - CONF_SCHEMA = {'properties': {'filter': {'type': 'array', - 'items': {'type': 'object'}}}, - 'required': ['filter']} + CONF_SCHEMA = { + "properties": {"filter": {"type": "array", "items": {"type": "object"}}}, + "required": ["filter"], + } """Schema for Log plugin configuration. Required configuration key: @@ -99,9 +101,7 @@ class Log(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - self.bus.register(self.name, 'Log', - [], - [(self.conf['filter'], self._log)]) + self.bus.register(self.name, "Log", [], [(self.conf["filter"], self._log)]) async def _log(self, message: Message) -> None: print(f"{self.name}: {message}") @@ -159,9 +159,10 @@ class Init(BasePlugin): Configuration for 'Test Init' is not valid. """ - CONF_SCHEMA = {'properties': {'messages': {'type': 'array', - 'items': {'type': 'object'}}}, - 'required': ['messages']} + CONF_SCHEMA = { + "properties": {"messages": {"type": "array", "items": {"type": "object"}}}, + "required": ["messages"], + } """Schema for Init plugin configuration. Required configuration key: @@ -171,24 +172,37 @@ class Init(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - self.bus.register(self.name, 'Init', - [MessageTemplate.from_message(message) - for message in self.conf['messages']], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'execute'}})], - self._execute)]) + self.bus.register( + self.name, + "Init", + [ + MessageTemplate.from_message(message) + for message in self.conf["messages"] + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "execute"}, + } + ) + ], + self._execute, + ) + ], + ) async def _execute(self, message: Message) -> None: - for message in self.conf['messages']: + for message in self.conf["messages"]: await self.bus.send(Message(self.name, message)) # Give immediate reactions to messages opportunity to happen: await asyncio.sleep(0) async def run(self) -> None: """Send configured messages on startup.""" - for message in self.conf['messages']: + for message in self.conf["messages"]: await self.bus.send(Message(self.name, message)) @@ -240,26 +254,43 @@ class Execute(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self.messages: List[Message] = [] - self.bus.register(self.name, 'Execute', - [MessageTemplate()], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'set messages'}, - 'messages': - {'type': 'array', - 'items': - {'type': 'object'}}})], - self._set_messages), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'execute'}})], - self._execute)]) + self.bus.register( + self.name, + "Execute", + [MessageTemplate()], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "set messages"}, + "messages": { + "type": "array", + "items": {"type": "object"}, + }, + } + ) + ], + self._set_messages, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "execute"}, + } + ) + ], + self._execute, + ), + ], + ) async def _set_messages(self, message: Message) -> None: - assert isinstance(message['messages'], list) - self.messages = list(message['messages']) + assert isinstance(message["messages"], list) + self.messages = list(message["messages"]) async def _execute(self, message: Message) -> None: for message in self.messages: @@ -355,20 +386,28 @@ class Alias(BasePlugin): Configuration for 'Test Alias' is not valid. """ - CONF_SCHEMA = {'properties': {'from': {'type': 'object'}, - 'to': {'anyOf': - [{'type': 'object'}, - {'type': 'array', - 'items': {'type': 'object'}}]}, - 'translate': {'type': 'array', - 'items': - {'type': 'object', - 'properties': - {'from': - {'type': 'string'}, - 'to': - {'type': 'string'}}}}}, - 'required': ['from']} + CONF_SCHEMA = { + "properties": { + "from": {"type": "object"}, + "to": { + "anyOf": [ + {"type": "object"}, + {"type": "array", "items": {"type": "object"}}, + ] + }, + "translate": { + "type": "array", + "items": { + "type": "object", + "properties": { + "from": {"type": "string"}, + "to": {"type": "string"}, + }, + }, + }, + }, + "required": ["from"], + } """Schema for Alias plugin configuration. Required configuration keys: @@ -385,25 +424,25 @@ class Alias(BasePlugin): """Register plugin as bus client.""" sends = [] self._to = [] - if 'to' in self.conf: - if isinstance(self.conf['to'], list): - self._to = self.conf['to'] - for to in self.conf['to']: + if "to" in self.conf: + if isinstance(self.conf["to"], list): + self._to = self.conf["to"] + for to in self.conf["to"]: sends.append(MessageTemplate.from_message(to)) else: - self._to = [self.conf['to']] - sends.append(MessageTemplate.from_message(self.conf['to'])) + self._to = [self.conf["to"]] + sends.append(MessageTemplate.from_message(self.conf["to"])) self._translate = {} - if 'translate' in self.conf: - for pair in self.conf['translate']: - self._translate[pair['from']] = pair['to'] - self.bus.register(self.name, 'Alias', - sends, - [([self.conf['from']], self._alias)]) + if "translate" in self.conf: + for pair in self.conf["translate"]: + self._translate[pair["from"]] = pair["to"] + self.bus.register( + self.name, "Alias", sends, [([self.conf["from"]], self._alias)] + ) async def _alias(self, message: Message) -> None: # Prevent endless loop: - if message['sender'] != self.name: + if message["sender"] != self.name: for to in self._to: alias_message = Message(self.name, message) alias_message.update(to) @@ -475,11 +514,13 @@ class Counter(BasePlugin): 'since': ..., 'until': ...} """ - CONF_SCHEMA = {'properties': {'count': {'type': 'object'}, - 'date format': - {'type': 'string', - 'default': '%Y-%m-%d %H:%M:%S'}}, - 'required': ['count']} + CONF_SCHEMA = { + "properties": { + "count": {"type": "object"}, + "date format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"}, + }, + "required": ["count"], + } """Schema for Counter plugin configuration. Required configuration key: @@ -489,39 +530,57 @@ class Counter(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - self._since = datetime.now().strftime(self.conf['date format']) + self._since = datetime.now().strftime(self.conf["date format"]) self._counter = 0 - self.bus.register(self.name, 'Counter', - [MessageTemplate({'count': {'type': 'integer'}})], - [([MessageTemplate(self.conf['count'])], - self._count), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get count'}})], - self._get_count), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'reset'}})], - self._reset)]) + self.bus.register( + self.name, + "Counter", + [MessageTemplate({"count": {"type": "integer"}})], + [ + ([MessageTemplate(self.conf["count"])], self._count), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get count"}, + } + ) + ], + self._get_count, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "reset"}, + } + ) + ], + self._reset, + ), + ], + ) async def _count(self, message: Message) -> None: self._counter += 1 async def _get_count(self, message: Message) -> None: - now = datetime.now().strftime(self.conf['date format']) - await self.bus.send(Message(self.name, {'count': self._counter, - 'since': self._since, - 'until': now})) + now = datetime.now().strftime(self.conf["date format"]) + await self.bus.send( + Message( + self.name, {"count": self._counter, "since": self._since, "until": now} + ) + ) async def _reset(self, message: Message) -> None: - now = datetime.now().strftime(self.conf['date format']) + now = datetime.now().strftime(self.conf["date format"]) counter = self._counter self._counter = 0 - await self.bus.send(Message(self.name, {'count': counter, - 'since': self._since, - 'until': now})) + await self.bus.send( + Message(self.name, {"count": counter, "since": self._since, "until": now}) + ) self._since = now async def run(self) -> None: @@ -565,9 +624,9 @@ class Date(BasePlugin): test(): {'sender': 'Test Date', 'date': ...} """ - CONF_SCHEMA = {'properties': {'format': - {'type': 'string', - 'default': '%Y-%m-%d %H:%M:%S'}}} + CONF_SCHEMA = { + "properties": {"format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"}} + } """Schema for Date plugin configuration. Optional configuration key: @@ -578,17 +637,28 @@ class Date(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - self.bus.register(self.name, 'Date', - [MessageTemplate({'date': {'type': 'string'}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'get date'}})], - self._date)]) + self.bus.register( + self.name, + "Date", + [MessageTemplate({"date": {"type": "string"}})], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "get date"}, + } + ) + ], + self._date, + ) + ], + ) async def _date(self, message: Message) -> None: - date = datetime.now().strftime(self.conf['format']) - await self.bus.send(Message(self.name, {'date': date})) + date = datetime.now().strftime(self.conf["format"]) + await self.bus.send(Message(self.name, {"date": date})) async def run(self) -> None: """Run no code proactively.""" diff --git a/controlpi_plugins/wait.py b/controlpi_plugins/wait.py index 323e171..3ca2475 100644 --- a/controlpi_plugins/wait.py +++ b/controlpi_plugins/wait.py @@ -32,6 +32,7 @@ test(): {'sender': 'Test Wait', 'event': 'finished'} test(): {'sender': 'Test GenericWait', 'event': 'finished', 'id': 'Long Wait'} """ + import asyncio from controlpi import BasePlugin, Message, MessageTemplate @@ -65,8 +66,10 @@ class Wait(BasePlugin): test(): {'sender': 'Long Wait', 'event': 'finished'} """ - CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}}, - 'required': ['seconds']} + CONF_SCHEMA = { + "properties": {"seconds": {"type": "number"}}, + "required": ["seconds"], + } """Schema for Wait plugin configuration. Required configuration key: @@ -77,18 +80,30 @@ class Wait(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self._tasks = set() - self.bus.register(self.name, 'Wait', - [MessageTemplate({'event': {'const': 'finished'}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'wait'}})], - self._wait)]) + self.bus.register( + self.name, + "Wait", + [MessageTemplate({"event": {"const": "finished"}})], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "wait"}, + } + ) + ], + self._wait, + ) + ], + ) async def _wait(self, message: Message) -> None: async def wait_coroutine(): - await asyncio.sleep(self.conf['seconds']) - await self.bus.send(Message(self.name, {'event': 'finished'})) + await asyncio.sleep(self.conf["seconds"]) + await self.bus.send(Message(self.name, {"event": "finished"})) + # Done in separate task to not block queue awaiting this callback: task = asyncio.create_task(wait_coroutine()) self._tasks.add(task) @@ -140,26 +155,41 @@ class GenericWait(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self._tasks = set() - self.bus.register(self.name, 'GenericWait', - [MessageTemplate({'event': {'const': 'finished'}, - 'id': {'type': 'string'}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'wait'}, - 'seconds': - {'type': 'number'}, - 'id': - {'type': 'string'}})], - self._wait)]) + self.bus.register( + self.name, + "GenericWait", + [ + MessageTemplate( + {"event": {"const": "finished"}, "id": {"type": "string"}} + ) + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "wait"}, + "seconds": {"type": "number"}, + "id": {"type": "string"}, + } + ) + ], + self._wait, + ) + ], + ) async def _wait(self, message: Message) -> None: async def wait_coroutine(): - assert (isinstance(message['seconds'], float) or - isinstance(message['seconds'], int)) - await asyncio.sleep(message['seconds']) - await self.bus.send(Message(self.name, {'event': 'finished', - 'id': message['id']})) + assert isinstance(message["seconds"], float) or isinstance( + message["seconds"], int + ) + await asyncio.sleep(message["seconds"]) + await self.bus.send( + Message(self.name, {"event": "finished", "id": message["id"]}) + ) + # Done in separate task to not block queue awaiting this callback: task = asyncio.create_task(wait_coroutine()) self._tasks.add(task) @@ -204,8 +234,10 @@ class Timer(BasePlugin): test(): {'sender': 'Timer', 'event': 'finished'} """ - CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}}, - 'required': ['seconds']} + CONF_SCHEMA = { + "properties": {"seconds": {"type": "number"}}, + "required": ["seconds"], + } """Schema for Timer plugin configuration. Required configuration key: @@ -218,34 +250,51 @@ class Timer(BasePlugin): self._tasks = set() self.started = 0 self.cancelled = 0 - self.bus.register(self.name, 'Timer', - [MessageTemplate({'event': - {'const': 'finished'}}), - MessageTemplate({'event': - {'const': 'cancelled'}})], - [([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'start'}})], - self._start), - ([MessageTemplate({'target': - {'const': self.name}, - 'command': - {'const': 'cancel'}})], - self._cancel)]) + self.bus.register( + self.name, + "Timer", + [ + MessageTemplate({"event": {"const": "finished"}}), + MessageTemplate({"event": {"const": "cancelled"}}), + ], + [ + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "start"}, + } + ) + ], + self._start, + ), + ( + [ + MessageTemplate( + { + "target": {"const": self.name}, + "command": {"const": "cancel"}, + } + ) + ], + self._cancel, + ), + ], + ) async def _start(self, message: Message) -> None: self.started += 1 async def wait_coroutine(): - await asyncio.sleep(self.conf['seconds']) + await asyncio.sleep(self.conf["seconds"]) if self.cancelled > 0: self.cancelled -= 1 self.started -= 1 elif self.started > 0: self.started -= 1 - await self.bus.send(Message(self.name, - {'event': 'finished'})) + await self.bus.send(Message(self.name, {"event": "finished"})) + # Done in separate task to not block queue awaiting this callback: task = asyncio.create_task(wait_coroutine()) self._tasks.add(task) @@ -255,8 +304,9 @@ class Timer(BasePlugin): if self.cancelled < self.started: cancel = self.started - self.cancelled self.cancelled = self.started - await self.bus.send(Message(self.name, {'event': 'cancelled', - 'count': cancel})) + await self.bus.send( + Message(self.name, {"event": "cancelled", "count": cancel}) + ) async def run(self) -> None: """Run no code proactively.""" @@ -281,9 +331,10 @@ class Periodic(BasePlugin): test(): {'sender': 'Loop', 'key': 'value'} """ - CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}, - 'message': {'type': 'object'}}, - 'required': ['seconds', 'message']} + CONF_SCHEMA = { + "properties": {"seconds": {"type": "number"}, "message": {"type": "object"}}, + "required": ["seconds", "message"], + } """Schema for Wait plugin configuration. Required configuration key: @@ -294,12 +345,15 @@ class Periodic(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - self.bus.register(self.name, 'Periodic', - [MessageTemplate.from_message(self.conf['message'])], - []) + self.bus.register( + self.name, + "Periodic", + [MessageTemplate.from_message(self.conf["message"])], + [], + ) async def run(self) -> None: """Run periodic loop.""" while True: - await asyncio.sleep(self.conf['seconds']) - await self.bus.send(Message(self.name, self.conf['message'])) + await asyncio.sleep(self.conf["seconds"]) + await self.bus.send(Message(self.name, self.conf["message"]))