"""
from controlpi import BasePlugin, Message, MessageTemplate
-from typing import Dict
+from typing import Dict, List
class State(BasePlugin):
def process_conf(self) -> None:
"""Register plugin as bus client."""
self.state: bool = False
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- self.bus.register(self.name, 'State', sends,
+ self.bus.register(self.name, 'State',
+ [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})],
[([MessageTemplate({'target':
{'const': self.name},
'command':
{'const': 'get state'}})],
- self.get_state),
+ self._get_state),
([MessageTemplate({'target':
{'const': self.name},
'command':
{'const': 'set state'},
'new state':
{'type': 'boolean'}})],
- self.set_state)])
+ self._set_state)])
- async def get_state(self, message: Message) -> None:
- """Get current state."""
+ async def _get_state(self, message: Message) -> None:
await self.bus.send(Message(self.name, {'state': self.state}))
- async def set_state(self, message: Message) -> None:
- """Set state to new state given in message."""
+ async def _set_state(self, message: Message) -> None:
if self.state != message['new state']:
assert isinstance(message['new state'], bool)
self.state = message['new state']
def process_conf(self) -> None:
"""Register plugin as bus client."""
- sends = [MessageTemplate({'target': {'const': self.conf['alias for']},
- 'command': {'const': 'get state'}}),
- MessageTemplate({'target': {'const': self.conf['alias for']},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}}),
- MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- self.bus.register(self.name, 'StateAlias', sends,
+ self.bus.register(self.name, 'StateAlias',
+ [MessageTemplate({'target':
+ {'const': self.conf['alias for']},
+ 'command':
+ {'const': 'get state'}}),
+ MessageTemplate({'target':
+ {'const': self.conf['alias for']},
+ 'command':
+ {'const': 'set state'},
+ 'new state':
+ {'type': 'boolean'}}),
+ MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})],
[([MessageTemplate({'target':
{'const': self.name},
'command':
{'const': 'get state'}})],
- self.get_state),
+ self._get_state),
([MessageTemplate({'target':
{'const': self.name},
'command':
{'const': 'set state'},
'new state':
{'type': 'boolean'}})],
- self.set_state),
+ self._set_state),
([MessageTemplate({'sender':
{'const':
self.conf['alias for']},
'state':
{'type': 'boolean'}})],
- self.translate)])
+ self._translate)])
- async def get_state(self, message: Message) -> None:
- """Get state from other state."""
+ async def _get_state(self, message: Message) -> None:
await self.bus.send(Message(self.name,
{'target': self.conf['alias for'],
'command': 'get state'}))
- async def set_state(self, message: Message) -> None:
- """Set state on other state."""
+ async def _set_state(self, message: Message) -> None:
await self.bus.send(Message(self.name,
{'target': self.conf['alias for'],
'command': 'set state',
'new state': message['new state']}))
- async def translate(self, message: Message) -> None:
- """Translate state messages from other state."""
+ async def _translate(self, message: Message) -> None:
alias_message = Message(self.name)
if 'event' in message and message['event'] == 'changed':
alias_message['event'] = 'changed'
def process_conf(self) -> None:
"""Register plugin as bus client."""
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}})]
+ updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
for state in self.conf['states']:
- receives.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ updates.append(MessageTemplate({'sender': {'const': state},
+ 'state': {'type': 'boolean'}}))
self.states[state] = False
self.state: bool = all(self.states.values())
self.bus.register(self.name, 'AndState',
- sends, receives, self.receive)
-
- async def receive(self, message: Message) -> None:
- """Process "get state" command and messages of combined states."""
- if ('target' in message and message['target'] == self.name and
- 'command' in message and message['command'] == 'get state'):
- await self.bus.send(Message(self.name, {'state': self.state}))
- if 'state' in message and message['sender'] in self.conf['states']:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
- new_state = all(self.states.values())
- if self.state != new_state:
- self.state = new_state
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': self.state}))
+ [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state),
+ (updates, self._update)])
+
+ async def _get_state(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'state': self.state}))
+
+ async def _update(self, message: Message) -> None:
+ assert isinstance(message['sender'], str)
+ assert isinstance(message['state'], bool)
+ self.states[message['sender']] = message['state']
+ new_state = all(self.states.values())
+ if self.state != new_state:
+ self.state = new_state
+ await self.bus.send(Message(self.name,
+ {'event': 'changed',
+ 'state': self.state}))
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}})]
+ updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
for state in self.conf['states']:
- receives.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ updates.append(MessageTemplate({'sender': {'const': state},
+ 'state': {'type': 'boolean'}}))
self.states[state] = False
self.state: bool = any(self.states.values())
self.bus.register(self.name, 'OrState',
- sends, receives, self.receive)
-
- async def receive(self, message: Message) -> None:
- """Process "get state" command and messages of combined states."""
- if ('target' in message and message['target'] == self.name and
- 'command' in message and message['command'] == 'get state'):
- await self.bus.send(Message(self.name, {'state': self.state}))
- if 'state' in message and message['sender'] in self.conf['states']:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
- new_state = any(self.states.values())
- if self.state != new_state:
- self.state = new_state
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': self.state}))
+ [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state),
+ (updates, self._update)])
+
+ async def _get_state(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'state': self.state}))
+
+ async def _update(self, message: Message) -> None:
+ assert isinstance(message['sender'], str)
+ assert isinstance(message['state'], bool)
+ self.states[message['sender']] = message['state']
+ new_state = any(self.states.values())
+ if self.state != new_state:
+ self.state = new_state
+ await self.bus.send(Message(self.name,
+ {'event': 'changed',
+ 'state': self.state}))
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
- sends = [MessageTemplate({'target': {'const':
- self.conf['output state']},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}})]
+ updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
for state in self.conf['input states']:
- receives.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ updates.append(MessageTemplate({'sender': {'const': state},
+ 'state': {'type': 'boolean'}}))
self.states[state] = False
self.state: bool = all(self.states.values())
self.bus.register(self.name, 'AndSet',
- sends, receives, self.receive)
+ [MessageTemplate({'target':
+ {'const':
+ self.conf['output state']},
+ 'command':
+ {'const': 'set state'},
+ 'new state':
+ {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state),
+ (updates, self._update)])
- async def receive(self, message: Message) -> None:
- """Process messages of combined states."""
- if ('target' in message and message['target'] == self.name and
- 'command' in message and message['command'] == 'get state'):
+ async def _get_state(self, message: Message) -> None:
+ await self.bus.send(Message(self.name,
+ {'target': self.conf['output state'],
+ 'command': 'set state',
+ 'new state': self.state}))
+
+ async def _update(self, message: Message) -> None:
+ assert isinstance(message['sender'], str)
+ assert isinstance(message['state'], bool)
+ self.states[message['sender']] = message['state']
+ new_state = all(self.states.values())
+ if self.state != new_state:
+ self.state = new_state
await self.bus.send(Message(self.name,
- {'target': self.conf['output state'],
+ {'target':
+ self.conf['output state'],
'command': 'set state',
'new state': self.state}))
- if ('state' in message and
- message['sender'] in self.conf['input states']):
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
- new_state = all(self.states.values())
- if self.state != new_state:
- self.state = new_state
- await self.bus.send(Message(self.name,
- {'target':
- self.conf['output state'],
- 'command': 'set state',
- 'new state': self.state}))
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
- sends = [MessageTemplate({'target': {'const':
- self.conf['output state']},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}})]
+ updates: List[MessageTemplate] = []
self.states: Dict[str, bool] = {}
for state in self.conf['input states']:
- receives.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'boolean'}}))
+ updates.append(MessageTemplate({'sender': {'const': state},
+ 'state': {'type': 'boolean'}}))
self.states[state] = False
self.state: bool = any(self.states.values())
- self.bus.register(self.name, 'OrSet',
- sends, receives, self.receive)
+ self.bus.register(self.name, 'AndSet',
+ [MessageTemplate({'target':
+ {'const':
+ self.conf['output state']},
+ 'command':
+ {'const': 'set state'},
+ 'new state':
+ {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state),
+ (updates, self._update)])
- async def receive(self, message: Message) -> None:
- """Process messages of combined states."""
- if ('target' in message and message['target'] == self.name and
- 'command' in message and message['command'] == 'get state'):
+ async def _get_state(self, message: Message) -> None:
+ await self.bus.send(Message(self.name,
+ {'target': self.conf['output state'],
+ 'command': 'set state',
+ 'new state': self.state}))
+
+ async def _update(self, message: Message) -> None:
+ assert isinstance(message['sender'], str)
+ assert isinstance(message['state'], bool)
+ self.states[message['sender']] = message['state']
+ new_state = any(self.states.values())
+ if self.state != new_state:
+ self.state = new_state
await self.bus.send(Message(self.name,
- {'target': self.conf['output state'],
+ {'target':
+ self.conf['output state'],
'command': 'set state',
'new state': self.state}))
- if ('state' in message and
- message['sender'] in self.conf['input states']):
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
- new_state = any(self.states.values())
- if self.state != new_state:
- self.state = new_state
- await self.bus.send(Message(self.name,
- {'target':
- self.conf['output state'],
- 'command': 'set state',
- 'new state': self.state}))
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)
+ self.bus.register(self.name, 'Log',
+ [],
+ [(self.conf['filter'], self._log)])
- async def log(self, message: Message) -> None:
- """Log received message on stdout using own name as prefix."""
+ async def _log(self, message: Message) -> None:
print(f"{self.name}: {message}")
async def run(self) -> None:
def process_conf(self) -> None:
"""Register plugin as bus client."""
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'execute'}})]
- sends = [MessageTemplate.from_message(message)
- for message in self.conf['messages']]
- self.bus.register(self.name, 'Init', sends, receives, self.execute)
-
- async def execute(self, message: Message) -> None:
- """Send configured messages."""
+ self.bus.register(self.name, 'Init',
+ [MessageTemplate.from_message(message)
+ for message in self.conf['messages']],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'execute'}})],
+ self._execute)])
+
+ async def _execute(self, message: Message) -> None:
for message in self.conf['messages']:
await self.bus.send(Message(self.name, message))
# Give immediate reactions to messages opportunity to happen:
def process_conf(self) -> None:
"""Register plugin as bus client."""
self.messages: List[Message] = []
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set messages'},
- 'messages':
- {'type': 'array',
- 'items': {'type': 'object'}}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'execute'}})]
- sends = [MessageTemplate()]
- self.bus.register(self.name, 'Execute', sends, receives, self.execute)
-
- async def execute(self, message: Message) -> None:
- """Set or send configured messages."""
- if message['command'] == 'set messages':
- assert isinstance(message['messages'], list)
- self.messages = list(message['messages'])
- elif message['command'] == 'execute':
- for message in self.messages:
- await self.bus.send(Message(self.name, message))
- # Give immediate reactions to messages opportunity to happen:
- await asyncio.sleep(0)
+ self.bus.register(self.name, 'Execute',
+ [MessageTemplate()],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'set messages'},
+ 'messages':
+ {'type': 'array',
+ 'items':
+ {'type': 'object'}}})],
+ self._set_messages),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'execute'}})],
+ self._execute)])
+
+ async def _set_messages(self, message: Message) -> None:
+ assert isinstance(message['messages'], list)
+ self.messages = list(message['messages'])
+
+ async def _execute(self, message: Message) -> None:
+ for message in self.messages:
+ await self.bus.send(Message(self.name, message))
+ # Give immediate reactions to messages opportunity to happen:
+ await asyncio.sleep(0)
async def run(self) -> None:
"""Run no code proactively."""
self._translate[pair['from']] = pair['to']
self.bus.register(self.name, 'Alias',
sends,
- [self.conf['from']],
- self.alias)
+ [([self.conf['from']], self._alias)])
- async def alias(self, message: Message) -> None:
- """Translate and send message."""
+ async def _alias(self, message: Message) -> None:
# Prevent endless loop:
if message['sender'] != self.name:
for to in self._to:
def process_conf(self) -> None:
"""Register plugin as bus client."""
self._since = datetime.now().strftime(self.conf['date format'])
- self._count = 0
- self._template = MessageTemplate(self.conf['count'])
- sends = [MessageTemplate({'count': {'type': 'integer'}})]
- receives = [self._template,
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get count'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'reset'}})]
- self.bus.register(self.name, 'Counter', sends, receives, self.receive)
-
- async def receive(self, message: Message) -> None:
- """Count messages, send and optionally reset counts."""
- if self._template.check(message):
- self._count += 1
- if ('target' in message and message['target'] == self.name and
- 'command' in message):
- if message['command'] == 'get count':
- now = datetime.now().strftime(self.conf['date format'])
- await self.bus.send(Message(self.name, {'count': self._count,
- 'since': self._since,
- 'until': now}))
- elif message['command'] == 'reset':
- now = datetime.now().strftime(self.conf['date format'])
- count = self._count
- self._count = 0
- await self.bus.send(Message(self.name, {'count': count,
- 'since': self._since,
- 'until': now}))
- self._since = now
+ self._counter = 0
+ self.bus.register(self.name, 'Counter',
+ [MessageTemplate({'count': {'type': 'integer'}})],
+ [([MessageTemplate(self.conf['count'])],
+ self._count),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get count'}})],
+ self._get_count),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'reset'}})],
+ self._reset)])
+
+ async def _count(self, message: Message) -> None:
+ self._counter += 1
+
+ async def _get_count(self, message: Message) -> None:
+ now = datetime.now().strftime(self.conf['date format'])
+ await self.bus.send(Message(self.name, {'count': self._counter,
+ 'since': self._since,
+ 'until': now}))
+
+ async def _reset(self, message: Message) -> None:
+ now = datetime.now().strftime(self.conf['date format'])
+ counter = self._counter
+ self._counter = 0
+ await self.bus.send(Message(self.name, {'count': counter,
+ 'since': self._since,
+ 'until': now}))
+ self._since = now
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
- sends = [MessageTemplate({'date': {'type': 'string'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get date'}})]
- self.bus.register(self.name, 'Date', sends, receives, self.date)
-
- async def date(self, message: Message) -> None:
- """Send current date and time as message."""
+ self.bus.register(self.name, 'Date',
+ [MessageTemplate({'date': {'type': 'string'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get date'}})],
+ self._date)])
+
+ async def _date(self, message: Message) -> None:
date = datetime.now().strftime(self.conf['format'])
await self.bus.send(Message(self.name, {'date': date}))
def process_conf(self) -> None:
"""Register plugin as bus client."""
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'wait'}})]
- sends = [MessageTemplate({'event': {'const': 'finished'}})]
- self.bus.register(self.name, 'Wait', sends, receives, self.wait)
-
- async def wait(self, message: Message) -> None:
- """Wait configured time and send "finished" event."""
+ self.bus.register(self.name, 'Wait',
+ [MessageTemplate({'event': {'const': 'finished'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'wait'}})],
+ self._wait)])
+
+ async def _wait(self, message: Message) -> None:
async def wait_coroutine():
await asyncio.sleep(self.conf['seconds'])
await self.bus.send(Message(self.name, {'event': 'finished'}))
def process_conf(self) -> None:
"""Register plugin as bus client."""
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'wait'},
- 'seconds': {'type': 'number'},
- 'id': {'type': 'string'}})]
- sends = [MessageTemplate({'event': {'const': 'finished'},
- 'id': {'type': 'string'}})]
- self.bus.register(self.name, 'GenericWait', sends, receives, self.wait)
-
- async def wait(self, message: Message) -> None:
- """Wait given time and send "finished" event with given "id"."""
+ self.bus.register(self.name, 'GenericWait',
+ [MessageTemplate({'event': {'const': 'finished'},
+ 'id': {'type': 'string'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'wait'},
+ 'seconds':
+ {'type': 'number'},
+ 'id':
+ {'type': 'string'}})],
+ self._wait)])
+
+ async def _wait(self, message: Message) -> None:
async def wait_coroutine():
assert (isinstance(message['seconds'], float) or
isinstance(message['seconds'], int))
def process_conf(self) -> None:
"""Register plugin as bus client."""
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'start'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'cancel'}})]
- sends = [MessageTemplate({'event': {'const': 'finished'}}),
- MessageTemplate({'event': {'const': 'cancelled'}})]
- self.bus.register(self.name, 'Timer', sends, receives, self._receive)
self.started = 0
self.cancelled = 0
+ self.bus.register(self.name, 'Timer',
+ [MessageTemplate({'event':
+ {'const': 'finished'}}),
+ MessageTemplate({'event':
+ {'const': 'cancelled'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'start'}})],
+ self._start),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'cancel'}})],
+ self._cancel)])
+
+ async def _start(self, message: Message) -> None:
+ self.started += 1
+
+ async def wait_coroutine():
+ await asyncio.sleep(self.conf['seconds'])
+ if self.cancelled > 0:
+ self.cancelled -= 1
+ self.started -= 1
+ elif self.started > 0:
+ self.started -= 1
+ await self.bus.send(Message(self.name,
+ {'event': 'finished'}))
+ # Done in separate task to not block queue awaiting this callback:
+ asyncio.create_task(wait_coroutine())
- async def _receive(self, message: Message) -> None:
- if message['command'] == 'cancel':
- if self.cancelled < self.started:
- cancel = self.started - self.cancelled
- self.cancelled = self.started
- await self.bus.send(Message(self.name, {'event': 'cancelled',
- 'count': cancel}))
- if message['command'] == 'start':
- self.started += 1
-
- async def wait_coroutine():
- await asyncio.sleep(self.conf['seconds'])
- if self.cancelled > 0:
- self.cancelled -= 1
- self.started -= 1
- elif self.started > 0:
- self.started -= 1
- await self.bus.send(Message(self.name,
- {'event': 'finished'}))
- # Done in separate task to not block queue awaiting this callback:
- asyncio.create_task(wait_coroutine())
+ async def _cancel(self, message: Message) -> None:
+ if self.cancelled < self.started:
+ cancel = self.started - self.cancelled
+ self.cancelled = self.started
+ await self.bus.send(Message(self.name, {'event': 'cancelled',
+ 'count': cancel}))
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
- sends = [MessageTemplate.from_message(self.conf['message'])]
- self.bus.register(self.name, 'Periodic', sends, [], self.nop)
-
- async def nop(self, message: Message) -> None:
- """Do nothing."""
- pass
+ self.bus.register(self.name, 'Periodic',
+ [MessageTemplate.from_message(self.conf['message'])],
+ [])
async def run(self) -> None:
"""Run periodic loop."""