class CRC:
- """Calculate CRC for message.
-
- The CRC as specified for Modbus is computed using a precomputed table
- for its inner loop.
-
- We define 12 test messages with their expected results from the manual
- of the Hitachi PJ series Modbus devices:
- >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')),
- ... (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')),
- ... (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')),
- ... (bytes.fromhex('05 03 06 0007 0000 1770'),
- ... bytes.fromhex('A861')),
- ... (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')),
- ... (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')),
- ... (bytes.fromhex('05 0F 0006 0006 02 1700'),
- ... bytes.fromhex('DB3E')),
- ... (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')),
- ... (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'),
- ... bytes.fromhex('F42B')), # error in manual
- ... (bytes.fromhex('01 10 2B01 0002'),
- ... bytes.fromhex('19EC')), # error in manual
- ... (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'),
- ... bytes.fromhex('964D')), # error in manual
- ... (bytes.fromhex('01 17 04 0000 1388'),
- ... bytes.fromhex('F471'))]
-
- Now, we test the crc function against this list:
- >>> crc_zero = bytes.fromhex('0000')
- >>> for (message, expected) in tests:
- ... crc = CRC(message)
- ... assert bytes(crc) == expected
- ... assert not bool(crc)
- ... for byte in bytes(crc):
- ... crc.update(byte)
- ... assert bool(crc)
+ r"""Calculate CRC for message.
+
+ The CRC of a message as specified for Modbus is computed using a
+ precomputed table for its inner loop.
+
+ The message can be given directly when constructing a CRC object:
+ >>> crc = CRC(b'\x08\x01\x01\x17')
+ >>> print(crc)
+ crc(08010117) = 0x1A12
+ >>> repr(crc)
+ "CRC(bytes.fromhex('08010117'))"
+ >>> assert eval(repr(crc)) == crc
+
+ The CRC can also be computed on the fly by using the update method with
+ single bytes:
+ >>> crc = CRC()
+ >>> for byte in b'\x08\x01\x01\x17':
+ ... crc.update(byte)
+ >>> print(crc)
+ crc(08010117) = 0x1A12
+
+ CRC objects can be cast to integers to get the pure value:
+ >>> print(f"{int(crc)} = 0x{int(crc):04X}")
+ 6674 = 0x1A12
+
+ They can also be cast to bytes, where the bytes are given in the
+ low-byte first (little-endian) order needed by Modbus:
+ >>> bytes(crc)
+ b'\x12\x1a'
+
+ The shortest way to add a CRC to a message is then:
+ >>> message = b'\x08\x01\x01\x17'
+ >>> message_with_crc = message + bytes(CRC(message))
+ >>> message_with_crc
+ b'\x08\x01\x01\x17\x12\x1a'
+
+ The CRC of a message with its CRC appended is always zero:
+ >>> print(CRC(message_with_crc))
+ crc(08010117121A) = 0x0000
+
+ Received messages can, hence, be checked by updating the CRC object on
+ the fly including the appended CRC and just checking for equality to
+ zero at the end.
+ For convenience, CRC objects can also simply be cast to boolean:
+ >>> crc = CRC()
+ >>> for byte in message_with_crc:
+ ... crc.update(byte)
+ >>> if crc:
+ ... print("Message okay")
+ Message okay
+
+ The 12 test messages from the manual of the Hitachi PJ series Modbus
+ devices have these results:
+ >>> print(CRC(bytes.fromhex('08 01 0006 0006')))
+ crc(080100060006) = 0x905C
+ >>> print(CRC(bytes.fromhex('08 01 01 17')))
+ crc(08010117) = 0x1A12
+ >>> print(CRC(bytes.fromhex('05 03 03E8 0003')))
+ crc(050303E80003) = 0x3F84
+ >>> print(CRC(bytes.fromhex('05 03 06 0007 0000 1770')))
+ crc(050306000700001770) = 0x61A8
+ >>> print(CRC(bytes.fromhex('0A 05 0000 FF00')))
+ crc(0A050000FF00) = 0x418D
+ >>> print(CRC(bytes.fromhex('01 06 2F4D 1388')))
+ crc(01062F4D1388) = 0x5F1C
+ >>> print(CRC(bytes.fromhex('05 0F 0006 0006 02 1700')))
+ crc(050F00060006021700) = 0x3EDB
+ >>> print(CRC(bytes.fromhex('05 0F 0006 0006')))
+ crc(050F00060006) = 0x4C34
+ >>> print(CRC(bytes.fromhex('01 10 2B01 0002 04 0004 93E0')))
+ ... # Error in manual: 0x9F9E
+ crc(01102B01000204000493E0) = 0x2BF4
+ >>> print(CRC(bytes.fromhex('01 10 2B01 0002')))
+ ... # Error in manual: 0x34E5
+ crc(01102B010002) = 0xEC19
+ >>> print(CRC(bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388')))
+ ... # Error in manual: 0x86F4
+ crc(0117271000022AF800020400001388) = 0x4D96
+ >>> print(CRC(bytes.fromhex('01 17 04 0000 1388')))
+ crc(01170400001388) = 0x71F4
"""
def __init__(self, message: bytes = b'') -> None:
+ """Initialise CRC and optionally update with given message."""
self._crc = 0xFFFF
self._message = b''
for byte in message:
cls._precomputed[key] = value
def __repr__(self) -> str:
- return f"CRC(bytes.fromhex({self._message.hex().upper()}))"
+ """Represent CRC as evaluable string."""
+ return f"CRC(bytes.fromhex('{self._message.hex().upper()}'))"
+
+ def __eq__(self, other) -> bool:
+ """Check for equality to another CRC."""
+ if isinstance(other, CRC):
+ return self._message == other._message
+ return False
def __str__(self) -> str:
+ """Represent as string showing message as well as CRC in hex."""
return f"crc({self._message.hex().upper()}) = 0x{self._crc:04X}"
- def __bool__(self) -> bool:
- return self._crc == 0
-
def __int__(self) -> int:
+ """Cast pure CRC to integer."""
return self._crc
def __bytes__(self) -> bytes:
+ """Cast to two bytes, low-byte first."""
return bytes((self._crc & 0xFF, self._crc >> 8))
+ def __bool__(self) -> bool:
+ """Check for equality to zero (i.e., message with CRC correct)."""
+ return self._crc == 0
-CRC.precompute()
-
-
-def crc(message: bytes) -> bytes:
- """Calculate CRC for message.
-
- Calculate CRC for message using precomputed dictionary.
-
- We define 12 test messages with their expected results from the manual
- of the Hitachi PJ series Modbus devices:
- >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')),
- ... (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')),
- ... (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')),
- ... (bytes.fromhex('05 03 06 0007 0000 1770'),
- ... bytes.fromhex('A861')),
- ... (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')),
- ... (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')),
- ... (bytes.fromhex('05 0F 0006 0006 02 1700'),
- ... bytes.fromhex('DB3E')),
- ... (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')),
- ... (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'),
- ... bytes.fromhex('F42B')), # error in manual
- ... (bytes.fromhex('01 10 2B01 0002'),
- ... bytes.fromhex('19EC')), # error in manual
- ... (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'),
- ... bytes.fromhex('964D')), # error in manual
- ... (bytes.fromhex('01 17 04 0000 1388'),
- ... bytes.fromhex('F471'))]
-
- Now, we test the crc function against this list:
- >>> crc_zero = bytes.fromhex('0000')
- >>> for (message, expected) in tests:
- ... result = crc(message)
- ... assert result == expected, (f"crc({message.hex()}) ="
- ... f" {result.hex()} !="
- ... f" {expected.hex()}")
- ... crc_message = message + expected
- ... crc_result = crc(crc_message)
- ... assert crc_result == crc_zero, (f"crc({crc_message.hex()}) = "
- ... f" {crc_result.hex()} !="
- ... f" {zero.hex()}")
-
- Since the CRC of a whole message with its CRC appended is always zero,
- we can check the CRC of incoming messages on the fly and just check for
- it being zero at the end of transmission.
- This is done in the plugin classes below and the crc function itself is
- only used for constructing messages to be sent.
- """
- crc = 0xFFFF
- for byte in message:
- crc ^= byte
- dict_key = crc & 0xFF
- crc >>= 8
- crc ^= _crc_dict[dict_key]
- return bytes((crc & 0xFF, crc >> 8))
-
-
-_crc_dict: Dict[int, int] = {}
-for dict_key in range(256):
- dict_value = dict_key
- for bit in range(8):
- lsb = dict_value & 1
- dict_value >>= 1
- if lsb:
- dict_value ^= 0xA001
- _crc_dict[dict_key] = dict_value
+CRC.precompute()
class ModbusMaster(BasePlugin):
value_hi = message['value'] >> 8
value_lo = message['value'] & 0xFF
modbus_message += bytes([value_hi, value_lo])
- modbus_message += crc(modbus_message)
+ modbus_message += bytes(CRC(modbus_message))
await self._queue.put(modbus_message)
async def _process_response(self, modbus_message: bytes) -> None:
await asyncio.sleep(self.conf['turnaround delay'])
break
modbus_response = b''
- crc = 0xFFFF
+ crc = CRC()
t35_task: asyncio.Task = asyncio.create_task(
asyncio.sleep(self._t35))
try:
first_byte = await asyncio.wait_for(
reader.read(1), self.conf['response timeout'])
modbus_response += first_byte
- crc ^= first_byte[0]
- dict_key = crc & 0xFF
- crc >>= 8
- crc ^= _crc_dict[dict_key]
+ crc.update(first_byte[0])
except asyncio.TimeoutError:
message = Message(self.name)
message['event'] = 'error'
next_byte = await asyncio.wait_for(
reader.read(1), self._t15)
modbus_response += next_byte
- crc ^= next_byte[0]
- dict_key = crc & 0xFF
- crc >>= 8
- crc ^= _crc_dict[dict_key]
+ crc.update(next_byte[0])
except asyncio.TimeoutError:
break
- if crc != 0:
+ if not crc:
message = Message(self.name)
message['event'] = 'error'
message['description'] = (
modbus_response = modbus_request[:-2]
else:
modbus_response += bytes([function_code | 0x80, 0x01])
- modbus_response += crc(modbus_response)
+ modbus_response += bytes(CRC(modbus_response))
message = Message(self.name)
message['event'] = 'sent'
message['message'] = modbus_response.hex()
parity=self._parity, stopbits=self._stopbits)
while True:
modbus_request = b''
- crc = 0xFFFF
+ crc = CRC()
t35_task: asyncio.Task = asyncio.create_task(
asyncio.sleep(self._t35))
# First byte is read without timeout:
first_byte = await reader.read(1)
modbus_request += first_byte
- crc ^= first_byte[0]
- dict_key = crc & 0xFF
- crc >>= 8
- crc ^= _crc_dict[dict_key]
+ crc.update(first_byte[0])
while True:
t35_task = asyncio.create_task(asyncio.sleep(self._t35))
try:
next_byte = await asyncio.wait_for(
reader.read(1), self._t15)
modbus_request += next_byte
- crc ^= next_byte[0]
- dict_key = crc & 0xFF
- crc >>= 8
- crc ^= _crc_dict[dict_key]
+ crc.update(next_byte[0])
except asyncio.TimeoutError:
break
- if crc != 0:
+ if not crc:
continue
if modbus_request[0] != self.conf['slave']:
continue