The test function is a utility function to test plugins with minimal
boilerplate code.
"""
+
import asyncio
-import fastjsonschema # type: ignore
+import fastjsonschema
-from controlpi.messagebus import (MessageBus, BusException,
- Message, MessageTemplate)
+from controlpi.messagebus import MessageBus, Message, MessageTemplate
from controlpi.pluginregistry import PluginRegistry
from controlpi.baseplugin import BasePlugin, PluginConf, ConfException
from typing import Dict, List, Coroutine, Any
-CONF_SCHEMA = {'type': 'object',
- 'patternProperties': {'.*': {'type': 'object'}}}
+CONF_SCHEMA = {"type": "object", "patternProperties": {".*": {"type": "object"}}}
-def _process_conf(message_bus: MessageBus,
- conf: Dict[str, PluginConf]) -> List[Coroutine]:
+def _process_conf(
+ message_bus: MessageBus, conf: Dict[str, PluginConf]
+) -> List[Coroutine]:
try:
conf = fastjsonschema.validate(CONF_SCHEMA, conf)
except fastjsonschema.JsonSchemaException as e:
- print(f"Configuration not valid:\n{e.message}")
+ print(f"Configuration not valid:\n{e}")
return []
- plugins = PluginRegistry('controlpi_plugins', BasePlugin)
+ plugins = PluginRegistry("controlpi_plugins", BasePlugin)
coroutines = [message_bus.run()]
for instance_name in conf:
instance_conf = conf[instance_name]
- if 'plugin' not in instance_conf:
- print("No plugin implementation specified for instance"
- f" '{instance_name}'.")
+ if "plugin" not in instance_conf:
+ print(f"No plugin implementation specified for instance '{instance_name}'.")
continue
- plugin_name = instance_conf['plugin']
+ plugin_name = instance_conf["plugin"]
if plugin_name not in plugins:
- print(f"No implementation found for plugin '{plugin_name}'"
- f" (specified for instance '{instance_name}').")
+ print(
+ f"No implementation found for plugin '{plugin_name}'"
+ f" (specified for instance '{instance_name}')."
+ )
continue
plugin = plugins[plugin_name]
try:
pass
-async def test(conf: Dict[str, PluginConf],
- messages: List[Dict[str, Any]],
- wait: float = 0.0) -> None:
+async def test(
+ conf: Dict[str, PluginConf], messages: List[Dict[str, Any]], wait: float = 0.0
+) -> None:
"""Test configuration of ControlPi system.
Setup message bus, process given configuration, run message bus and
message_bus = MessageBus()
async def log(message):
- if ('sender' in message and message['sender'] == '' and
- 'event' in message and message['event'] == 'registered' and
- 'client' in message and message['client'] == 'test()'):
+ if (
+ "sender" in message
+ and message["sender"] == ""
+ and "event" in message
+ and message["event"] == "registered"
+ and "client" in message
+ and message["client"] == "test()"
+ ):
# Do not log own registration of 'test()':
return
print(f"test(): {message}")
- message_bus.register('test()', 'Test',
- [MessageTemplate()], [([MessageTemplate()], log)])
+
+ message_bus.register(
+ "test()", "Test", [MessageTemplate()], [([MessageTemplate()], log)]
+ )
coroutines = _process_conf(message_bus, conf)
background_tasks = set()
# Give the created task opportunity to run:
await asyncio.sleep(0)
for message in messages:
- await message_bus.send(Message('test()', message))
+ await message_bus.send(Message("test()", message))
# Give immediate reactions to messages opportunity to happen:
await asyncio.sleep(0)
await asyncio.sleep(wait)
started by:
python -m controlpi <path to conf.json>
"""
+
import signal
import sys
import os
import json
import asyncio
-import pyinotify # type: ignore
+import pyinotify
from controlpi import run, PluginConf
"""Add signal handlers to the running loop."""
loop = asyncio.get_running_loop()
for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
- loop.add_signal_handler(sig,
- lambda s=sig: asyncio.create_task(shutdown(s)))
+ loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s)))
def read_configuration() -> Dict[str, PluginConf]:
"""Add handler for configuration file."""
wm = pyinotify.WatchManager()
loop = asyncio.get_running_loop()
- notifier = pyinotify.AsyncioNotifier(wm, loop,
- default_proc_fun=ConfigHandler())
+ notifier = pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=ConfigHandler())
wm.add_watch(os.path.dirname(sys.argv[1]), pyinotify.ALL_EVENTS)
return notifier
await run(conf)
notifier.stop()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
asyncio.run(main())
Plugins are configured and run based on the information in the global
configuration. Here, we test this manually:
+>>> import asyncio
>>> async def test():
... p = TestPlugin(MessageBus(), 'Test Instance', {'key': 'Something'})
... await p.run()
network socket plugin might register separate clients for all connections
to the socket (and unregister them when the connection is closed).
"""
-__pdoc__ = {'BasePlugin.CONF_SCHEMA': False}
+
+__pdoc__ = {"BasePlugin.CONF_SCHEMA": False}
from abc import ABC, abstractmethod
-import asyncio
-import fastjsonschema # type: ignore
+import fastjsonschema
from controlpi.messagebus import MessageBus
from typing import Union, Dict, List, Any, Optional, Callable
-JSONSchema = Union[bool, Dict[str, Union[None, str, int, float, bool,
- Dict[str, Any], List[Any]]]]
+
+JSONSchema = Union[
+ bool, Dict[str, Union[None, str, int, float, bool, Dict[str, Any], List[Any]]]
+]
# Could be more specific.
PluginConf = Dict[str, Any]
# Could be more specific.
Initialisation sets the instance variables bus to the given message bus,
name to the given name, and conf to the given configuration:
+ >>> import asyncio
>>> class TestPlugin(BasePlugin):
... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}},
... 'required': ['key']}
self.bus = bus
self.name = name
if not type(self)._validate:
- type(self)._validate = \
- fastjsonschema.compile(type(self).CONF_SCHEMA)
+ type(self)._validate = fastjsonschema.compile(type(self).CONF_SCHEMA)
self.conf = {}
validate = type(self)._validate
assert validate is not None
try:
self.conf = validate(conf)
except fastjsonschema.JsonSchemaException as e:
- print(e.message)
- raise ConfException(f"Configuration for '{self.name}'"
- " is not valid.")
+ print(e)
+ raise ConfException(f"Configuration for '{self.name}' is not valid.")
self.process_conf()
@abstractmethod
Logger: {'sender': '', 'target': 'Client 1'}
Client 1: {'sender': '', 'target': 'Client 1'}
"""
+
import asyncio
import json
-import fastjsonschema # type: ignore
+import fastjsonschema
import sys
-from typing import (Union, Dict, List, Any, Callable, Coroutine,
- Optional, Iterable, Tuple)
+from typing import (
+ Union,
+ Dict,
+ List,
+ Any,
+ Callable,
+ Coroutine,
+ Optional,
+ Iterable,
+ Tuple,
+)
+
MessageValue = Union[None, str, int, float, bool, Dict[str, Any], List[Any]]
# Should really be:
# MessageValue = Union[None, str, int, float, bool,
# https://github.com/python/mypy/issues/731
JSONSchema = Union[bool, Dict[str, MessageValue]]
# Could be even more specific.
-MessageCallback = Callable[['Message'], Coroutine[Any, Any, None]]
+MessageCallback = Callable[["Message"], Coroutine[Any, Any, None]]
# Global cache of JSON schema validation functions:
{'sender': 'New sender', 'key': 'value'}
"""
- def __init__(self, sender: str,
- init: Optional[Dict[str, MessageValue]] = None) -> None:
+ def __init__(
+ self, sender: str, init: Optional[Dict[str, MessageValue]] = None
+ ) -> None:
"""Initialise message.
Message is initialised with given sender and possibly given
{'sender': 'Example sender', 'key 1': 'value 1'}
"""
if not isinstance(sender, str):
- raise TypeError(f"'{sender}' is not a valid sender name"
- " (not a string).")
- self['sender'] = ''
+ raise TypeError(f"'{sender}' is not a valid sender name (not a string).")
+ self["sender"] = ""
if init is not None:
self.update(init)
- self['sender'] = sender
+ self["sender"] = sender
@staticmethod
def check_value(value: MessageValue) -> bool:
"""
if value is None:
return True
- elif (isinstance(value, str) or isinstance(value, int) or
- isinstance(value, float) or isinstance(value, bool)):
+ elif (
+ isinstance(value, str)
+ or isinstance(value, int)
+ or isinstance(value, float)
+ or isinstance(value, bool)
+ ):
return True
elif isinstance(value, dict):
for key in value:
TypeError: '1j' is not a valid value in Message.
"""
if not isinstance(key, str):
- raise TypeError(f"'{key}' is not a valid key in Message"
- " (not a string).")
+ raise TypeError(f"'{key}' is not a valid key in Message (not a string).")
if not self.check_value(value):
raise TypeError(f"'{value}' is not a valid value in Message.")
super().__setitem__(key, value)
"""
if args:
if len(args) > 1:
- raise TypeError("update expected at most 1 argument,"
- f" got {len(args)}")
+ raise TypeError(f"update expected at most 1 argument, got {len(args)}")
other = dict(args[0])
for key in other:
self[key] = other[key]
self.update(init)
@staticmethod
- def from_message(message: Message) -> 'MessageTemplate':
+ def from_message(message: Message) -> "MessageTemplate":
"""Create template from message.
Template witch constant schemas is created from message:
and the template for the registration can be constructed by this
method.
"""
+
def schema_from_value(value: MessageValue) -> JSONSchema:
schema: JSONSchema = False
if value is None:
- schema = {'const': None}
- elif (isinstance(value, str) or isinstance(value, int) or
- isinstance(value, float) or isinstance(value, bool)):
- schema = {'const': value}
+ schema = {"const": None}
+ elif (
+ isinstance(value, str)
+ or isinstance(value, int)
+ or isinstance(value, float)
+ or isinstance(value, bool)
+ ):
+ schema = {"const": value}
elif isinstance(value, dict):
properties = {}
for inner_key in value:
inner_value: Message = value[inner_key]
properties[inner_key] = schema_from_value(inner_value)
- schema = {'type': 'object',
- 'properties': properties}
+ schema = {"type": "object", "properties": properties}
elif isinstance(value, list):
- schema = {'type': 'array',
- 'items': [schema_from_value(element)
- for element in value]}
+ schema = {
+ "type": "array",
+ "items": [schema_from_value(element) for element in value],
+ }
return schema
+
template = MessageTemplate()
for key in message:
template[key] = schema_from_value(message[key])
>>> t['key'] = True
"""
if not isinstance(key, str):
- raise TypeError(f"'{key}' is not a valid key in MessageTemplate"
- " (not a string).")
+ raise TypeError(
+ f"'{key}' is not a valid key in MessageTemplate (not a string)."
+ )
if not register_schema(value):
- raise TypeError(f"'{value}' is not a valid value in"
- " MessageTemplate (not a valid JSON schema).")
+ raise TypeError(
+ f"'{value}' is not a valid value in"
+ " MessageTemplate (not a valid JSON schema)."
+ )
super().__setitem__(key, value)
def update(self, *args, **kwargs) -> None:
"""
if args:
if len(args) > 1:
- raise TypeError("update expected at most 1 argument,"
- f" got {len(args)}")
+ raise TypeError(f"update expected at most 1 argument, got {len(args)}")
other = dict(args[0])
for key in other:
self[key] = other[key]
for key in kwargs:
self[key] = kwargs[key]
- def setdefault(self, key: str,
- value: Optional[JSONSchema] = None) -> JSONSchema:
+ def setdefault(self, key: str, value: Optional[JSONSchema] = None) -> JSONSchema:
"""Override setdefault to use validity checks.
>>> t = MessageTemplate()
# First key is the message key, second key is the JSON schema string
self._templates: Dict[str, List[MessageTemplate]] = {}
- def insert(self, template: MessageTemplate, client: str,
- callback: Optional[MessageCallback] = None) -> None:
+ def insert(
+ self,
+ template: MessageTemplate,
+ client: str,
+ callback: Optional[MessageCallback] = None,
+ ) -> None:
"""Register a client for a template.
>>> r = TemplateRegistry()
self._callbacks[client].append(callback)
else:
key, schema = next(iter(template.items()))
- reduced_template = MessageTemplate({k: template[k]
- for k in template
- if k != key})
- if (isinstance(schema, dict) and len(schema) == 1 and
- 'const' in schema and isinstance(schema['const'], str)):
- value = schema['const']
+ reduced_template = MessageTemplate(
+ {k: template[k] for k in template if k != key}
+ )
+ if (
+ isinstance(schema, dict)
+ and len(schema) == 1
+ and "const" in schema
+ and isinstance(schema["const"], str)
+ ):
+ value = schema["const"]
if key not in self._constants:
self._constants[key] = {}
if value not in self._constants[key]:
self._constants[key][value] = TemplateRegistry()
- self._constants[key][value].insert(reduced_template,
- client, callback)
+ self._constants[key][value].insert(reduced_template, client, callback)
else:
schema_string = json.dumps(schema)
if key not in self._schemas:
self._schemas[key] = {}
if schema_string not in self._schemas[key]:
self._schemas[key][schema_string] = TemplateRegistry()
- self._schemas[key][schema_string].insert(reduced_template,
- client, callback)
+ self._schemas[key][schema_string].insert(
+ reduced_template, client, callback
+ )
def delete(self, client: str) -> bool:
"""Unregister a client from all templates.
if not new_schemas[key]:
del new_schemas[key]
self._schemas = new_schemas
- if (self._clients or self._callbacks or
- self._constants or self._schemas):
+ if self._clients or self._callbacks or self._constants or self._schemas:
return True
return False
if client in self._clients:
return True
for key in self._constants:
- if (key in message and isinstance(message[key], str) and
- message[key] in self._constants[key]):
+ if (
+ key in message
+ and isinstance(message[key], str)
+ and message[key] in self._constants[key]
+ ):
value = message[key]
assert isinstance(value, str)
child = self._constants[key][value]
if client not in result:
result.append(client)
for key in self._constants:
- if (key in message and isinstance(message[key], str) and
- message[key] in self._constants[key]):
+ if (
+ key in message
+ and isinstance(message[key], str)
+ and message[key] in self._constants[key]
+ ):
value = message[key]
assert isinstance(value, str)
child = self._constants[key][value]
if callback not in result:
result.append(callback)
for key in self._constants:
- if (key in message and isinstance(message[key], str) and
- message[key] in self._constants[key]):
+ if (
+ key in message
+ and isinstance(message[key], str)
+ and message[key] in self._constants[key]
+ ):
value = message[key]
assert isinstance(value, str)
child = self._constants[key][value]
self._send_reg: TemplateRegistry = TemplateRegistry()
self._recv_reg: TemplateRegistry = TemplateRegistry()
- def register(self, client: str, plugin: str,
- sends: Iterable[MessageTemplate],
- receives: Iterable[Tuple[Iterable[MessageTemplate],
- MessageCallback]]) -> None:
+ def register(
+ self,
+ client: str,
+ plugin: str,
+ sends: Iterable[MessageTemplate],
+ receives: Iterable[Tuple[Iterable[MessageTemplate], MessageCallback]],
+ ) -> None:
"""Register a client at the message bus.
>>> async def callback(message):
if not client:
raise BusException("Client name is not allowed to be empty.")
if client in self._plugins:
- raise BusException(f"Client '{client}' already registered"
- " at message bus.")
- event = Message('')
- event['event'] = 'registered'
- event['client'] = client
+ raise BusException(f"Client '{client}' already registered at message bus.")
+ event = Message("")
+ event["event"] = "registered"
+ event["client"] = client
self._plugins[client] = plugin
- event['plugin'] = plugin
+ event["plugin"] = plugin
for template in sends:
self._send_reg.insert(template, client)
- event['sends'] = self._send_reg.get_templates(client)
- for (templates, callback) in receives:
+ event["sends"] = self._send_reg.get_templates(client)
+ for templates, callback in receives:
for template in templates:
self._recv_reg.insert(template, client, callback)
- event['receives'] = self._recv_reg.get_templates(client)
+ event["receives"] = self._recv_reg.get_templates(client)
self._queue.put_nowait(event)
def unregister(self, client: str) -> None:
"""
if client not in self._plugins:
return
- event = Message('')
- event['event'] = 'unregistered'
- event['client'] = client
+ event = Message("")
+ event["event"] = "unregistered"
+ event["client"] = client
del self._plugins[client]
self._send_reg.delete(client)
self._recv_reg.delete(client)
background_tasks = set()
while True:
message = await self._queue.get()
- if ('target' in message and
- message['target'] == '' and
- 'command' in message):
- if message['command'] == 'get clients':
+ if "target" in message and message["target"] == "" and "command" in message:
+ if message["command"] == "get clients":
for client in self._plugins:
- answer = Message('')
- answer['client'] = client
- answer['plugin'] = self._plugins[client]
- answer['sends'] = (self._send_reg
- .get_templates(client))
- answer['receives'] = (self._recv_reg
- .get_templates(client))
+ answer = Message("")
+ answer["client"] = client
+ answer["plugin"] = self._plugins[client]
+ answer["sends"] = self._send_reg.get_templates(client)
+ answer["receives"] = self._recv_reg.get_templates(client)
await self._queue.put(answer)
- elif message['command'] == 'push conf':
+ elif message["command"] == "push conf":
conf = {}
try:
with open(sys.argv[1]) as conf_file:
conf = json.load(conf_file)
- except (IndexError, FileNotFoundError,
- json.decoder.JSONDecodeError):
+ except (
+ IndexError,
+ FileNotFoundError,
+ json.decoder.JSONDecodeError,
+ ):
pass
- if conf == message['conf']:
- await (self._queue
- .put(Message('',
- {'event': 'conf unchanged'})))
+ if conf == message["conf"]:
+ await self._queue.put(Message("", {"event": "conf unchanged"}))
else:
- await (self._queue
- .put(Message('',
- {'event': 'conf changed'})))
- with open(sys.argv[1], 'w') as conf_file:
- json.dump(message['conf'], conf_file)
+ await self._queue.put(Message("", {"event": "conf changed"}))
+ with open(sys.argv[1], "w") as conf_file:
+ json.dump(message["conf"], conf_file)
for callback in self._recv_reg.get_callbacks(message):
task = asyncio.create_task(callback(message))
background_tasks.add(task)
Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'}
Got: {'sender': 'Client 2', 'target': 'Client 1'}
"""
- assert isinstance(message['sender'], str)
- sender = message['sender']
+ assert isinstance(message["sender"], str)
+ sender = message["sender"]
if sender:
if not self._send_reg.check(sender, message):
- raise BusException(f"Message '{message}' not allowed for"
- f" sender '{sender}'.")
+ raise BusException(
+ f"Message '{message}' not allowed for sender '{sender}'."
+ )
await self._queue.put(message)
def send_nowait(self, message: Message) -> None:
Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'}
Got: {'sender': 'Client 2', 'target': 'Client 1'}
"""
- assert isinstance(message['sender'], str)
- sender = message['sender']
+ assert isinstance(message["sender"], str)
+ sender = message["sender"]
if sender:
if not self._send_reg.check(sender, message):
- raise BusException(f"Message '{message}' not allowed for"
- f" sender '{sender}'.")
+ raise BusException(
+ f"Message '{message}' not allowed for sender '{sender}'."
+ )
self._queue.put_nowait(message)
>>> p1 = registry['Plugin1']
>>> i1 = p1()
"""
+
import importlib
import pkgutil
import collections.abc
Plugin2: <class 'pluginregistry.Plugin2'>
"""
ns_mod = importlib.import_module(namespace_package)
- ns_path = ns_mod.__path__ # type: ignore # mypy issue #1422
+ ns_path = ns_mod.__path__
ns_name = ns_mod.__name__
for _, mod_name, _ in pkgutil.iter_modules(ns_path):
importlib.import_module(f"{ns_name}.{mod_name}")
result.append(subcls)
result.extend(all_subclasses(subcls))
return result
- self._plugins = {cls.__name__: cls
- for cls in all_subclasses(base_class)}
+
+ self._plugins = {cls.__name__: cls for cls in all_subclasses(base_class)}
def __len__(self) -> int:
"""Get number of registered plugins.
test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True}
test(): {'sender': 'Test State 4', 'event': 'changed', 'state': True}
"""
+
from controlpi import BasePlugin, Message, MessageTemplate
from typing import Dict, List
def process_conf(self) -> None:
"""Register plugin as bus client."""
self.state: bool = False
- self.bus.register(self.name, 'State',
- [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get state'}})],
- self._get_state),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'set state'},
- 'new state':
- {'type': 'boolean'}})],
- self._set_state)])
+ self.bus.register(
+ self.name,
+ "State",
+ [
+ MessageTemplate(
+ {"event": {"const": "changed"}, "state": {"type": "boolean"}}
+ ),
+ MessageTemplate({"state": {"type": "boolean"}}),
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get state"},
+ }
+ )
+ ],
+ self._get_state,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "set state"},
+ "new state": {"type": "boolean"},
+ }
+ )
+ ],
+ self._set_state,
+ ),
+ ],
+ )
async def _get_state(self, message: Message) -> None:
- await self.bus.send(Message(self.name, {'state': self.state}))
+ await self.bus.send(Message(self.name, {"state": self.state}))
async def _set_state(self, message: Message) -> None:
- if self.state != message['new state']:
- assert isinstance(message['new state'], bool)
- self.state = message['new state']
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': self.state}))
+ if self.state != message["new state"]:
+ assert isinstance(message["new state"], bool)
+ self.state = message["new state"]
+ await self.bus.send(
+ Message(self.name, {"event": "changed", "state": self.state})
+ )
else:
- await self.bus.send(Message(self.name,
- {'state': self.state}))
+ await self.bus.send(Message(self.name, {"state": self.state}))
async def run(self) -> None:
"""Run no code proactively."""
test(): {'sender': 'Test StateAlias', 'state': True}
"""
- CONF_SCHEMA = {'properties': {'alias for': {'type': 'string'}},
- 'required': ['alias for']}
+ CONF_SCHEMA = {
+ "properties": {"alias for": {"type": "string"}},
+ "required": ["alias for"],
+ }
"""Schema for StateAlias plugin configuration.
Required configuration key:
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.bus.register(self.name, 'StateAlias',
- [MessageTemplate({'target':
- {'const': self.conf['alias for']},
- 'command':
- {'const': 'get state'}}),
- MessageTemplate({'target':
- {'const': self.conf['alias for']},
- 'command':
- {'const': 'set state'},
- 'new state':
- {'type': 'boolean'}}),
- MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get state'}})],
- self._get_state),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'set state'},
- 'new state':
- {'type': 'boolean'}})],
- self._set_state),
- ([MessageTemplate({'sender':
- {'const':
- self.conf['alias for']},
- 'state':
- {'type': 'boolean'}})],
- self._translate)])
+ self.bus.register(
+ self.name,
+ "StateAlias",
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.conf["alias for"]},
+ "command": {"const": "get state"},
+ }
+ ),
+ MessageTemplate(
+ {
+ "target": {"const": self.conf["alias for"]},
+ "command": {"const": "set state"},
+ "new state": {"type": "boolean"},
+ }
+ ),
+ MessageTemplate(
+ {"event": {"const": "changed"}, "state": {"type": "boolean"}}
+ ),
+ MessageTemplate({"state": {"type": "boolean"}}),
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get state"},
+ }
+ )
+ ],
+ self._get_state,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "set state"},
+ "new state": {"type": "boolean"},
+ }
+ )
+ ],
+ self._set_state,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "sender": {"const": self.conf["alias for"]},
+ "state": {"type": "boolean"},
+ }
+ )
+ ],
+ self._translate,
+ ),
+ ],
+ )
async def _get_state(self, message: Message) -> None:
- await self.bus.send(Message(self.name,
- {'target': self.conf['alias for'],
- 'command': 'get state'}))
+ await self.bus.send(
+ Message(
+ self.name, {"target": self.conf["alias for"], "command": "get state"}
+ )
+ )
async def _set_state(self, message: Message) -> None:
- await self.bus.send(Message(self.name,
- {'target': self.conf['alias for'],
- 'command': 'set state',
- 'new state': message['new state']}))
+ await self.bus.send(
+ Message(
+ self.name,
+ {
+ "target": self.conf["alias for"],
+ "command": "set state",
+ "new state": message["new state"],
+ },
+ )
+ )
async def _translate(self, message: Message) -> None:
alias_message = Message(self.name)
- if 'event' in message and message['event'] == 'changed':
- alias_message['event'] = 'changed'
- alias_message['state'] = message['state']
+ if "event" in message and message["event"] == "changed":
+ alias_message["event"] = "changed"
+ alias_message["state"] = message["state"]
await self.bus.send(alias_message)
async def run(self) -> None:
'states': ['Test State 1', 'Test State 2']}
"""
- CONF_SCHEMA = {'properties': {'states': {'type': 'array',
- 'items': {'type': 'string'}}},
- 'required': ['states']}
+ CONF_SCHEMA = {
+ "properties": {"states": {"type": "array", "items": {"type": "string"}}},
+ "required": ["states"],
+ }
"""Schema for AndState plugin configuration.
Required configuration key:
"""Register plugin as bus client."""
updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
- for state in self.conf['states']:
- updates.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ for state in self.conf["states"]:
+ updates.append(
+ MessageTemplate(
+ {"sender": {"const": state}, "state": {"type": "boolean"}}
+ )
+ )
self.states[state] = False
self.state: bool = all(self.states.values())
- self.bus.register(self.name, 'AndState',
- [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}}),
- MessageTemplate({'states': {'type': 'array',
- 'items': {
- 'type': 'string'
- }}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get state'}})],
- self._get_state),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get sources'}})],
- self._get_sources),
- (updates, self._update)])
+ self.bus.register(
+ self.name,
+ "AndState",
+ [
+ MessageTemplate(
+ {"event": {"const": "changed"}, "state": {"type": "boolean"}}
+ ),
+ MessageTemplate({"state": {"type": "boolean"}}),
+ MessageTemplate(
+ {"states": {"type": "array", "items": {"type": "string"}}}
+ ),
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get state"},
+ }
+ )
+ ],
+ self._get_state,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get sources"},
+ }
+ )
+ ],
+ self._get_sources,
+ ),
+ (updates, self._update),
+ ],
+ )
async def _get_state(self, message: Message) -> None:
- await self.bus.send(Message(self.name, {'state': self.state}))
+ await self.bus.send(Message(self.name, {"state": self.state}))
async def _get_sources(self, message: Message) -> None:
source_states = list(self.states.keys())
- await self.bus.send(Message(self.name, {'states': source_states}))
+ await self.bus.send(Message(self.name, {"states": source_states}))
async def _update(self, message: Message) -> None:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
+ assert isinstance(message["sender"], str)
+ assert isinstance(message["state"], bool)
+ self.states[message["sender"]] = message["state"]
new_state = all(self.states.values())
if self.state != new_state:
self.state = new_state
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': self.state}))
+ await self.bus.send(
+ Message(self.name, {"event": "changed", "state": self.state})
+ )
async def run(self) -> None:
"""Run no code proactively."""
'states': ['Test State 1', 'Test State 2']}
"""
- CONF_SCHEMA = {'properties': {'states': {'type': 'array',
- 'items': {'type': 'string'}}},
- 'required': ['states']}
+ CONF_SCHEMA = {
+ "properties": {"states": {"type": "array", "items": {"type": "string"}}},
+ "required": ["states"],
+ }
"""Schema for OrState plugin configuration.
Required configuration key:
"""Register plugin as bus client."""
updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
- for state in self.conf['states']:
- updates.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ for state in self.conf["states"]:
+ updates.append(
+ MessageTemplate(
+ {"sender": {"const": state}, "state": {"type": "boolean"}}
+ )
+ )
self.states[state] = False
self.state: bool = any(self.states.values())
- self.bus.register(self.name, 'OrState',
- [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}}),
- MessageTemplate({'states': {'type': 'array',
- 'items': {
- 'type': 'string'
- }}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get state'}})],
- self._get_state),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get sources'}})],
- self._get_sources),
- (updates, self._update)])
+ self.bus.register(
+ self.name,
+ "OrState",
+ [
+ MessageTemplate(
+ {"event": {"const": "changed"}, "state": {"type": "boolean"}}
+ ),
+ MessageTemplate({"state": {"type": "boolean"}}),
+ MessageTemplate(
+ {"states": {"type": "array", "items": {"type": "string"}}}
+ ),
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get state"},
+ }
+ )
+ ],
+ self._get_state,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get sources"},
+ }
+ )
+ ],
+ self._get_sources,
+ ),
+ (updates, self._update),
+ ],
+ )
async def _get_state(self, message: Message) -> None:
- await self.bus.send(Message(self.name, {'state': self.state}))
+ await self.bus.send(Message(self.name, {"state": self.state}))
async def _get_sources(self, message: Message) -> None:
source_states = list(self.states.keys())
- await self.bus.send(Message(self.name, {'states': source_states}))
+ await self.bus.send(Message(self.name, {"states": source_states}))
async def _update(self, message: Message) -> None:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
+ assert isinstance(message["sender"], str)
+ assert isinstance(message["state"], bool)
+ self.states[message["sender"]] = message["state"]
new_state = any(self.states.values())
if self.state != new_state:
self.state = new_state
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': self.state}))
+ await self.bus.send(
+ Message(self.name, {"event": "changed", "state": self.state})
+ )
async def run(self) -> None:
"""Run no code proactively."""
test(): {'sender': 'Test State 3', 'event': 'changed', 'state': False}
"""
- CONF_SCHEMA = {'properties': {'input states': {'type': 'array',
- 'items': {'type':
- 'string'}},
- 'output state': {'type': 'string'}},
- 'required': ['input states', 'output state']}
+ CONF_SCHEMA = {
+ "properties": {
+ "input states": {"type": "array", "items": {"type": "string"}},
+ "output state": {"type": "string"},
+ },
+ "required": ["input states", "output state"],
+ }
"""Schema for AndSet plugin configuration.
Required configuration keys:
"""Register plugin as bus client."""
updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
- for state in self.conf['input states']:
- updates.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ for state in self.conf["input states"]:
+ updates.append(
+ MessageTemplate(
+ {"sender": {"const": state}, "state": {"type": "boolean"}}
+ )
+ )
self.states[state] = False
self.state: bool = all(self.states.values())
- self.bus.register(self.name, 'AndSet',
- [MessageTemplate({'target':
- {'const':
- self.conf['output state']},
- 'command':
- {'const': 'set state'},
- 'new state':
- {'type': 'boolean'}}),
- MessageTemplate({'states': {'type': 'array',
- 'items': {
- 'type': 'string'
- }}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get state'}})],
- self._get_state),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get sources'}})],
- self._get_sources),
- (updates, self._update)])
+ self.bus.register(
+ self.name,
+ "AndSet",
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.conf["output state"]},
+ "command": {"const": "set state"},
+ "new state": {"type": "boolean"},
+ }
+ ),
+ MessageTemplate(
+ {"states": {"type": "array", "items": {"type": "string"}}}
+ ),
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get state"},
+ }
+ )
+ ],
+ self._get_state,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get sources"},
+ }
+ )
+ ],
+ self._get_sources,
+ ),
+ (updates, self._update),
+ ],
+ )
async def _get_state(self, message: Message) -> None:
- await self.bus.send(Message(self.name,
- {'target': self.conf['output state'],
- 'command': 'set state',
- 'new state': self.state}))
+ await self.bus.send(
+ Message(
+ self.name,
+ {
+ "target": self.conf["output state"],
+ "command": "set state",
+ "new state": self.state,
+ },
+ )
+ )
async def _get_sources(self, message: Message) -> None:
source_states = list(self.states.keys())
- await self.bus.send(Message(self.name, {'states': source_states}))
+ await self.bus.send(Message(self.name, {"states": source_states}))
async def _update(self, message: Message) -> None:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
+ assert isinstance(message["sender"], str)
+ assert isinstance(message["state"], bool)
+ self.states[message["sender"]] = message["state"]
new_state = all(self.states.values())
if self.state != new_state:
self.state = new_state
- await self.bus.send(Message(self.name,
- {'target':
- self.conf['output state'],
- 'command': 'set state',
- 'new state': self.state}))
+ await self.bus.send(
+ Message(
+ self.name,
+ {
+ "target": self.conf["output state"],
+ "command": "set state",
+ "new state": self.state,
+ },
+ )
+ )
async def run(self) -> None:
"""Run no code proactively."""
'states': ['Test State 1', 'Test State 2']}
"""
- CONF_SCHEMA = {'properties': {'input states': {'type': 'array',
- 'items': {'type':
- 'string'}},
- 'output state': {'type': 'string'}},
- 'required': ['input states', 'output state']}
+ CONF_SCHEMA = {
+ "properties": {
+ "input states": {"type": "array", "items": {"type": "string"}},
+ "output state": {"type": "string"},
+ },
+ "required": ["input states", "output state"],
+ }
"""Schema for OrSet plugin configuration.
Required configuration keys:
"""Register plugin as bus client."""
updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
- for state in self.conf['input states']:
- updates.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ for state in self.conf["input states"]:
+ updates.append(
+ MessageTemplate(
+ {"sender": {"const": state}, "state": {"type": "boolean"}}
+ )
+ )
self.states[state] = False
self.state: bool = any(self.states.values())
- self.bus.register(self.name, 'AndSet',
- [MessageTemplate({'target':
- {'const':
- self.conf['output state']},
- 'command':
- {'const': 'set state'},
- 'new state':
- {'type': 'boolean'}}),
- MessageTemplate({'states': {'type': 'array',
- 'items': {
- 'type': 'string'
- }}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get state'}})],
- self._get_state),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get sources'}})],
- self._get_sources),
- (updates, self._update)])
+ self.bus.register(
+ self.name,
+ "AndSet",
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.conf["output state"]},
+ "command": {"const": "set state"},
+ "new state": {"type": "boolean"},
+ }
+ ),
+ MessageTemplate(
+ {"states": {"type": "array", "items": {"type": "string"}}}
+ ),
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get state"},
+ }
+ )
+ ],
+ self._get_state,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get sources"},
+ }
+ )
+ ],
+ self._get_sources,
+ ),
+ (updates, self._update),
+ ],
+ )
async def _get_state(self, message: Message) -> None:
- await self.bus.send(Message(self.name,
- {'target': self.conf['output state'],
- 'command': 'set state',
- 'new state': self.state}))
+ await self.bus.send(
+ Message(
+ self.name,
+ {
+ "target": self.conf["output state"],
+ "command": "set state",
+ "new state": self.state,
+ },
+ )
+ )
async def _get_sources(self, message: Message) -> None:
source_states = list(self.states.keys())
- await self.bus.send(Message(self.name, {'states': source_states}))
+ await self.bus.send(Message(self.name, {"states": source_states}))
async def _update(self, message: Message) -> None:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
+ assert isinstance(message["sender"], str)
+ assert isinstance(message["state"], bool)
+ self.states[message["sender"]] = message["state"]
new_state = any(self.states.values())
if self.state != new_state:
self.state = new_state
- await self.bus.send(Message(self.name,
- {'target':
- self.conf['output state'],
- 'command': 'set state',
- 'new state': self.state}))
+ await self.bus.send(
+ Message(
+ self.name,
+ {
+ "target": self.conf["output state"],
+ "command": "set state",
+ "new state": self.state,
+ },
+ )
+ )
async def run(self) -> None:
"""Run no code proactively."""
Test Log: {'sender': 'Test Alias', 'id': 'translated',
'content': 'Test Message'}
"""
+
import asyncio
from datetime import datetime
Configuration for 'Test Log' is not valid.
"""
- CONF_SCHEMA = {'properties': {'filter': {'type': 'array',
- 'items': {'type': 'object'}}},
- 'required': ['filter']}
+ CONF_SCHEMA = {
+ "properties": {"filter": {"type": "array", "items": {"type": "object"}}},
+ "required": ["filter"],
+ }
"""Schema for Log plugin configuration.
Required configuration key:
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.bus.register(self.name, 'Log',
- [],
- [(self.conf['filter'], self._log)])
+ self.bus.register(self.name, "Log", [], [(self.conf["filter"], self._log)])
async def _log(self, message: Message) -> None:
print(f"{self.name}: {message}")
Configuration for 'Test Init' is not valid.
"""
- CONF_SCHEMA = {'properties': {'messages': {'type': 'array',
- 'items': {'type': 'object'}}},
- 'required': ['messages']}
+ CONF_SCHEMA = {
+ "properties": {"messages": {"type": "array", "items": {"type": "object"}}},
+ "required": ["messages"],
+ }
"""Schema for Init plugin configuration.
Required configuration key:
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.bus.register(self.name, 'Init',
- [MessageTemplate.from_message(message)
- for message in self.conf['messages']],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'execute'}})],
- self._execute)])
+ self.bus.register(
+ self.name,
+ "Init",
+ [
+ MessageTemplate.from_message(message)
+ for message in self.conf["messages"]
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "execute"},
+ }
+ )
+ ],
+ self._execute,
+ )
+ ],
+ )
async def _execute(self, message: Message) -> None:
- for message in self.conf['messages']:
+ for message in self.conf["messages"]:
await self.bus.send(Message(self.name, message))
# Give immediate reactions to messages opportunity to happen:
await asyncio.sleep(0)
async def run(self) -> None:
"""Send configured messages on startup."""
- for message in self.conf['messages']:
+ for message in self.conf["messages"]:
await self.bus.send(Message(self.name, message))
def process_conf(self) -> None:
"""Register plugin as bus client."""
self.messages: List[Message] = []
- self.bus.register(self.name, 'Execute',
- [MessageTemplate()],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'set messages'},
- 'messages':
- {'type': 'array',
- 'items':
- {'type': 'object'}}})],
- self._set_messages),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'execute'}})],
- self._execute)])
+ self.bus.register(
+ self.name,
+ "Execute",
+ [MessageTemplate()],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "set messages"},
+ "messages": {
+ "type": "array",
+ "items": {"type": "object"},
+ },
+ }
+ )
+ ],
+ self._set_messages,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "execute"},
+ }
+ )
+ ],
+ self._execute,
+ ),
+ ],
+ )
async def _set_messages(self, message: Message) -> None:
- assert isinstance(message['messages'], list)
- self.messages = list(message['messages'])
+ assert isinstance(message["messages"], list)
+ self.messages = list(message["messages"])
async def _execute(self, message: Message) -> None:
for message in self.messages:
Configuration for 'Test Alias' is not valid.
"""
- CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
- 'to': {'anyOf':
- [{'type': 'object'},
- {'type': 'array',
- 'items': {'type': 'object'}}]},
- 'translate': {'type': 'array',
- 'items':
- {'type': 'object',
- 'properties':
- {'from':
- {'type': 'string'},
- 'to':
- {'type': 'string'}}}}},
- 'required': ['from']}
+ CONF_SCHEMA = {
+ "properties": {
+ "from": {"type": "object"},
+ "to": {
+ "anyOf": [
+ {"type": "object"},
+ {"type": "array", "items": {"type": "object"}},
+ ]
+ },
+ "translate": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "from": {"type": "string"},
+ "to": {"type": "string"},
+ },
+ },
+ },
+ },
+ "required": ["from"],
+ }
"""Schema for Alias plugin configuration.
Required configuration keys:
"""Register plugin as bus client."""
sends = []
self._to = []
- if 'to' in self.conf:
- if isinstance(self.conf['to'], list):
- self._to = self.conf['to']
- for to in self.conf['to']:
+ if "to" in self.conf:
+ if isinstance(self.conf["to"], list):
+ self._to = self.conf["to"]
+ for to in self.conf["to"]:
sends.append(MessageTemplate.from_message(to))
else:
- self._to = [self.conf['to']]
- sends.append(MessageTemplate.from_message(self.conf['to']))
+ self._to = [self.conf["to"]]
+ sends.append(MessageTemplate.from_message(self.conf["to"]))
self._translate = {}
- if 'translate' in self.conf:
- for pair in self.conf['translate']:
- self._translate[pair['from']] = pair['to']
- self.bus.register(self.name, 'Alias',
- sends,
- [([self.conf['from']], self._alias)])
+ if "translate" in self.conf:
+ for pair in self.conf["translate"]:
+ self._translate[pair["from"]] = pair["to"]
+ self.bus.register(
+ self.name, "Alias", sends, [([self.conf["from"]], self._alias)]
+ )
async def _alias(self, message: Message) -> None:
# Prevent endless loop:
- if message['sender'] != self.name:
+ if message["sender"] != self.name:
for to in self._to:
alias_message = Message(self.name, message)
alias_message.update(to)
'since': ..., 'until': ...}
"""
- CONF_SCHEMA = {'properties': {'count': {'type': 'object'},
- 'date format':
- {'type': 'string',
- 'default': '%Y-%m-%d %H:%M:%S'}},
- 'required': ['count']}
+ CONF_SCHEMA = {
+ "properties": {
+ "count": {"type": "object"},
+ "date format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"},
+ },
+ "required": ["count"],
+ }
"""Schema for Counter plugin configuration.
Required configuration key:
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self._since = datetime.now().strftime(self.conf['date format'])
+ self._since = datetime.now().strftime(self.conf["date format"])
self._counter = 0
- self.bus.register(self.name, 'Counter',
- [MessageTemplate({'count': {'type': 'integer'}})],
- [([MessageTemplate(self.conf['count'])],
- self._count),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get count'}})],
- self._get_count),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'reset'}})],
- self._reset)])
+ self.bus.register(
+ self.name,
+ "Counter",
+ [MessageTemplate({"count": {"type": "integer"}})],
+ [
+ ([MessageTemplate(self.conf["count"])], self._count),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get count"},
+ }
+ )
+ ],
+ self._get_count,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "reset"},
+ }
+ )
+ ],
+ self._reset,
+ ),
+ ],
+ )
async def _count(self, message: Message) -> None:
self._counter += 1
async def _get_count(self, message: Message) -> None:
- now = datetime.now().strftime(self.conf['date format'])
- await self.bus.send(Message(self.name, {'count': self._counter,
- 'since': self._since,
- 'until': now}))
+ now = datetime.now().strftime(self.conf["date format"])
+ await self.bus.send(
+ Message(
+ self.name, {"count": self._counter, "since": self._since, "until": now}
+ )
+ )
async def _reset(self, message: Message) -> None:
- now = datetime.now().strftime(self.conf['date format'])
+ now = datetime.now().strftime(self.conf["date format"])
counter = self._counter
self._counter = 0
- await self.bus.send(Message(self.name, {'count': counter,
- 'since': self._since,
- 'until': now}))
+ await self.bus.send(
+ Message(self.name, {"count": counter, "since": self._since, "until": now})
+ )
self._since = now
async def run(self) -> None:
test(): {'sender': 'Test Date', 'date': ...}
"""
- CONF_SCHEMA = {'properties': {'format':
- {'type': 'string',
- 'default': '%Y-%m-%d %H:%M:%S'}}}
+ CONF_SCHEMA = {
+ "properties": {"format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"}}
+ }
"""Schema for Date plugin configuration.
Optional configuration key:
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.bus.register(self.name, 'Date',
- [MessageTemplate({'date': {'type': 'string'}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'get date'}})],
- self._date)])
+ self.bus.register(
+ self.name,
+ "Date",
+ [MessageTemplate({"date": {"type": "string"}})],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "get date"},
+ }
+ )
+ ],
+ self._date,
+ )
+ ],
+ )
async def _date(self, message: Message) -> None:
- date = datetime.now().strftime(self.conf['format'])
- await self.bus.send(Message(self.name, {'date': date}))
+ date = datetime.now().strftime(self.conf["format"])
+ await self.bus.send(Message(self.name, {"date": date}))
async def run(self) -> None:
"""Run no code proactively."""
test(): {'sender': 'Test GenericWait', 'event': 'finished',
'id': 'Long Wait'}
"""
+
import asyncio
from controlpi import BasePlugin, Message, MessageTemplate
test(): {'sender': 'Long Wait', 'event': 'finished'}
"""
- CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}},
- 'required': ['seconds']}
+ CONF_SCHEMA = {
+ "properties": {"seconds": {"type": "number"}},
+ "required": ["seconds"],
+ }
"""Schema for Wait plugin configuration.
Required configuration key:
def process_conf(self) -> None:
"""Register plugin as bus client."""
self._tasks = set()
- self.bus.register(self.name, 'Wait',
- [MessageTemplate({'event': {'const': 'finished'}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'wait'}})],
- self._wait)])
+ self.bus.register(
+ self.name,
+ "Wait",
+ [MessageTemplate({"event": {"const": "finished"}})],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "wait"},
+ }
+ )
+ ],
+ self._wait,
+ )
+ ],
+ )
async def _wait(self, message: Message) -> None:
async def wait_coroutine():
- await asyncio.sleep(self.conf['seconds'])
- await self.bus.send(Message(self.name, {'event': 'finished'}))
+ await asyncio.sleep(self.conf["seconds"])
+ await self.bus.send(Message(self.name, {"event": "finished"}))
+
# Done in separate task to not block queue awaiting this callback:
task = asyncio.create_task(wait_coroutine())
self._tasks.add(task)
def process_conf(self) -> None:
"""Register plugin as bus client."""
self._tasks = set()
- self.bus.register(self.name, 'GenericWait',
- [MessageTemplate({'event': {'const': 'finished'},
- 'id': {'type': 'string'}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'wait'},
- 'seconds':
- {'type': 'number'},
- 'id':
- {'type': 'string'}})],
- self._wait)])
+ self.bus.register(
+ self.name,
+ "GenericWait",
+ [
+ MessageTemplate(
+ {"event": {"const": "finished"}, "id": {"type": "string"}}
+ )
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "wait"},
+ "seconds": {"type": "number"},
+ "id": {"type": "string"},
+ }
+ )
+ ],
+ self._wait,
+ )
+ ],
+ )
async def _wait(self, message: Message) -> None:
async def wait_coroutine():
- assert (isinstance(message['seconds'], float) or
- isinstance(message['seconds'], int))
- await asyncio.sleep(message['seconds'])
- await self.bus.send(Message(self.name, {'event': 'finished',
- 'id': message['id']}))
+ assert isinstance(message["seconds"], float) or isinstance(
+ message["seconds"], int
+ )
+ await asyncio.sleep(message["seconds"])
+ await self.bus.send(
+ Message(self.name, {"event": "finished", "id": message["id"]})
+ )
+
# Done in separate task to not block queue awaiting this callback:
task = asyncio.create_task(wait_coroutine())
self._tasks.add(task)
test(): {'sender': 'Timer', 'event': 'finished'}
"""
- CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}},
- 'required': ['seconds']}
+ CONF_SCHEMA = {
+ "properties": {"seconds": {"type": "number"}},
+ "required": ["seconds"],
+ }
"""Schema for Timer plugin configuration.
Required configuration key:
self._tasks = set()
self.started = 0
self.cancelled = 0
- self.bus.register(self.name, 'Timer',
- [MessageTemplate({'event':
- {'const': 'finished'}}),
- MessageTemplate({'event':
- {'const': 'cancelled'}})],
- [([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'start'}})],
- self._start),
- ([MessageTemplate({'target':
- {'const': self.name},
- 'command':
- {'const': 'cancel'}})],
- self._cancel)])
+ self.bus.register(
+ self.name,
+ "Timer",
+ [
+ MessageTemplate({"event": {"const": "finished"}}),
+ MessageTemplate({"event": {"const": "cancelled"}}),
+ ],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "start"},
+ }
+ )
+ ],
+ self._start,
+ ),
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "cancel"},
+ }
+ )
+ ],
+ self._cancel,
+ ),
+ ],
+ )
async def _start(self, message: Message) -> None:
self.started += 1
async def wait_coroutine():
- await asyncio.sleep(self.conf['seconds'])
+ await asyncio.sleep(self.conf["seconds"])
if self.cancelled > 0:
self.cancelled -= 1
self.started -= 1
elif self.started > 0:
self.started -= 1
- await self.bus.send(Message(self.name,
- {'event': 'finished'}))
+ await self.bus.send(Message(self.name, {"event": "finished"}))
+
# Done in separate task to not block queue awaiting this callback:
task = asyncio.create_task(wait_coroutine())
self._tasks.add(task)
if self.cancelled < self.started:
cancel = self.started - self.cancelled
self.cancelled = self.started
- await self.bus.send(Message(self.name, {'event': 'cancelled',
- 'count': cancel}))
+ await self.bus.send(
+ Message(self.name, {"event": "cancelled", "count": cancel})
+ )
async def run(self) -> None:
"""Run no code proactively."""
test(): {'sender': 'Loop', 'key': 'value'}
"""
- CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'},
- 'message': {'type': 'object'}},
- 'required': ['seconds', 'message']}
+ CONF_SCHEMA = {
+ "properties": {"seconds": {"type": "number"}, "message": {"type": "object"}},
+ "required": ["seconds", "message"],
+ }
"""Schema for Wait plugin configuration.
Required configuration key:
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.bus.register(self.name, 'Periodic',
- [MessageTemplate.from_message(self.conf['message'])],
- [])
+ self.bus.register(
+ self.name,
+ "Periodic",
+ [MessageTemplate.from_message(self.conf["message"])],
+ [],
+ )
async def run(self) -> None:
"""Run periodic loop."""
while True:
- await asyncio.sleep(self.conf['seconds'])
- await self.bus.send(Message(self.name, self.conf['message']))
+ await asyncio.sleep(self.conf["seconds"])
+ await self.bus.send(Message(self.name, self.conf["message"]))