From a0e2da95b4b080c0fc0ca047d533a5b6ab3a2780 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Fri, 9 Apr 2021 09:28:41 +0200 Subject: [PATCH] Streamline code, start API documentation. --- controlpi_plugins/modbus.py | 214 +++++++++++++++--------------------- 1 file changed, 91 insertions(+), 123 deletions(-) diff --git a/controlpi_plugins/modbus.py b/controlpi_plugins/modbus.py index 9719c1e..3ea6c4a 100644 --- a/controlpi_plugins/modbus.py +++ b/controlpi_plugins/modbus.py @@ -1,8 +1,6 @@ """Modbus implementation. … - -TODO: documentation, doctests """ import asyncio import serial # type: ignore @@ -160,26 +158,35 @@ CRC.precompute() class ModbusMaster(BasePlugin): """Modbus-RTU master plugin. + The plugin receives commands for executing Modbus functions on the + ControlPi message bus, encodes and sends them on the Modbus and + translates the responses back to ControlPi messages. + + In order to demonstrate this without accessing a real hardware serial + connection, we connect two pseudo ttys: >>> import pty, os, controlpi >>> master_controller, master = pty.openpty() >>> slave_controller, slave = pty.openpty() >>> def relay(source, target): ... content = os.read(source, 2147483647) ... os.write(target, content) - >>> async def test(): + >>> def relay_setup(): ... loop = asyncio.get_event_loop() ... loop.add_reader(master_controller, relay, ... master_controller, slave_controller) ... loop.add_reader(slave_controller, relay, ... slave_controller, master_controller) + + + + >>> async def test(): + ... relay_setup() ... await controlpi.test( ... {"Test Master": {"plugin": "ModbusMaster", - ... "device": os.ttyname(master), - ... "parity": "none"}, + ... "device": os.ttyname(master)}, ... "Test Slave": {"plugin": "ModbusSlave", ... "device": os.ttyname(slave), - ... "slave": 1, - ... "parity": "none"}}, + ... "slave": 1}}, ... [{"target": "Test Master", "command": "read coils", ... "slave": 1, "start": 1, "quantity": 2}, ... {"target": "Test Master", "command": "write single coil", @@ -262,6 +269,7 @@ class ModbusMaster(BasePlugin): test(): {'sender': '', 'event': 'registered', 'client': 'Test Slave', 'plugin': 'ModbusSlave', 'sends': [{'event': {'const': 'received'}}, + {'event': {'const': 'crc error'}}, {'event': {'const': 'sent'}}], 'receives': []} test(): {'sender': 'test()', 'target': 'Test Master', @@ -566,9 +574,8 @@ class ModbusMaster(BasePlugin): if function_code == 0x03: int_values = [] for i in range(0, length, 2): - byte_hi = modbus_message[3 + i] - byte_lo = modbus_message[4 + i] - int_values.append(byte_hi * 256 + byte_lo) + int_values.append(modbus_message[3 + i]*256 + + modbus_message[3 + i + 1]) message['values'] = int_values if function_code == 0x05 or function_code == 0x06: if len(modbus_message) != 8: @@ -577,9 +584,7 @@ class ModbusMaster(BasePlugin): message['message'] = modbus_message.hex() await self.bus.send(message) return - address_hi = modbus_message[2] - address_lo = modbus_message[3] - message['address'] = address_hi * 256 + address_lo + 1 + message['address'] = modbus_message[2]*256 + modbus_message[3] + 1 if function_code == 0x05: if modbus_message[4] == 0xFF and modbus_message[5] == 0x00: message['value'] = True @@ -592,9 +597,7 @@ class ModbusMaster(BasePlugin): await self.bus.send(message) return if function_code == 0x06: - value_hi = modbus_message[4] - value_lo = modbus_message[5] - message['value'] = value_hi * 256 + value_lo + message['value'] = modbus_message[4] * 256 + modbus_message[5] await self.bus.send(message) async def run(self) -> None: @@ -694,31 +697,30 @@ class ModbusSlave(BasePlugin): def process_conf(self) -> None: """Process configuration and register plugin as bus client.""" # Default values: - if 'baudrate' not in self.conf: - self.conf['baudrate'] = 9600 - if 'parity' not in self.conf: - self.conf['parity'] = 'even' - if 'stopbits' not in self.conf: - self.conf['stopbits'] = 1 - # Constants from serial for parity and stop bits: + self._device = self.conf['device'] + self._slave = self.conf['slave'] + self._baudrate = 9600 + if 'baudrate' in self.conf: + self._baudrate = self.conf['baudrate'] self._parity = serial.PARITY_EVEN - if self.conf['parity'] == 'odd': - self._parity = serial.PARITY_ODD - elif self.conf['parity'] == 'none': - self._parity = serial.PARITY_NONE + if 'parity' in self.conf: + if self.conf['parity'] == 'odd': + self._parity = serial.PARITY_ODD + elif self.conf['parity'] == 'none': + self._parity = serial.PARITY_NONE self._stopbits = serial.STOPBITS_ONE - if self.conf['stopbits'] == 2: + if 'stopbits' in self.conf and self.conf['stopbits'] == 2: self._stopbits = serial.STOPBITS_TWO # 1.5 char and 3.5 char times for protocol timing # (according to Modbus specification): self._t15 = 0.000750 self._t35 = 0.001750 - if self.conf['baudrate'] <= 19200: - bit_per_char = 9 - if self.conf['parity'] != 'none': - bit_per_char += 1 - bit_per_char += self.conf['stopbits'] - seconds_per_char = bit_per_char / self.conf['baudrate'] + if self._baudrate <= 19200: + bit_per_char = 11 + if (self._parity == serial.PARITY_NONE and + self._stopbits == serial.STOPBITS_ONE): + bit_per_char = 10 + seconds_per_char = bit_per_char / self._baudrate self._t15 = 1.5 * seconds_per_char self._t35 = 3.5 * seconds_per_char # Coils and registers: @@ -726,6 +728,7 @@ class ModbusSlave(BasePlugin): self._registers: Dict[int, int] = {} # Message templates sent by plugin client: sends = [MessageTemplate({'event': {'const': 'received'}}), + MessageTemplate({'event': {'const': 'crc error'}}), MessageTemplate({'event': {'const': 'sent'}})] self.bus.register(self.name, 'ModbusSlave', sends, [], self._receive) @@ -733,124 +736,89 @@ class ModbusSlave(BasePlugin): pass async def _process_request(self, modbus_request: bytes) -> bytes: - message = Message(self.name) - message['event'] = 'received' - message['message'] = modbus_request.hex() - await self.bus.send(message) if len(modbus_request) < 4: return b'' + if modbus_request[0] != self._slave: + return b'' + slave = modbus_request[0] function_code = modbus_request[1] - modbus_response = bytes([self.conf['slave']]) if function_code == 0x01 or function_code == 0x03: if len(modbus_request) != 8: - modbus_response += bytes([function_code | 0x80, 0x03]) - else: - start_hi = modbus_request[2] - start_lo = modbus_request[3] - start = start_hi * 256 + start_lo - quantity_hi = modbus_request[4] - quantity_lo = modbus_request[5] - quantity = quantity_hi * 256 + quantity_lo - if function_code == 0x01: - okay = True - data = [] - byte = 0 - bit = 0 - for i in range(quantity): - if start + i not in self._coils: - modbus_response += bytes([0x81, 0x02]) - okay = False - break - if self._coils[start + i]: - byte += 2**bit - bit += 1 - if bit == 8: - data.append(byte) - byte = 0 - bit = 0 - if okay: - if bit > 0: - data.append(byte) - modbus_response += bytes([0x01, len(data)]) - modbus_response += bytes(data) - if function_code == 0x03: - okay = True - data = [] - for i in range(quantity): - if start + i not in self._registers: - modbus_response += bytes([0x83, 0x02]) - okay = False - break - data.append(self._registers[start + i] >> 8) - data.append(self._registers[start + i] & 0xFF) - if okay: - modbus_response += bytes([0x03, len(data)]) - modbus_response += bytes(data) + return bytes((slave, function_code | 0x80, 0x03)) + start = modbus_request[2]*256 + modbus_request[3] + quantity = modbus_request[4]*256 + modbus_request[5] + data = [] + if function_code == 0x01: + byte = 0 + bit = 0 + for i in range(quantity): + if start + i not in self._coils: + return bytes((slave, 0x81, 0x02)) + if self._coils[start + i]: + byte += 2**bit + bit += 1 + if bit == 8: + data.append(byte) + byte = 0 + bit = 0 + if bit > 0: + data.append(byte) + elif function_code == 0x03: + for i in range(quantity): + if start + i not in self._registers: + return bytes((slave, 0x83, 0x02)) + data.append(self._registers[start + i] >> 8) + data.append(self._registers[start + i] & 0xFF) + return bytes([slave, function_code, len(data)] + data) elif function_code == 0x05 or function_code == 0x06: if len(modbus_request) != 8: - modbus_response += bytes([function_code | 0x80, 0x03]) - else: - address_hi = modbus_request[2] - address_lo = modbus_request[3] - address = address_hi * 256 + address_lo - if function_code == 0x05: - okay = True - if (modbus_request[4] == 0xFF and - modbus_request[5] == 0x00): - self._coils[address] = True - elif (modbus_request[4] == 0x00 and - modbus_request[5] == 0x00): - self._coils[address] = False - else: - modbus_response += bytes([0x85, 0x03]) - okay = False - if okay: - modbus_response = modbus_request[:-2] - if function_code == 0x06: - value_hi = modbus_request[4] - value_lo = modbus_request[5] - value = value_hi * 256 + value_lo - self._registers[address] = value - modbus_response = modbus_request[:-2] - else: - modbus_response += bytes([function_code | 0x80, 0x01]) - modbus_response += bytes(CRC(modbus_response)) - message = Message(self.name) - message['event'] = 'sent' - message['message'] = modbus_response.hex() - await self.bus.send(message) - return modbus_response + return bytes((slave, function_code | 0x80, 0x03)) + address = modbus_request[2]*256 + modbus_request[3] + if function_code == 0x05: + if modbus_request[4] == 0xFF and modbus_request[5] == 0x00: + self._coils[address] = True + elif modbus_request[4] == 0x00 and modbus_request[5] == 0x00: + self._coils[address] = False + else: + return bytes((slave, 0x81, 0x03)) + elif function_code == 0x06: + value = modbus_request[4]*256 + modbus_request[5] + self._registers[address] = value + return modbus_request[:-2] + return bytes((slave, function_code | 0x80, 0x01)) async def run(self) -> None: """Open serial device and start loop on its reader.""" reader, writer = await serial_asyncio.open_serial_connection( - url=self.conf['device'], baudrate=self.conf['baudrate'], + url=self._device, baudrate=self._baudrate, parity=self._parity, stopbits=self._stopbits) while True: modbus_request = b'' crc = CRC() + first_byte = await reader.read(1) t35_task: asyncio.Task = asyncio.create_task( asyncio.sleep(self._t35)) - # First byte is read without timeout: - first_byte = await reader.read(1) modbus_request += first_byte crc.update(first_byte[0]) while True: - t35_task = asyncio.create_task(asyncio.sleep(self._t35)) try: - # Subsequent bytes are only awaited for the time - # needed to transmit 1.5 characters: next_byte = await asyncio.wait_for( reader.read(1), self._t15) + t35_task = asyncio.create_task(asyncio.sleep(self._t35)) modbus_request += next_byte crc.update(next_byte[0]) except asyncio.TimeoutError: break if not crc: + await self.bus.send(Message(self.name, {'event': 'crc error', + 'message': modbus_request.hex()})) continue - if modbus_request[0] != self.conf['slave']: - continue + await self.bus.send(Message(self.name, {'event': 'received', + 'message': modbus_request.hex()})) modbus_response = await self._process_request(modbus_request) - await t35_task if modbus_response: + modbus_response += bytes(CRC(modbus_response)) writer.write(modbus_response) + await t35_task + await self.bus.send(Message(self.name, {'event': 'sent', + 'message': modbus_response.hex()})) -- 2.34.1