From 579d3f23f3156c7cf0ada7d19174540608b5fdb0 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Thu, 8 Apr 2021 00:02:23 +0200 Subject: [PATCH] Refactor CRC computation to class to repeat less. --- controlpi_plugins/modbus.py | 233 ++++++++++++++++-------------------- 1 file changed, 106 insertions(+), 127 deletions(-) diff --git a/controlpi_plugins/modbus.py b/controlpi_plugins/modbus.py index f9b0f41..9719c1e 100644 --- a/controlpi_plugins/modbus.py +++ b/controlpi_plugins/modbus.py @@ -13,44 +13,90 @@ from typing import Dict class CRC: - """Calculate CRC for message. - - The CRC as specified for Modbus is computed using a precomputed table - for its inner loop. - - We define 12 test messages with their expected results from the manual - of the Hitachi PJ series Modbus devices: - >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')), - ... (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')), - ... (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')), - ... (bytes.fromhex('05 03 06 0007 0000 1770'), - ... bytes.fromhex('A861')), - ... (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')), - ... (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')), - ... (bytes.fromhex('05 0F 0006 0006 02 1700'), - ... bytes.fromhex('DB3E')), - ... (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')), - ... (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'), - ... bytes.fromhex('F42B')), # error in manual - ... (bytes.fromhex('01 10 2B01 0002'), - ... bytes.fromhex('19EC')), # error in manual - ... (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'), - ... bytes.fromhex('964D')), # error in manual - ... (bytes.fromhex('01 17 04 0000 1388'), - ... bytes.fromhex('F471'))] - - Now, we test the crc function against this list: - >>> crc_zero = bytes.fromhex('0000') - >>> for (message, expected) in tests: - ... crc = CRC(message) - ... assert bytes(crc) == expected - ... assert not bool(crc) - ... for byte in bytes(crc): - ... crc.update(byte) - ... assert bool(crc) + r"""Calculate CRC for message. + + The CRC of a message as specified for Modbus is computed using a + precomputed table for its inner loop. + + The message can be given directly when constructing a CRC object: + >>> crc = CRC(b'\x08\x01\x01\x17') + >>> print(crc) + crc(08010117) = 0x1A12 + >>> repr(crc) + "CRC(bytes.fromhex('08010117'))" + >>> assert eval(repr(crc)) == crc + + The CRC can also be computed on the fly by using the update method with + single bytes: + >>> crc = CRC() + >>> for byte in b'\x08\x01\x01\x17': + ... crc.update(byte) + >>> print(crc) + crc(08010117) = 0x1A12 + + CRC objects can be cast to integers to get the pure value: + >>> print(f"{int(crc)} = 0x{int(crc):04X}") + 6674 = 0x1A12 + + They can also be cast to bytes, where the bytes are given in the + low-byte first (little-endian) order needed by Modbus: + >>> bytes(crc) + b'\x12\x1a' + + The shortest way to add a CRC to a message is then: + >>> message = b'\x08\x01\x01\x17' + >>> message_with_crc = message + bytes(CRC(message)) + >>> message_with_crc + b'\x08\x01\x01\x17\x12\x1a' + + The CRC of a message with its CRC appended is always zero: + >>> print(CRC(message_with_crc)) + crc(08010117121A) = 0x0000 + + Received messages can, hence, be checked by updating the CRC object on + the fly including the appended CRC and just checking for equality to + zero at the end. + For convenience, CRC objects can also simply be cast to boolean: + >>> crc = CRC() + >>> for byte in message_with_crc: + ... crc.update(byte) + >>> if crc: + ... print("Message okay") + Message okay + + The 12 test messages from the manual of the Hitachi PJ series Modbus + devices have these results: + >>> print(CRC(bytes.fromhex('08 01 0006 0006'))) + crc(080100060006) = 0x905C + >>> print(CRC(bytes.fromhex('08 01 01 17'))) + crc(08010117) = 0x1A12 + >>> print(CRC(bytes.fromhex('05 03 03E8 0003'))) + crc(050303E80003) = 0x3F84 + >>> print(CRC(bytes.fromhex('05 03 06 0007 0000 1770'))) + crc(050306000700001770) = 0x61A8 + >>> print(CRC(bytes.fromhex('0A 05 0000 FF00'))) + crc(0A050000FF00) = 0x418D + >>> print(CRC(bytes.fromhex('01 06 2F4D 1388'))) + crc(01062F4D1388) = 0x5F1C + >>> print(CRC(bytes.fromhex('05 0F 0006 0006 02 1700'))) + crc(050F00060006021700) = 0x3EDB + >>> print(CRC(bytes.fromhex('05 0F 0006 0006'))) + crc(050F00060006) = 0x4C34 + >>> print(CRC(bytes.fromhex('01 10 2B01 0002 04 0004 93E0'))) + ... # Error in manual: 0x9F9E + crc(01102B01000204000493E0) = 0x2BF4 + >>> print(CRC(bytes.fromhex('01 10 2B01 0002'))) + ... # Error in manual: 0x34E5 + crc(01102B010002) = 0xEC19 + >>> print(CRC(bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'))) + ... # Error in manual: 0x86F4 + crc(0117271000022AF800020400001388) = 0x4D96 + >>> print(CRC(bytes.fromhex('01 17 04 0000 1388'))) + crc(01170400001388) = 0x71F4 """ def __init__(self, message: bytes = b'') -> None: + """Initialise CRC and optionally update with given message.""" self._crc = 0xFFFF self._message = b'' for byte in message: @@ -82,88 +128,33 @@ class CRC: cls._precomputed[key] = value def __repr__(self) -> str: - return f"CRC(bytes.fromhex({self._message.hex().upper()}))" + """Represent CRC as evaluable string.""" + return f"CRC(bytes.fromhex('{self._message.hex().upper()}'))" + + def __eq__(self, other) -> bool: + """Check for equality to another CRC.""" + if isinstance(other, CRC): + return self._message == other._message + return False def __str__(self) -> str: + """Represent as string showing message as well as CRC in hex.""" return f"crc({self._message.hex().upper()}) = 0x{self._crc:04X}" - def __bool__(self) -> bool: - return self._crc == 0 - def __int__(self) -> int: + """Cast pure CRC to integer.""" return self._crc def __bytes__(self) -> bytes: + """Cast to two bytes, low-byte first.""" return bytes((self._crc & 0xFF, self._crc >> 8)) + def __bool__(self) -> bool: + """Check for equality to zero (i.e., message with CRC correct).""" + return self._crc == 0 -CRC.precompute() - - -def crc(message: bytes) -> bytes: - """Calculate CRC for message. - - Calculate CRC for message using precomputed dictionary. - - We define 12 test messages with their expected results from the manual - of the Hitachi PJ series Modbus devices: - >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')), - ... (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')), - ... (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')), - ... (bytes.fromhex('05 03 06 0007 0000 1770'), - ... bytes.fromhex('A861')), - ... (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')), - ... (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')), - ... (bytes.fromhex('05 0F 0006 0006 02 1700'), - ... bytes.fromhex('DB3E')), - ... (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')), - ... (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'), - ... bytes.fromhex('F42B')), # error in manual - ... (bytes.fromhex('01 10 2B01 0002'), - ... bytes.fromhex('19EC')), # error in manual - ... (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'), - ... bytes.fromhex('964D')), # error in manual - ... (bytes.fromhex('01 17 04 0000 1388'), - ... bytes.fromhex('F471'))] - - Now, we test the crc function against this list: - >>> crc_zero = bytes.fromhex('0000') - >>> for (message, expected) in tests: - ... result = crc(message) - ... assert result == expected, (f"crc({message.hex()}) =" - ... f" {result.hex()} !=" - ... f" {expected.hex()}") - ... crc_message = message + expected - ... crc_result = crc(crc_message) - ... assert crc_result == crc_zero, (f"crc({crc_message.hex()}) = " - ... f" {crc_result.hex()} !=" - ... f" {zero.hex()}") - - Since the CRC of a whole message with its CRC appended is always zero, - we can check the CRC of incoming messages on the fly and just check for - it being zero at the end of transmission. - This is done in the plugin classes below and the crc function itself is - only used for constructing messages to be sent. - """ - crc = 0xFFFF - for byte in message: - crc ^= byte - dict_key = crc & 0xFF - crc >>= 8 - crc ^= _crc_dict[dict_key] - return bytes((crc & 0xFF, crc >> 8)) - - -_crc_dict: Dict[int, int] = {} -for dict_key in range(256): - dict_value = dict_key - for bit in range(8): - lsb = dict_value & 1 - dict_value >>= 1 - if lsb: - dict_value ^= 0xA001 - _crc_dict[dict_key] = dict_value +CRC.precompute() class ModbusMaster(BasePlugin): @@ -517,7 +508,7 @@ class ModbusMaster(BasePlugin): value_hi = message['value'] >> 8 value_lo = message['value'] & 0xFF modbus_message += bytes([value_hi, value_lo]) - modbus_message += crc(modbus_message) + modbus_message += bytes(CRC(modbus_message)) await self._queue.put(modbus_message) async def _process_response(self, modbus_message: bytes) -> None: @@ -623,7 +614,7 @@ class ModbusMaster(BasePlugin): await asyncio.sleep(self.conf['turnaround delay']) break modbus_response = b'' - crc = 0xFFFF + crc = CRC() t35_task: asyncio.Task = asyncio.create_task( asyncio.sleep(self._t35)) try: @@ -631,10 +622,7 @@ class ModbusMaster(BasePlugin): first_byte = await asyncio.wait_for( reader.read(1), self.conf['response timeout']) modbus_response += first_byte - crc ^= first_byte[0] - dict_key = crc & 0xFF - crc >>= 8 - crc ^= _crc_dict[dict_key] + crc.update(first_byte[0]) except asyncio.TimeoutError: message = Message(self.name) message['event'] = 'error' @@ -650,13 +638,10 @@ class ModbusMaster(BasePlugin): next_byte = await asyncio.wait_for( reader.read(1), self._t15) modbus_response += next_byte - crc ^= next_byte[0] - dict_key = crc & 0xFF - crc >>= 8 - crc ^= _crc_dict[dict_key] + crc.update(next_byte[0]) except asyncio.TimeoutError: break - if crc != 0: + if not crc: message = Message(self.name) message['event'] = 'error' message['description'] = ( @@ -829,7 +814,7 @@ class ModbusSlave(BasePlugin): modbus_response = modbus_request[:-2] else: modbus_response += bytes([function_code | 0x80, 0x01]) - modbus_response += crc(modbus_response) + modbus_response += bytes(CRC(modbus_response)) message = Message(self.name) message['event'] = 'sent' message['message'] = modbus_response.hex() @@ -843,16 +828,13 @@ class ModbusSlave(BasePlugin): parity=self._parity, stopbits=self._stopbits) while True: modbus_request = b'' - crc = 0xFFFF + crc = CRC() t35_task: asyncio.Task = asyncio.create_task( asyncio.sleep(self._t35)) # First byte is read without timeout: first_byte = await reader.read(1) modbus_request += first_byte - crc ^= first_byte[0] - dict_key = crc & 0xFF - crc >>= 8 - crc ^= _crc_dict[dict_key] + crc.update(first_byte[0]) while True: t35_task = asyncio.create_task(asyncio.sleep(self._t35)) try: @@ -861,13 +843,10 @@ class ModbusSlave(BasePlugin): next_byte = await asyncio.wait_for( reader.read(1), self._t15) modbus_request += next_byte - crc ^= next_byte[0] - dict_key = crc & 0xFF - crc >>= 8 - crc ^= _crc_dict[dict_key] + crc.update(next_byte[0]) except asyncio.TimeoutError: break - if crc != 0: + if not crc: continue if modbus_request[0] != self.conf['slave']: continue -- 2.34.1