Base Modbus communication on expected lengths.
authorBenjamin Braatz <bb@bbraatz.eu>
Tue, 13 Apr 2021 23:27:45 +0000 (01:27 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Tue, 13 Apr 2021 23:27:45 +0000 (01:27 +0200)
controlpi_plugins/modbus.py

index 75b9482504370b25895d2216fb2c6857ff68d444..faaa3d9c17e094ed798687750bf817f8e1263c53 100644 (file)
@@ -399,18 +399,6 @@ class ModbusMaster(BasePlugin):
         self._stopbits = serial.STOPBITS_ONE
         if self.conf['stopbits'] == 2:
             self._stopbits = serial.STOPBITS_TWO
-        # 1.5 char and 3.5 char times for protocol timing
-        # (according to Modbus specification):
-        self._t15 = 0.001000
-        self._t35 = 0.001750
-        if self.conf['baudrate'] <= 19200:
-            bit_per_char = 9
-            if self.conf['parity'] != 'none':
-                bit_per_char += 1
-            bit_per_char += self.conf['stopbits']
-            seconds_per_char = bit_per_char / self.conf['baudrate']
-            self._t15 = 2.0 * seconds_per_char
-            self._t35 = 3.5 * seconds_per_char
         # Queue for Modbus messages to be sent:
         self._queue: asyncio.Queue = asyncio.Queue()
         # Message templates sent and received by plugin client:
@@ -606,59 +594,85 @@ class ModbusMaster(BasePlugin):
                 url=self.conf['device'], baudrate=self.conf['baudrate'],
                 parity=self._parity, stopbits=self._stopbits)
         while True:
-            modbus_request = await self._queue.get()
-            slave = modbus_request[0]
+            request = await self._queue.get()
+            slave = request[0]
+            function_code = request[1]
+            # Read bytes from serial (normally there shouldn't be any):
+            unexpected_message = b''
+            while True:
+                try:
+                    unexpected_message += await asyncio.wait_for(
+                        reader.read(1), 0.001)
+                except asyncio.TimeoutError:
+                    break
+            if unexpected_message:
+                message = Message(self.name)
+                message['event'] = 'error'
+                message['message'] = unexpected_message.hex()
+                message['description'] = "Unexpected message on Modbus."
+                await self.bus.send(message)
+            # Retry self.conf['retries'] times:
             tries = 0
             while tries <= self.conf['retries']:
                 tries += 1
-                writer.write(modbus_request)
+                writer.write(request)
                 if slave == 0:
                     # Broadcast => just wait for delay and finish:
                     await asyncio.sleep(self.conf['turnaround delay'])
                     break
-                modbus_response = b''
+                # Read response:
+                response = b''
                 crc = CRC()
-                t35_task: asyncio.Task = asyncio.create_task(
-                    asyncio.sleep(self._t35))
-                try:
-                    first_byte = await asyncio.wait_for(
-                        reader.read(1), self.conf['response timeout'])
-                    modbus_response += first_byte
-                    crc.update(first_byte[0])
-                except asyncio.TimeoutError:
-                    message = Message(self.name)
-                    message['event'] = 'error'
-                    message['description'] = (
-                            f"Response time out on try {tries}.")
-                    await self.bus.send(message)
-                    continue
+                expected_length = 5
+                length = 0
                 while True:
-                    t35_task = asyncio.create_task(asyncio.sleep(self._t35))
                     try:
-                        next_byte = await asyncio.wait_for(
-                                reader.read(1), self._t15)
-                        modbus_response += next_byte
-                        crc.update(next_byte[0])
+                        response += await asyncio.wait_for(
+                            reader.read(1), self.conf['response timeout'])
+                        crc.update(response[length])
                     except asyncio.TimeoutError:
                         break
+                    length += 1
+                    # Update expected length based on function:
+                    if (length == 2 and
+                            (response[1] == 0x05 or response[1] == 0x06 or
+                             response[1] == 0x08 or response[1] == 0x0F or
+                             response[1] == 0x10)):
+                        expected_length = 8
+                    if (length == 3 and
+                            (response[1] == 0x01 or response[1] == 0x03 or
+                             response[1] == 0x17)):
+                        expected_length = response[2] + 5
+                    # Break at expected length:
+                    if length == expected_length:
+                        break
+                # Process possible errors leading to retry:
+                message = Message(self.name)
+                message['event'] = 'error'
+                message['message'] = response.hex()
+                if length < expected_length:
+                    message['description'] = (
+                            f"Response timed out on try {tries}.")
+                    await self.bus.send(message)
+                    continue
                 if not crc:
-                    message = Message(self.name)
-                    message['event'] = 'error'
                     message['description'] = (
                             f"Response CRC failed on try {tries}.")
-                    message['message'] = modbus_response.hex()
                     await self.bus.send(message)
                     continue
-                if modbus_response[0] != slave:
-                    message = Message(self.name)
-                    message['event'] = 'error'
+                if response[0] != slave:
                     message['description'] = (
                             f"Response from wrong slave on try {tries}.")
-                    message['message'] = modbus_response.hex()
                     await self.bus.send(message)
                     continue
-                await self._process_response(modbus_response)
-                await t35_task
+                if response[1] & 0x7F != function_code:
+                    message['description'] = (
+                            f"Response for wrong function on try {tries}.")
+                    await self.bus.send(message)
+                    continue
+                # Process response from correct slave for correct funtion
+                # with valid CRC:
+                await self._process_response(response)
                 break
             self._queue.task_done()
 
@@ -708,18 +722,6 @@ class ModbusSlave(BasePlugin):
         self._stopbits = serial.STOPBITS_ONE
         if 'stopbits' in self.conf and self.conf['stopbits'] == 2:
             self._stopbits = serial.STOPBITS_TWO
-        # 1.5 char and 3.5 char times for protocol timing
-        # (according to Modbus specification):
-        self._t15 = 0.001000
-        self._t35 = 0.001750
-        if self._baudrate <= 19200:
-            bit_per_char = 11
-            if (self._parity == serial.PARITY_NONE and
-                    self._stopbits == serial.STOPBITS_ONE):
-                bit_per_char = 10
-            seconds_per_char = bit_per_char / self._baudrate
-            self._t15 = 2.0 * seconds_per_char
-            self._t35 = 3.5 * seconds_per_char
         # Coils and registers:
         self._coils: Dict[int, bool] = {}
         self._registers: Dict[int, int] = {}
@@ -790,32 +792,36 @@ class ModbusSlave(BasePlugin):
                 url=self._device, baudrate=self._baudrate,
                 parity=self._parity, stopbits=self._stopbits)
         while True:
-            modbus_request = b''
+            # Read request:
+            request = b''
             crc = CRC()
-            first_byte = await reader.read(1)
-            t35_task: asyncio.Task = asyncio.create_task(
-                asyncio.sleep(self._t35))
-            modbus_request += first_byte
-            crc.update(first_byte[0])
+            expected_length = 8
+            length = 0
             while True:
-                try:
-                    next_byte = await asyncio.wait_for(
-                            reader.read(1), self._t15)
-                    t35_task = asyncio.create_task(asyncio.sleep(self._t35))
-                    modbus_request += next_byte
-                    crc.update(next_byte[0])
-                except asyncio.TimeoutError:
+                request += await reader.read(1)
+                crc.update(request[length])
+                length += 1
+                # Update expected length based on function:
+                if length == 2:
+                    if request[1] == 0x0F or request[1] == 0x10:
+                        expected_length = 9
+                    if request[1] == 0x17:
+                        expected_length = 13
+                if length == 7 and (request[1] == 0x0F or request[1] == 0x10):
+                    expected_length += request[6]
+                if length == 11 and request[1] == 0x17:
+                    expected_length += request[10]
+                if length == expected_length:
                     break
             if not crc:
                 await self.bus.send(Message(self.name, {'event': 'crc error',
-                                    'message': modbus_request.hex()}))
+                                    'message': request.hex()}))
                 continue
             await self.bus.send(Message(self.name, {'event': 'received',
-                                'message': modbus_request.hex()}))
-            modbus_response = await self._process_request(modbus_request)
-            if modbus_response:
-                modbus_response += bytes(CRC(modbus_response))
-                writer.write(modbus_response)
-                await t35_task
+                                'message': request.hex()}))
+            response = await self._process_request(request)
+            if response:
+                response += bytes(CRC(response))
+                writer.write(response)
                 await self.bus.send(Message(self.name, {'event': 'sent',
-                                    'message': modbus_response.hex()}))
+                                    'message': response.hex()}))