--- /dev/null
+"""Modbus implementation.
+
+…
+
+TODO: documentation, doctests
+"""
+from controlpi import BasePlugin, Message, MessageTemplate
+
+from typing import List
+
+
+def crc_slow(message: bytes) -> bytes:
+ """Compute CRC for message.
+
+ A 16 bit CRC as specified in the Modbus specification is computed for
+ the given message and this CRC is returned as two bytes (low byte
+ first).
+
+ (This is the slow version without precomputed table.)
+
+ >>> crc_slow(bytes([0x08, 0x01, 0x00, 0x06, 0x00, 0x06])).hex()
+ '5c90'
+ >>> crc_slow(bytes([0x08, 0x01, 0x01, 0x17])).hex()
+ '121a'
+ >>> crc_slow(bytes([0x05, 0x03, 0x03, 0xE8, 0x00, 0x03])).hex()
+ '843f'
+ >>> crc_slow(bytes([0x05, 0x03, 0x06, 0x00, 0x07, 0x00, 0x00, 0x17,
+ ... 0x70])).hex()
+ 'a861'
+ >>> crc_slow(bytes([0x0A, 0x05, 0x00, 0x00, 0xFF, 0x00])).hex()
+ '8d41'
+ >>> crc_slow(bytes([0x01, 0x06, 0x2F, 0x4D, 0x13, 0x88])).hex()
+ '1c5f'
+ >>> crc_slow(bytes([0x05, 0x0F, 0x00, 0x06, 0x00, 0x06, 0x02, 0x17,
+ ... 0x00])).hex()
+ 'db3e'
+ >>> crc_slow(bytes([0x05, 0x0f, 0x00, 0x06, 0x00, 0x06])).hex()
+ '344c'
+ >>> crc_slow(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02, 0x04, 0x00,
+ ... 0x04, 0x93, 0xE0])).hex()
+ 'f42b'
+ >>> crc_slow(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02])).hex()
+ '19ec'
+ >>> crc_slow(bytes([0x01, 0x17, 0x27, 0x10, 0x00, 0x02, 0x2A, 0xF8,
+ ... 0x00, 0x02, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
+ '964d'
+ >>> crc_slow(bytes([0x01, 0x17, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
+ 'f471'
+ """
+ crc = 0xFFFF
+ for byte in message:
+ crc ^= byte
+ for bit in range(8):
+ lsb = crc & 1
+ crc >>= 1
+ if lsb:
+ crc ^= 0xA001
+ return bytes([crc & 0xFF, crc >> 8])
+
+
+_crc_table: List[int] = []
+
+
+def crc(message: bytes) -> bytes:
+ """Compute CRC for message.
+
+ A 16 bit CRC as specified in the Modbus specification is computed for
+ the given message and this CRC is returned as two bytes (low byte
+ first).
+
+ >>> crc(bytes([0x08, 0x01, 0x00, 0x06, 0x00, 0x06])).hex()
+ '5c90'
+ >>> crc(bytes([0x08, 0x01, 0x01, 0x17])).hex()
+ '121a'
+ >>> crc(bytes([0x05, 0x03, 0x03, 0xE8, 0x00, 0x03])).hex()
+ '843f'
+ >>> crc(bytes([0x05, 0x03, 0x06, 0x00, 0x07, 0x00, 0x00, 0x17,
+ ... 0x70])).hex()
+ 'a861'
+ >>> crc(bytes([0x0A, 0x05, 0x00, 0x00, 0xFF, 0x00])).hex()
+ '8d41'
+ >>> crc(bytes([0x01, 0x06, 0x2F, 0x4D, 0x13, 0x88])).hex()
+ '1c5f'
+ >>> crc(bytes([0x05, 0x0F, 0x00, 0x06, 0x00, 0x06, 0x02, 0x17,
+ ... 0x00])).hex()
+ 'db3e'
+ >>> crc(bytes([0x05, 0x0f, 0x00, 0x06, 0x00, 0x06])).hex()
+ '344c'
+ >>> crc(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02, 0x04, 0x00,
+ ... 0x04, 0x93, 0xE0])).hex()
+ 'f42b'
+ >>> crc(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02])).hex()
+ '19ec'
+ >>> crc(bytes([0x01, 0x17, 0x27, 0x10, 0x00, 0x02, 0x2A, 0xF8,
+ ... 0x00, 0x02, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
+ '964d'
+ >>> crc(bytes([0x01, 0x17, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
+ 'f471'
+ """
+ global _crc_table
+ if not _crc_table:
+ for crc in range(256):
+ for bit in range(8):
+ lsb = crc & 1
+ crc >>= 1
+ if lsb:
+ crc ^= 0xA001
+ _crc_table.append(crc)
+ crc = 0xFFFF
+ for byte in message:
+ crc ^= byte
+ table_value = _crc_table[crc & 0xFF]
+ crc >>= 8
+ crc ^= table_value
+ return bytes([crc & 0xFF, crc >> 8])
+
+
+class Modbus(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties':
+ {'device': {'type': 'string'},
+ 'test device': {'type': 'string'},
+ 'baudrate': {'type': 'integer'},
+ 'slave types':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'object',
+ 'properties':
+ {'slaves':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'integer',
+ 'minimum': 1,
+ 'maximum': 247}}},
+ 'coils':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'object',
+ 'properties':
+ {'address':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 65535},
+ 'name':
+ {'type': 'string'}},
+ 'required': ['address']}}},
+ 'holding registers':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'object',
+ 'properties':
+ {'address':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 65535},
+ 'name':
+ {'type': 'string'},
+ 'count':
+ {'type': 'integer',
+ 'minimum': 1}},
+ 'required': ['address']}}}}}}}},
+ 'required': ['device']}
+
+ async def _receive(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))
+
+ def process_conf(self) -> None:
+ """Register plugin as bus client."""
+ message = Message(self.name, {'spam': self.conf['spam']})
+ sends = [MessageTemplate.from_message(message)]
+ receives = [MessageTemplate({'target': {'const': self.name}})]
+ self.bus.register(self.name, 'Plugin', sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Send initial message."""
+ await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))