From a92466d2960407359a671da41c09b37244d9e76d Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 14 Apr 2021 01:27:45 +0200 Subject: [PATCH] Base Modbus communication on expected lengths. --- controlpi_plugins/modbus.py | 160 +++++++++++++++++++----------------- 1 file changed, 83 insertions(+), 77 deletions(-) diff --git a/controlpi_plugins/modbus.py b/controlpi_plugins/modbus.py index 75b9482..faaa3d9 100644 --- a/controlpi_plugins/modbus.py +++ b/controlpi_plugins/modbus.py @@ -399,18 +399,6 @@ class ModbusMaster(BasePlugin): self._stopbits = serial.STOPBITS_ONE if self.conf['stopbits'] == 2: self._stopbits = serial.STOPBITS_TWO - # 1.5 char and 3.5 char times for protocol timing - # (according to Modbus specification): - self._t15 = 0.001000 - self._t35 = 0.001750 - if self.conf['baudrate'] <= 19200: - bit_per_char = 9 - if self.conf['parity'] != 'none': - bit_per_char += 1 - bit_per_char += self.conf['stopbits'] - seconds_per_char = bit_per_char / self.conf['baudrate'] - self._t15 = 2.0 * seconds_per_char - self._t35 = 3.5 * seconds_per_char # Queue for Modbus messages to be sent: self._queue: asyncio.Queue = asyncio.Queue() # Message templates sent and received by plugin client: @@ -606,59 +594,85 @@ class ModbusMaster(BasePlugin): url=self.conf['device'], baudrate=self.conf['baudrate'], parity=self._parity, stopbits=self._stopbits) while True: - modbus_request = await self._queue.get() - slave = modbus_request[0] + request = await self._queue.get() + slave = request[0] + function_code = request[1] + # Read bytes from serial (normally there shouldn't be any): + unexpected_message = b'' + while True: + try: + unexpected_message += await asyncio.wait_for( + reader.read(1), 0.001) + except asyncio.TimeoutError: + break + if unexpected_message: + message = Message(self.name) + message['event'] = 'error' + message['message'] = unexpected_message.hex() + message['description'] = "Unexpected message on Modbus." + await self.bus.send(message) + # Retry self.conf['retries'] times: tries = 0 while tries <= self.conf['retries']: tries += 1 - writer.write(modbus_request) + writer.write(request) if slave == 0: # Broadcast => just wait for delay and finish: await asyncio.sleep(self.conf['turnaround delay']) break - modbus_response = b'' + # Read response: + response = b'' crc = CRC() - t35_task: asyncio.Task = asyncio.create_task( - asyncio.sleep(self._t35)) - try: - first_byte = await asyncio.wait_for( - reader.read(1), self.conf['response timeout']) - modbus_response += first_byte - crc.update(first_byte[0]) - except asyncio.TimeoutError: - message = Message(self.name) - message['event'] = 'error' - message['description'] = ( - f"Response time out on try {tries}.") - await self.bus.send(message) - continue + expected_length = 5 + length = 0 while True: - t35_task = asyncio.create_task(asyncio.sleep(self._t35)) try: - next_byte = await asyncio.wait_for( - reader.read(1), self._t15) - modbus_response += next_byte - crc.update(next_byte[0]) + response += await asyncio.wait_for( + reader.read(1), self.conf['response timeout']) + crc.update(response[length]) except asyncio.TimeoutError: break + length += 1 + # Update expected length based on function: + if (length == 2 and + (response[1] == 0x05 or response[1] == 0x06 or + response[1] == 0x08 or response[1] == 0x0F or + response[1] == 0x10)): + expected_length = 8 + if (length == 3 and + (response[1] == 0x01 or response[1] == 0x03 or + response[1] == 0x17)): + expected_length = response[2] + 5 + # Break at expected length: + if length == expected_length: + break + # Process possible errors leading to retry: + message = Message(self.name) + message['event'] = 'error' + message['message'] = response.hex() + if length < expected_length: + message['description'] = ( + f"Response timed out on try {tries}.") + await self.bus.send(message) + continue if not crc: - message = Message(self.name) - message['event'] = 'error' message['description'] = ( f"Response CRC failed on try {tries}.") - message['message'] = modbus_response.hex() await self.bus.send(message) continue - if modbus_response[0] != slave: - message = Message(self.name) - message['event'] = 'error' + if response[0] != slave: message['description'] = ( f"Response from wrong slave on try {tries}.") - message['message'] = modbus_response.hex() await self.bus.send(message) continue - await self._process_response(modbus_response) - await t35_task + if response[1] & 0x7F != function_code: + message['description'] = ( + f"Response for wrong function on try {tries}.") + await self.bus.send(message) + continue + # Process response from correct slave for correct funtion + # with valid CRC: + await self._process_response(response) break self._queue.task_done() @@ -708,18 +722,6 @@ class ModbusSlave(BasePlugin): self._stopbits = serial.STOPBITS_ONE if 'stopbits' in self.conf and self.conf['stopbits'] == 2: self._stopbits = serial.STOPBITS_TWO - # 1.5 char and 3.5 char times for protocol timing - # (according to Modbus specification): - self._t15 = 0.001000 - self._t35 = 0.001750 - if self._baudrate <= 19200: - bit_per_char = 11 - if (self._parity == serial.PARITY_NONE and - self._stopbits == serial.STOPBITS_ONE): - bit_per_char = 10 - seconds_per_char = bit_per_char / self._baudrate - self._t15 = 2.0 * seconds_per_char - self._t35 = 3.5 * seconds_per_char # Coils and registers: self._coils: Dict[int, bool] = {} self._registers: Dict[int, int] = {} @@ -790,32 +792,36 @@ class ModbusSlave(BasePlugin): url=self._device, baudrate=self._baudrate, parity=self._parity, stopbits=self._stopbits) while True: - modbus_request = b'' + # Read request: + request = b'' crc = CRC() - first_byte = await reader.read(1) - t35_task: asyncio.Task = asyncio.create_task( - asyncio.sleep(self._t35)) - modbus_request += first_byte - crc.update(first_byte[0]) + expected_length = 8 + length = 0 while True: - try: - next_byte = await asyncio.wait_for( - reader.read(1), self._t15) - t35_task = asyncio.create_task(asyncio.sleep(self._t35)) - modbus_request += next_byte - crc.update(next_byte[0]) - except asyncio.TimeoutError: + request += await reader.read(1) + crc.update(request[length]) + length += 1 + # Update expected length based on function: + if length == 2: + if request[1] == 0x0F or request[1] == 0x10: + expected_length = 9 + if request[1] == 0x17: + expected_length = 13 + if length == 7 and (request[1] == 0x0F or request[1] == 0x10): + expected_length += request[6] + if length == 11 and request[1] == 0x17: + expected_length += request[10] + if length == expected_length: break if not crc: await self.bus.send(Message(self.name, {'event': 'crc error', - 'message': modbus_request.hex()})) + 'message': request.hex()})) continue await self.bus.send(Message(self.name, {'event': 'received', - 'message': modbus_request.hex()})) - modbus_response = await self._process_request(modbus_request) - if modbus_response: - modbus_response += bytes(CRC(modbus_response)) - writer.write(modbus_response) - await t35_task + 'message': request.hex()})) + response = await self._process_request(request) + if response: + response += bytes(CRC(response)) + writer.write(response) await self.bus.send(Message(self.name, {'event': 'sent', - 'message': modbus_response.hex()})) + 'message': response.hex()})) -- 2.34.1