Refactor CRC computation to class to repeat less.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 7 Apr 2021 22:02:23 +0000 (00:02 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 7 Apr 2021 22:02:23 +0000 (00:02 +0200)
controlpi_plugins/modbus.py

index f9b0f41ed04536fb273bcb147c9fcba945af9d0c..9719c1ec0a4e08f504b34a77b90c64ecd0e33079 100644 (file)
@@ -13,44 +13,90 @@ from typing import Dict
 
 
 class CRC:
-    """Calculate CRC for message.
-
-    The CRC as specified for Modbus is computed using a precomputed table
-    for its inner loop.
-
-    We define 12 test messages with their expected results from the manual
-    of the Hitachi PJ series Modbus devices:
-    >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')),
-    ...          (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')),
-    ...          (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')),
-    ...          (bytes.fromhex('05 03 06 0007 0000 1770'),
-    ...           bytes.fromhex('A861')),
-    ...          (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')),
-    ...          (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')),
-    ...          (bytes.fromhex('05 0F 0006 0006 02 1700'),
-    ...           bytes.fromhex('DB3E')),
-    ...          (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')),
-    ...          (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'),
-    ...           bytes.fromhex('F42B')),  # error in manual
-    ...          (bytes.fromhex('01 10 2B01 0002'),
-    ...           bytes.fromhex('19EC')),  # error in manual
-    ...          (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'),
-    ...           bytes.fromhex('964D')),  # error in manual
-    ...          (bytes.fromhex('01 17 04 0000 1388'),
-    ...           bytes.fromhex('F471'))]
-
-    Now, we test the crc function against this list:
-    >>> crc_zero = bytes.fromhex('0000')
-    >>> for (message, expected) in tests:
-    ...     crc = CRC(message)
-    ...     assert bytes(crc) == expected
-    ...     assert not bool(crc)
-    ...     for byte in bytes(crc):
-    ...         crc.update(byte)
-    ...     assert bool(crc)
+    r"""Calculate CRC for message.
+
+    The CRC of a message as specified for Modbus is computed using a
+    precomputed table for its inner loop.
+
+    The message can be given directly when constructing a CRC object:
+    >>> crc = CRC(b'\x08\x01\x01\x17')
+    >>> print(crc)
+    crc(08010117) = 0x1A12
+    >>> repr(crc)
+    "CRC(bytes.fromhex('08010117'))"
+    >>> assert eval(repr(crc)) == crc
+
+    The CRC can also be computed on the fly by using the update method with
+    single bytes:
+    >>> crc = CRC()
+    >>> for byte in b'\x08\x01\x01\x17':
+    ...     crc.update(byte)
+    >>> print(crc)
+    crc(08010117) = 0x1A12
+
+    CRC objects can be cast to integers to get the pure value:
+    >>> print(f"{int(crc)} = 0x{int(crc):04X}")
+    6674 = 0x1A12
+
+    They can also be cast to bytes, where the bytes are given in the
+    low-byte first (little-endian) order needed by Modbus:
+    >>> bytes(crc)
+    b'\x12\x1a'
+
+    The shortest way to add a CRC to a message is then:
+    >>> message = b'\x08\x01\x01\x17'
+    >>> message_with_crc = message + bytes(CRC(message))
+    >>> message_with_crc
+    b'\x08\x01\x01\x17\x12\x1a'
+
+    The CRC of a message with its CRC appended is always zero:
+    >>> print(CRC(message_with_crc))
+    crc(08010117121A) = 0x0000
+
+    Received messages can, hence, be checked by updating the CRC object on
+    the fly including the appended CRC and just checking for equality to
+    zero at the end.
+    For convenience, CRC objects can also simply be cast to boolean:
+    >>> crc = CRC()
+    >>> for byte in message_with_crc:
+    ...     crc.update(byte)
+    >>> if crc:
+    ...     print("Message okay")
+    Message okay
+
+    The 12 test messages from the manual of the Hitachi PJ series Modbus
+    devices have these results:
+    >>> print(CRC(bytes.fromhex('08 01 0006 0006')))
+    crc(080100060006) = 0x905C
+    >>> print(CRC(bytes.fromhex('08 01 01 17')))
+    crc(08010117) = 0x1A12
+    >>> print(CRC(bytes.fromhex('05 03 03E8 0003')))
+    crc(050303E80003) = 0x3F84
+    >>> print(CRC(bytes.fromhex('05 03 06 0007 0000 1770')))
+    crc(050306000700001770) = 0x61A8
+    >>> print(CRC(bytes.fromhex('0A 05 0000 FF00')))
+    crc(0A050000FF00) = 0x418D
+    >>> print(CRC(bytes.fromhex('01 06 2F4D 1388')))
+    crc(01062F4D1388) = 0x5F1C
+    >>> print(CRC(bytes.fromhex('05 0F 0006 0006 02 1700')))
+    crc(050F00060006021700) = 0x3EDB
+    >>> print(CRC(bytes.fromhex('05 0F 0006 0006')))
+    crc(050F00060006) = 0x4C34
+    >>> print(CRC(bytes.fromhex('01 10 2B01 0002 04 0004 93E0')))
+    ... # Error in manual: 0x9F9E
+    crc(01102B01000204000493E0) = 0x2BF4
+    >>> print(CRC(bytes.fromhex('01 10 2B01 0002')))
+    ... # Error in manual: 0x34E5
+    crc(01102B010002) = 0xEC19
+    >>> print(CRC(bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388')))
+    ... # Error in manual: 0x86F4
+    crc(0117271000022AF800020400001388) = 0x4D96
+    >>> print(CRC(bytes.fromhex('01 17 04 0000 1388')))
+    crc(01170400001388) = 0x71F4
     """
 
     def __init__(self, message: bytes = b'') -> None:
+        """Initialise CRC and optionally update with given message."""
         self._crc = 0xFFFF
         self._message = b''
         for byte in message:
@@ -82,88 +128,33 @@ class CRC:
             cls._precomputed[key] = value
 
     def __repr__(self) -> str:
-        return f"CRC(bytes.fromhex({self._message.hex().upper()}))"
+        """Represent CRC as evaluable string."""
+        return f"CRC(bytes.fromhex('{self._message.hex().upper()}'))"
+
+    def __eq__(self, other) -> bool:
+        """Check for equality to another CRC."""
+        if isinstance(other, CRC):
+            return self._message == other._message
+        return False
 
     def __str__(self) -> str:
+        """Represent as string showing message as well as CRC in hex."""
         return f"crc({self._message.hex().upper()}) = 0x{self._crc:04X}"
 
-    def __bool__(self) -> bool:
-        return self._crc == 0
-
     def __int__(self) -> int:
+        """Cast pure CRC to integer."""
         return self._crc
 
     def __bytes__(self) -> bytes:
+        """Cast to two bytes, low-byte first."""
         return bytes((self._crc & 0xFF, self._crc >> 8))
 
+    def __bool__(self) -> bool:
+        """Check for equality to zero (i.e., message with CRC correct)."""
+        return self._crc == 0
 
-CRC.precompute()
-
-
-def crc(message: bytes) -> bytes:
-    """Calculate CRC for message.
-
-    Calculate CRC for message using precomputed dictionary.
-
-    We define 12 test messages with their expected results from the manual
-    of the Hitachi PJ series Modbus devices:
-    >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')),
-    ...          (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')),
-    ...          (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')),
-    ...          (bytes.fromhex('05 03 06 0007 0000 1770'),
-    ...           bytes.fromhex('A861')),
-    ...          (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')),
-    ...          (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')),
-    ...          (bytes.fromhex('05 0F 0006 0006 02 1700'),
-    ...           bytes.fromhex('DB3E')),
-    ...          (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')),
-    ...          (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'),
-    ...           bytes.fromhex('F42B')),  # error in manual
-    ...          (bytes.fromhex('01 10 2B01 0002'),
-    ...           bytes.fromhex('19EC')),  # error in manual
-    ...          (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'),
-    ...           bytes.fromhex('964D')),  # error in manual
-    ...          (bytes.fromhex('01 17 04 0000 1388'),
-    ...           bytes.fromhex('F471'))]
-
-    Now, we test the crc function against this list:
-    >>> crc_zero = bytes.fromhex('0000')
-    >>> for (message, expected) in tests:
-    ...     result = crc(message)
-    ...     assert result == expected, (f"crc({message.hex()}) ="
-    ...                                 f" {result.hex()} !="
-    ...                                 f" {expected.hex()}")
-    ...     crc_message = message + expected
-    ...     crc_result = crc(crc_message)
-    ...     assert crc_result == crc_zero, (f"crc({crc_message.hex()}) = "
-    ...                                     f" {crc_result.hex()} !="
-    ...                                     f" {zero.hex()}")
-
-    Since the CRC of a whole message with its CRC appended is always zero,
-    we can check the CRC of incoming messages on the fly and just check for
-    it being zero at the end of transmission.
-    This is done in the plugin classes below and the crc function itself is
-    only used for constructing messages to be sent.
-    """
-    crc = 0xFFFF
-    for byte in message:
-        crc ^= byte
-        dict_key = crc & 0xFF
-        crc >>= 8
-        crc ^= _crc_dict[dict_key]
-    return bytes((crc & 0xFF, crc >> 8))
-
-
-_crc_dict: Dict[int, int] = {}
 
-for dict_key in range(256):
-    dict_value = dict_key
-    for bit in range(8):
-        lsb = dict_value & 1
-        dict_value >>= 1
-        if lsb:
-            dict_value ^= 0xA001
-    _crc_dict[dict_key] = dict_value
+CRC.precompute()
 
 
 class ModbusMaster(BasePlugin):
@@ -517,7 +508,7 @@ class ModbusMaster(BasePlugin):
                 value_hi = message['value'] >> 8
                 value_lo = message['value'] & 0xFF
                 modbus_message += bytes([value_hi, value_lo])
-        modbus_message += crc(modbus_message)
+        modbus_message += bytes(CRC(modbus_message))
         await self._queue.put(modbus_message)
 
     async def _process_response(self, modbus_message: bytes) -> None:
@@ -623,7 +614,7 @@ class ModbusMaster(BasePlugin):
                     await asyncio.sleep(self.conf['turnaround delay'])
                     break
                 modbus_response = b''
-                crc = 0xFFFF
+                crc = CRC()
                 t35_task: asyncio.Task = asyncio.create_task(
                     asyncio.sleep(self._t35))
                 try:
@@ -631,10 +622,7 @@ class ModbusMaster(BasePlugin):
                     first_byte = await asyncio.wait_for(
                         reader.read(1), self.conf['response timeout'])
                     modbus_response += first_byte
-                    crc ^= first_byte[0]
-                    dict_key = crc & 0xFF
-                    crc >>= 8
-                    crc ^= _crc_dict[dict_key]
+                    crc.update(first_byte[0])
                 except asyncio.TimeoutError:
                     message = Message(self.name)
                     message['event'] = 'error'
@@ -650,13 +638,10 @@ class ModbusMaster(BasePlugin):
                         next_byte = await asyncio.wait_for(
                                 reader.read(1), self._t15)
                         modbus_response += next_byte
-                        crc ^= next_byte[0]
-                        dict_key = crc & 0xFF
-                        crc >>= 8
-                        crc ^= _crc_dict[dict_key]
+                        crc.update(next_byte[0])
                     except asyncio.TimeoutError:
                         break
-                if crc != 0:
+                if not crc:
                     message = Message(self.name)
                     message['event'] = 'error'
                     message['description'] = (
@@ -829,7 +814,7 @@ class ModbusSlave(BasePlugin):
                     modbus_response = modbus_request[:-2]
         else:
             modbus_response += bytes([function_code | 0x80, 0x01])
-        modbus_response += crc(modbus_response)
+        modbus_response += bytes(CRC(modbus_response))
         message = Message(self.name)
         message['event'] = 'sent'
         message['message'] = modbus_response.hex()
@@ -843,16 +828,13 @@ class ModbusSlave(BasePlugin):
                 parity=self._parity, stopbits=self._stopbits)
         while True:
             modbus_request = b''
-            crc = 0xFFFF
+            crc = CRC()
             t35_task: asyncio.Task = asyncio.create_task(
                 asyncio.sleep(self._t35))
             # First byte is read without timeout:
             first_byte = await reader.read(1)
             modbus_request += first_byte
-            crc ^= first_byte[0]
-            dict_key = crc & 0xFF
-            crc >>= 8
-            crc ^= _crc_dict[dict_key]
+            crc.update(first_byte[0])
             while True:
                 t35_task = asyncio.create_task(asyncio.sleep(self._t35))
                 try:
@@ -861,13 +843,10 @@ class ModbusSlave(BasePlugin):
                     next_byte = await asyncio.wait_for(
                             reader.read(1), self._t15)
                     modbus_request += next_byte
-                    crc ^= next_byte[0]
-                    dict_key = crc & 0xFF
-                    crc >>= 8
-                    crc ^= _crc_dict[dict_key]
+                    crc.update(next_byte[0])
                 except asyncio.TimeoutError:
                     break
-            if crc != 0:
+            if not crc:
                 continue
             if modbus_request[0] != self.conf['slave']:
                 continue