From d248d2c98abb30f1b330dfbb1e516b4113b394f4 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 7 Apr 2021 05:03:53 +0200 Subject: [PATCH] Intermediate state. --- controlpi_plugins/modbus.py | 708 ++++++++++++++++++++++++++++-------- setup.py | 1 + 2 files changed, 554 insertions(+), 155 deletions(-) diff --git a/controlpi_plugins/modbus.py b/controlpi_plugins/modbus.py index 2eab95b..3c99654 100644 --- a/controlpi_plugins/modbus.py +++ b/controlpi_plugins/modbus.py @@ -4,183 +4,581 @@ TODO: documentation, doctests """ +import asyncio +import serial # type: ignore +import serial_asyncio # type: ignore from controlpi import BasePlugin, Message, MessageTemplate -from typing import List - - -def crc_slow(message: bytes) -> bytes: - """Compute CRC for message. - - A 16 bit CRC as specified in the Modbus specification is computed for - the given message and this CRC is returned as two bytes (low byte - first). - - (This is the "slow" version without precomputed table.) - - >>> crc_slow(bytes([0x08, 0x01, 0x00, 0x06, 0x00, 0x06])).hex() - '5c90' - >>> crc_slow(bytes([0x08, 0x01, 0x01, 0x17])).hex() - '121a' - >>> crc_slow(bytes([0x05, 0x03, 0x03, 0xE8, 0x00, 0x03])).hex() - '843f' - >>> crc_slow(bytes([0x05, 0x03, 0x06, 0x00, 0x07, 0x00, 0x00, 0x17, - ... 0x70])).hex() - 'a861' - >>> crc_slow(bytes([0x0A, 0x05, 0x00, 0x00, 0xFF, 0x00])).hex() - '8d41' - >>> crc_slow(bytes([0x01, 0x06, 0x2F, 0x4D, 0x13, 0x88])).hex() - '1c5f' - >>> crc_slow(bytes([0x05, 0x0F, 0x00, 0x06, 0x00, 0x06, 0x02, 0x17, - ... 0x00])).hex() - 'db3e' - >>> crc_slow(bytes([0x05, 0x0f, 0x00, 0x06, 0x00, 0x06])).hex() - '344c' - >>> crc_slow(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02, 0x04, 0x00, - ... 0x04, 0x93, 0xE0])).hex() - 'f42b' - >>> crc_slow(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02])).hex() - '19ec' - >>> crc_slow(bytes([0x01, 0x17, 0x27, 0x10, 0x00, 0x02, 0x2A, 0xF8, - ... 0x00, 0x02, 0x04, 0x00, 0x00, 0x13, 0x88])).hex() - '964d' - >>> crc_slow(bytes([0x01, 0x17, 0x04, 0x00, 0x00, 0x13, 0x88])).hex() - 'f471' - """ - crc = 0xFFFF - for byte in message: - crc ^= byte - for bit in range(8): - lsb = crc & 1 - crc >>= 1 - if lsb: - crc ^= 0xA001 - return bytes([crc & 0xFF, crc >> 8]) +from typing import Dict -_crc_table: List[int] = [] +def crc(message: bytes) -> bytes: + """Calculate CRC for message. + Calculate CRC for message using precomputed dictionary. -def crc(message: bytes) -> bytes: - """Compute CRC for message. - - A 16 bit CRC as specified in the Modbus specification is computed for - the given message and this CRC is returned as two bytes (low byte - first). - - (This is the "fast" version with precomputed table.) - - >>> crc(bytes([0x08, 0x01, 0x00, 0x06, 0x00, 0x06])).hex() - '5c90' - >>> crc(bytes([0x08, 0x01, 0x01, 0x17])).hex() - '121a' - >>> crc(bytes([0x05, 0x03, 0x03, 0xE8, 0x00, 0x03])).hex() - '843f' - >>> crc(bytes([0x05, 0x03, 0x06, 0x00, 0x07, 0x00, 0x00, 0x17, - ... 0x70])).hex() - 'a861' - >>> crc(bytes([0x0A, 0x05, 0x00, 0x00, 0xFF, 0x00])).hex() - '8d41' - >>> crc(bytes([0x01, 0x06, 0x2F, 0x4D, 0x13, 0x88])).hex() - '1c5f' - >>> crc(bytes([0x05, 0x0F, 0x00, 0x06, 0x00, 0x06, 0x02, 0x17, - ... 0x00])).hex() - 'db3e' - >>> crc(bytes([0x05, 0x0f, 0x00, 0x06, 0x00, 0x06])).hex() - '344c' - >>> crc(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02, 0x04, 0x00, - ... 0x04, 0x93, 0xE0])).hex() - 'f42b' - >>> crc(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02])).hex() - '19ec' - >>> crc(bytes([0x01, 0x17, 0x27, 0x10, 0x00, 0x02, 0x2A, 0xF8, - ... 0x00, 0x02, 0x04, 0x00, 0x00, 0x13, 0x88])).hex() - '964d' - >>> crc(bytes([0x01, 0x17, 0x04, 0x00, 0x00, 0x13, 0x88])).hex() - 'f471' + We define 12 test messages with their expected results from the manual + of the Hitachi PJ series Modbus devices: + >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')), + ... (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')), + ... (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')), + ... (bytes.fromhex('05 03 06 0007 0000 1770'), + ... bytes.fromhex('A861')), + ... (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')), + ... (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')), + ... (bytes.fromhex('05 0F 0006 0006 02 1700'), + ... bytes.fromhex('DB3E')), + ... (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')), + ... (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'), + ... bytes.fromhex('F42B')), # error in manual + ... (bytes.fromhex('01 10 2B01 0002'), + ... bytes.fromhex('19EC')), # error in manual + ... (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'), + ... bytes.fromhex('964D')), # error in manual + ... (bytes.fromhex('01 17 04 0000 1388'), + ... bytes.fromhex('F471'))] + + Now, we test the crc function against this list: + >>> crc_zero = bytes.fromhex('0000') + >>> for (message, expected) in tests: + ... result = crc(message) + ... assert result == expected, (f"crc({message.hex()}) =" + ... f" {result.hex()} !=" + ... f" {expected.hex()}") + ... crc_message = message + expected + ... crc_result = crc(crc_message) + ... assert crc_result == crc_zero, (f"crc({crc_message.hex()}) = " + ... f" {crc_result.hex()} !=" + ... f" {zero.hex()}") + + Since the CRC of a whole message with its CRC appended is always zero, + we can check the CRC of incoming messages on the fly and just check for + it being zero at the end of transmission. + This is done in the plugin classes below and the crc function itself is + only used for constructing messages to be sent. """ - global _crc_table - if not _crc_table: - for crc in range(256): - for bit in range(8): - lsb = crc & 1 - crc >>= 1 - if lsb: - crc ^= 0xA001 - _crc_table.append(crc) crc = 0xFFFF for byte in message: crc ^= byte - table_value = _crc_table[crc & 0xFF] + dict_key = crc & 0xFF crc >>= 8 - crc ^= table_value - return bytes([crc & 0xFF, crc >> 8]) + crc ^= _crc_dict[dict_key] + return bytes((crc & 0xFF, crc >> 8)) + + +_crc_dict: Dict[int, int] = {} + +for dict_key in range(256): + dict_value = dict_key + for bit in range(8): + lsb = dict_value & 1 + dict_value >>= 1 + if lsb: + dict_value ^= 0xA001 + _crc_dict[dict_key] = dict_value -class Modbus(BasePlugin): - """… plugin. +class ModbusMaster(BasePlugin): + """Modbus-RTU master plugin. - Do this and that. + >>> import controlpi + >>> asyncio.run(controlpi.test( + ... {"Test Master": {"plugin": "ModbusMaster", + ... "device": "/dev/pts/4", + ... "parity": "none"}, + ... "Test Slave": {"plugin": "ModbusSlave", + ... "device": "/dev/pts/5", + ... "slave": 1, + ... "parity": "none"}}, + ... [{"target": "Test Master", "command": "read coils", + ... "slave": 1, "start": 1, "quantity": 2}, + ... {"target": "Test Master", "command": "write single coil", + ... "slave": 1, "address": 1, "value": False}, + ... {"target": "Test Master", "command": "write single coil", + ... "slave": 1, "address": 2, "value": True}, + ... {"target": "Test Master", "command": "read coils", + ... "slave": 1, "start": 1, "quantity": 2}, + ... {"target": "Test Master", "command": "read holding registers", + ... "slave": 1, "start": 7, "quantity": 2}, + ... {"target": "Test Master", "command": "write single register", + ... "slave": 1, "address": 7, "value": 42}, + ... {"target": "Test Master", "command": "write single register", + ... "slave": 1, "address": 8, "value": 42042}, + ... {"target": "Test Master", "command": "read holding registers", + ... "slave": 1, "start": 7, "quantity": 2}], 0.5)) + ... # doctest: +NORMALIZE_WHITESPACE """ CONF_SCHEMA = {'properties': {'device': {'type': 'string'}, - 'test device': {'type': 'string'}, - 'baudrate': {'type': 'integer'}, - 'slave types': - {'type': 'object', - 'patternProperties': - {'^.+$': - {'type': 'object', - 'properties': - {'slaves': - {'type': 'object', - 'patternProperties': - {'^.+$': - {'type': 'integer', - 'minimum': 1, - 'maximum': 247}}}, - 'coils': - {'type': 'object', - 'patternProperties': - {'^.+$': - {'type': 'object', - 'properties': - {'address': - {'type': 'integer', - 'minimum': 0, - 'maximum': 65535}, - 'name': - {'type': 'string'}}, - 'required': ['address']}}}, - 'holding registers': - {'type': 'object', - 'patternProperties': - {'^.+$': - {'type': 'object', - 'properties': - {'address': - {'type': 'integer', - 'minimum': 0, - 'maximum': 65535}, - 'name': - {'type': 'string'}, - 'count': - {'type': 'integer', - 'minimum': 1}}, - 'required': ['address']}}}}}}}}, + 'baudrate': {'type': 'integer', + 'enum': [2400, 4800, 9600, 19200, 38400, + 57600, 115200], + 'default': 9600}, + 'parity': {'type': 'string', + 'enum': ['even', 'odd', 'none'], + 'default': 'even'}, + 'stopbits': {'type': 'integer', + 'minimum': 1, + 'maximum': 2, + 'default': 1}, + 'retries': {'type': 'integer', + 'default': 2}, + 'response timeout': {'type': 'number', + 'default': 0.2}, + 'turnaround delay': {'type': 'number', + 'default': 0.1}}, 'required': ['device']} + def process_conf(self) -> None: + """Process configuration and register plugin as bus client.""" + # Default values: + if 'baudrate' not in self.conf: + self.conf['baudrate'] = 9600 + if 'parity' not in self.conf: + self.conf['parity'] = 'even' + if 'stopbits' not in self.conf: + self.conf['stopbits'] = 1 + if 'retries' not in self.conf: + self.conf['retries'] = 2 + if 'response timeout' not in self.conf: + self.conf['response timeout'] = 0.2 + if 'turnaround delay' not in self.conf: + self.conf['turnaround delay'] = 0.1 + # Constants from serial for parity and stop bits: + self._parity = serial.PARITY_EVEN + if self.conf['parity'] == 'odd': + self._parity = serial.PARITY_ODD + elif self.conf['parity'] == 'none': + self._parity = serial.PARITY_NONE + self._stopbits = serial.STOPBITS_ONE + if self.conf['stopbits'] == 2: + self._stopbits = serial.STOPBITS_TWO + # 1.5 char and 3.5 char times for protocol timing + # (according to Modbus specification): + self._t15 = 0.000750 + self._t35 = 0.001750 + if self.conf['baudrate'] <= 19200: + bit_per_char = 9 + if self.conf['parity'] != 'none': + bit_per_char += 1 + bit_per_char += self.conf['stopbits'] + seconds_per_char = bit_per_char / self.conf['baudrate'] + self._t15 = 1.5 * seconds_per_char + self._t35 = 3.5 * seconds_per_char + # Queue for Modbus messages to be sent: + self._queue: asyncio.Queue = asyncio.Queue() + # Message templates sent and received by plugin client: + sends = [] + receives = [] + self._function_codes = {} + self._function_names = {} + # Error messages: + sends.append(MessageTemplate( + {'event': {'const': 'error'}})) + # 01 - Read Coils: + self._function_codes['read coils'] = 0x01 + self._function_names[0x01] = 'read coils' + receives.append(MessageTemplate( + {'target': {'const': self.name}, + 'command': {'const': self._function_names[0x01]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536}, + 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 2000}})) + sends.append(MessageTemplate( + {'event': {'const': 'response'}, + 'function': {'const': self._function_names[0x01]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'values': {'type': 'array', 'items': {'type': 'boolean'}}})) + # 03 - Read Holding Registers: + self._function_codes['read holding registers'] = 0x03 + self._function_names[0x03] = 'read holding registers' + receives.append(MessageTemplate( + {'target': {'const': self.name}, + 'command': {'const': self._function_names[0x03]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536}, + 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 125}})) + sends.append(MessageTemplate( + {'event': {'const': 'response'}, + 'function': {'const': self._function_names[0x03]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'values': {'type': 'array', 'items': {'type': 'integer'}}})) + # 05 - Write Single Coil: + self._function_codes['write single coil'] = 0x05 + self._function_names[0x05] = 'write single coil' + receives.append(MessageTemplate( + {'target': {'const': self.name}, + 'command': {'const': self._function_names[0x05]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536}, + 'value': {'type': 'boolean'}})) + sends.append(MessageTemplate( + {'event': {'const': 'response'}, + 'function': {'const': self._function_names[0x05]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536}, + 'value': {'type': 'boolean'}})) + # 06 - Write Single Register: + self._function_codes['write single register'] = 0x06 + self._function_names[0x06] = 'write single register' + receives.append(MessageTemplate( + {'target': {'const': self.name}, + 'command': {'const': self._function_names[0x06]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536}, + 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}})) + sends.append(MessageTemplate( + {'event': {'const': 'response'}, + 'function': {'const': self._function_names[0x06]}, + 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247}, + 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536}, + 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}})) + # 08 - Diagnostic: TODO + # 0F - Write Multiple Coils: TODO + # 10 - Write Multiple Registers: TODO + # 17 - Read/Write Multiple Registers: TODO + self.bus.register(self.name, 'ModbusMaster', + sends, receives, self._receive) + async def _receive(self, message: Message) -> None: - await self.bus.send(Message(self.name, {'spam': self.conf['spam']})) + assert isinstance(message['command'], str) + function_code = self._function_codes[message['command']] + assert isinstance(message['slave'], int) + modbus_message = bytes([message['slave'], function_code]) + if function_code == 0x01 or function_code == 0x03: + assert isinstance(message['start'], int) + start_hi = (message['start'] - 1) >> 8 + start_lo = (message['start'] - 1) & 0xFF + assert isinstance(message['quantity'], int) + quantity_hi = message['quantity'] >> 8 + quantity_lo = message['quantity'] & 0xFF + modbus_message += bytes([start_hi, start_lo, + quantity_hi, quantity_lo]) + if function_code == 0x05 or function_code == 0x06: + assert isinstance(message['address'], int) + address_hi = (message['address'] - 1) >> 8 + address_lo = (message['address'] - 1) & 0xFF + modbus_message += bytes([address_hi, address_lo]) + if function_code == 0x05: + assert isinstance(message['value'], bool) + if message['value']: + modbus_message += b'\xFF\x00' + else: + modbus_message += b'\x00\x00' + if function_code == 0x06: + assert isinstance(message['value'], int) + value_hi = message['value'] >> 8 + value_lo = message['value'] & 0xFF + modbus_message += bytes([value_hi, value_lo]) + modbus_message += crc(modbus_message) + await self._queue.put(modbus_message) + + async def _process_response(self, modbus_message: bytes) -> None: + message = Message(self.name) + message['event'] = 'response' + if len(modbus_message) < 4: + message['event'] = 'error' + message['description'] = 'Modbus message too short' + message['message'] = modbus_message.hex() + await self.bus.send(message) + return + if modbus_message[1] & 0x80: + message['event'] = 'error' + function_code = modbus_message[1] ^ 0x80 + message['function'] = self._function_names[function_code] + message['slave'] = modbus_message[0] + if len(modbus_message) != 5: + message['description'] = 'Modbus exception has wrong length' + message['message'] = modbus_message.hex() + else: + error_code = modbus_message[2] + if error_code == 0x01: + message['description'] = 'Illegal function' + elif error_code == 0x02: + message['description'] = 'Illegal data address' + elif error_code == 0x03: + message['description'] = 'Illegal data value' + elif error_code == 0x04: + message['description'] = 'Server device failure' + else: + message['description'] = f"Error code '{error_code:02X}'" + await self.bus.send(message) + return + function_code = modbus_message[1] + message['function'] = self._function_names[function_code] + message['slave'] = modbus_message[0] + if function_code == 0x01 or function_code == 0x03: + length = modbus_message[2] + if len(modbus_message) != length + 5: + message['event'] = 'error' + message['description'] = 'Modbus response has wrong length' + message['message'] = modbus_message.hex() + await self.bus.send(message) + return + if function_code == 0x01: + bool_values = [] + for i in range(length): + byte = modbus_message[3 + i] + for bit in range(8): + if byte & (1 << bit): + bool_values.append(True) + else: + bool_values.append(False) + message['values'] = bool_values + if function_code == 0x03: + int_values = [] + for i in range(0, length, 2): + byte_hi = modbus_message[3 + i] + byte_lo = modbus_message[4 + i] + int_values.append(byte_hi * 256 + byte_lo) + message['values'] = int_values + if function_code == 0x05 or function_code == 0x06: + if len(modbus_message) != 8: + message['event'] = 'error' + message['description'] = 'Modbus response has wrong length' + message['message'] = modbus_message.hex() + await self.bus.send(message) + return + address_hi = modbus_message[3] + address_lo = modbus_message[4] + message['address'] = address_hi * 256 + address_lo + 1 + if function_code == 0x05: + if modbus_message[5] == 0xFF and modbus_message[6] == 0x00: + message['value'] = True + elif modbus_message[5] == 0x00 and modbus_message[6] == 0x00: + message['value'] = False + else: + message['event'] = 'error' + message['description'] = 'Coil value not decodable' + message['message'] = modbus_message.hex() + await self.bus.send(message) + return + if function_code == 0x06: + value_hi = modbus_message[5] + value_lo = modbus_message[6] + message['value'] = value_hi * 256 + value_lo + await self.bus.send(message) + + async def run(self) -> None: + """Open serial device and start loop on modbus message queue.""" + reader, writer = await serial_asyncio.open_serial_connection( + url=self.conf['device'], baudrate=self.conf['baudrate'], + parity=self._parity, stopbits=self._stopbits) + while True: + modbus_request = await self._queue.get() + slave = modbus_request[0] + tries = 0 + while tries < self.conf['retries']: + tries += 1 + writer.write(modbus_request) + if slave == 0: + # Broadcast => just wait for delay and finish: + await asyncio.sleep(self.conf['turnaround delay']) + break + modbus_response = b'' + crc = 0xFFFF + t35_task: asyncio.Task = asyncio.create_task( + asyncio.sleep(self._t35)) + try: + # First byte is awaited for response timeout: + first_byte = await asyncio.wait_for( + reader.read(1), self.conf['response timeout']) + modbus_response += first_byte + crc ^= first_byte[0] + dict_key = crc & 0xFF + crc >>= 8 + crc ^= _crc_dict[dict_key] + except asyncio.TimeoutError: + message = Message(self.name) + message['event'] = 'error' + message['description'] = ( + f"Response time out on try {tries}.") + await self.bus.send(message) + continue + while True: + t35_task = asyncio.create_task(asyncio.sleep(self._t35)) + try: + # Subsequent bytes are only awaited for the time + # needed to transmit 1.5 characters: + next_byte = await asyncio.wait_for( + reader.read(1), self._t15) + modbus_response += next_byte + crc ^= next_byte[0] + dict_key = crc & 0xFF + crc >>= 8 + crc ^= _crc_dict[dict_key] + except asyncio.TimeoutError: + break + if crc != 0: + message = Message(self.name) + message['event'] = 'error' + message['description'] = ( + f"Response CRC failed on try {tries}.") + message['message'] = modbus_response.hex() + await self.bus.send(message) + continue + if modbus_response[0] != slave: + message = Message(self.name) + message['event'] = 'error' + message['description'] = ( + f"Response from wrong slave on try {tries}.") + message['message'] = modbus_response.hex() + await self.bus.send(message) + continue + await self._process_response(modbus_response) + await t35_task + break + self._queue.task_done() + + +class ModbusSlave(BasePlugin): + """Modbus slave plugin. + + This plugin implements a Modbus-RTU slave listening on a given serial + device. + + We can use this to test the communication with a Modbus master plugin + instance. + """ + + CONF_SCHEMA = {'properties': + {'device': {'type': 'string'}, + 'slave': {'type': 'integer', + 'minimum': 1, + 'maximum': 247}, + 'baudrate': {'type': 'integer', + 'enum': [2400, 4800, 9600, 19200, 38400, + 57600, 115200], + 'default': 9600}, + 'parity': {'type': 'string', + 'enum': ['even', 'odd', 'none'], + 'default': 'even'}, + 'stopbits': {'type': 'integer', + 'minimum': 1, + 'maximum': 2, + 'default': 1}}, + 'required': ['device', 'slave']} def process_conf(self) -> None: - """Register plugin as bus client.""" - message = Message(self.name, {'spam': self.conf['spam']}) - sends = [MessageTemplate.from_message(message)] - receives = [MessageTemplate({'target': {'const': self.name}})] - self.bus.register(self.name, 'Plugin', sends, receives, self._receive) + """Process configuration and register plugin as bus client.""" + # Default values: + if 'baudrate' not in self.conf: + self.conf['baudrate'] = 9600 + if 'parity' not in self.conf: + self.conf['parity'] = 'even' + if 'stopbits' not in self.conf: + self.conf['stopbits'] = 1 + # Constants from serial for parity and stop bits: + self._parity = serial.PARITY_EVEN + if self.conf['parity'] == 'odd': + self._parity = serial.PARITY_ODD + elif self.conf['parity'] == 'none': + self._parity = serial.PARITY_NONE + self._stopbits = serial.STOPBITS_ONE + if self.conf['stopbits'] == 2: + self._stopbits = serial.STOPBITS_TWO + # 1.5 char and 3.5 char times for protocol timing + # (according to Modbus specification): + self._t15 = 0.000750 + self._t35 = 0.001750 + if self.conf['baudrate'] <= 19200: + bit_per_char = 9 + if self.conf['parity'] != 'none': + bit_per_char += 1 + bit_per_char += self.conf['stopbits'] + seconds_per_char = bit_per_char / self.conf['baudrate'] + self._t15 = 1.5 * seconds_per_char + self._t35 = 3.5 * seconds_per_char + # Coils and registers: + self._coils: Dict[int, bool] = {} + self._registers: Dict[int, int] = {} + # Message templates sent by plugin client: + sends = [MessageTemplate({'event': {'const': 'received'}}), + MessageTemplate({'event': {'const': 'sent'}})] + self.bus.register(self.name, 'ModbusSlave', sends, [], self._receive) + + async def _receive(self, message: Message) -> None: + pass + + async def _process_request(self, modbus_request: bytes) -> bytes: + message = Message(self.name) + message['event'] = 'received' + message['message'] = modbus_request.hex() + await self.bus.send(message) + if len(modbus_request) < 4: + return b'' + function_code = modbus_request[1] + modbus_response = bytes([self.conf['slave']]) + if function_code == 0x01 or function_code == 0x03: + if len(modbus_request) != 8: + modbus_response += bytes([function_code | 0x80, 0x03]) + else: + start_hi = modbus_request[2] + start_lo = modbus_request[3] + start = start_hi * 256 + start_lo + quantity_hi = modbus_request[4] + quantity_lo = modbus_request[5] + quantity = quantity_hi * 256 + quantity_lo + if function_code == 0x01: + okay = True + for i in range(quantity): + if start + i not in self._coils: + modbus_response += bytes([0x81, 0x02]) + okay = False + break + else: + if self._coils[start + i]: + pass + if function_code == 0x03: + pass + elif function_code == 0x05 or function_code == 0x06: + pass + else: + modbus_response += bytes([function_code | 0x80, 0x01]) + modbus_response += crc(modbus_response) + message = Message(self.name) + message['event'] = 'sent' + message['message'] = modbus_response.hex() + await self.bus.send(message) + return modbus_response async def run(self) -> None: - """Send initial message.""" - await self.bus.send(Message(self.name, {'spam': self.conf['spam']})) + """Open serial device and start loop on its reader.""" + reader, writer = await serial_asyncio.open_serial_connection( + url=self.conf['device'], baudrate=self.conf['baudrate'], + parity=self._parity, stopbits=self._stopbits) + while True: + modbus_request = b'' + crc = 0xFFFF + t35_task: asyncio.Task = asyncio.create_task( + asyncio.sleep(self._t35)) + # First byte is read without timeout: + first_byte = await reader.read(1) + modbus_request += first_byte + crc ^= first_byte[0] + dict_key = crc & 0xFF + crc >>= 8 + crc ^= _crc_dict[dict_key] + while True: + t35_task = asyncio.create_task(asyncio.sleep(self._t35)) + try: + # Subsequent bytes are only awaited for the time + # needed to transmit 1.5 characters: + next_byte = await asyncio.wait_for( + reader.read(1), self._t15) + modbus_request += next_byte + crc ^= next_byte[0] + dict_key = crc & 0xFF + crc >>= 8 + crc ^= _crc_dict[dict_key] + except asyncio.TimeoutError: + break + if crc != 0: + continue + if modbus_request[0] != self.conf['slave']: + continue + modbus_response = await self._process_request(modbus_request) + await t35_task + if modbus_response: + writer.write(modbus_response) diff --git a/setup.py b/setup.py index c690cea..c7b9b68 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ setuptools.setup( url="http://docs.graph-it.com/graphit/controlpi-modbus", packages=["controlpi_plugins"], install_requires=[ + "pyserial-asyncio", "controlpi @ git+git://git.graph-it.com/graphit/controlpi.git", ], classifiers=[ -- 2.34.1