TODO: documentation, doctests
"""
+import asyncio
+import serial # type: ignore
+import serial_asyncio # type: ignore
from controlpi import BasePlugin, Message, MessageTemplate
-from typing import List
-
-
-def crc_slow(message: bytes) -> bytes:
- """Compute CRC for message.
-
- A 16 bit CRC as specified in the Modbus specification is computed for
- the given message and this CRC is returned as two bytes (low byte
- first).
-
- (This is the "slow" version without precomputed table.)
-
- >>> crc_slow(bytes([0x08, 0x01, 0x00, 0x06, 0x00, 0x06])).hex()
- '5c90'
- >>> crc_slow(bytes([0x08, 0x01, 0x01, 0x17])).hex()
- '121a'
- >>> crc_slow(bytes([0x05, 0x03, 0x03, 0xE8, 0x00, 0x03])).hex()
- '843f'
- >>> crc_slow(bytes([0x05, 0x03, 0x06, 0x00, 0x07, 0x00, 0x00, 0x17,
- ... 0x70])).hex()
- 'a861'
- >>> crc_slow(bytes([0x0A, 0x05, 0x00, 0x00, 0xFF, 0x00])).hex()
- '8d41'
- >>> crc_slow(bytes([0x01, 0x06, 0x2F, 0x4D, 0x13, 0x88])).hex()
- '1c5f'
- >>> crc_slow(bytes([0x05, 0x0F, 0x00, 0x06, 0x00, 0x06, 0x02, 0x17,
- ... 0x00])).hex()
- 'db3e'
- >>> crc_slow(bytes([0x05, 0x0f, 0x00, 0x06, 0x00, 0x06])).hex()
- '344c'
- >>> crc_slow(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02, 0x04, 0x00,
- ... 0x04, 0x93, 0xE0])).hex()
- 'f42b'
- >>> crc_slow(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02])).hex()
- '19ec'
- >>> crc_slow(bytes([0x01, 0x17, 0x27, 0x10, 0x00, 0x02, 0x2A, 0xF8,
- ... 0x00, 0x02, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
- '964d'
- >>> crc_slow(bytes([0x01, 0x17, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
- 'f471'
- """
- crc = 0xFFFF
- for byte in message:
- crc ^= byte
- for bit in range(8):
- lsb = crc & 1
- crc >>= 1
- if lsb:
- crc ^= 0xA001
- return bytes([crc & 0xFF, crc >> 8])
+from typing import Dict
-_crc_table: List[int] = []
+def crc(message: bytes) -> bytes:
+ """Calculate CRC for message.
+ Calculate CRC for message using precomputed dictionary.
-def crc(message: bytes) -> bytes:
- """Compute CRC for message.
-
- A 16 bit CRC as specified in the Modbus specification is computed for
- the given message and this CRC is returned as two bytes (low byte
- first).
-
- (This is the "fast" version with precomputed table.)
-
- >>> crc(bytes([0x08, 0x01, 0x00, 0x06, 0x00, 0x06])).hex()
- '5c90'
- >>> crc(bytes([0x08, 0x01, 0x01, 0x17])).hex()
- '121a'
- >>> crc(bytes([0x05, 0x03, 0x03, 0xE8, 0x00, 0x03])).hex()
- '843f'
- >>> crc(bytes([0x05, 0x03, 0x06, 0x00, 0x07, 0x00, 0x00, 0x17,
- ... 0x70])).hex()
- 'a861'
- >>> crc(bytes([0x0A, 0x05, 0x00, 0x00, 0xFF, 0x00])).hex()
- '8d41'
- >>> crc(bytes([0x01, 0x06, 0x2F, 0x4D, 0x13, 0x88])).hex()
- '1c5f'
- >>> crc(bytes([0x05, 0x0F, 0x00, 0x06, 0x00, 0x06, 0x02, 0x17,
- ... 0x00])).hex()
- 'db3e'
- >>> crc(bytes([0x05, 0x0f, 0x00, 0x06, 0x00, 0x06])).hex()
- '344c'
- >>> crc(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02, 0x04, 0x00,
- ... 0x04, 0x93, 0xE0])).hex()
- 'f42b'
- >>> crc(bytes([0x01, 0x10, 0x2B, 0x01, 0x00, 0x02])).hex()
- '19ec'
- >>> crc(bytes([0x01, 0x17, 0x27, 0x10, 0x00, 0x02, 0x2A, 0xF8,
- ... 0x00, 0x02, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
- '964d'
- >>> crc(bytes([0x01, 0x17, 0x04, 0x00, 0x00, 0x13, 0x88])).hex()
- 'f471'
+ We define 12 test messages with their expected results from the manual
+ of the Hitachi PJ series Modbus devices:
+ >>> tests = [(bytes.fromhex('08 01 0006 0006'), bytes.fromhex('5C90')),
+ ... (bytes.fromhex('08 01 01 17'), bytes.fromhex('121A')),
+ ... (bytes.fromhex('05 03 03E8 0003'), bytes.fromhex('843F')),
+ ... (bytes.fromhex('05 03 06 0007 0000 1770'),
+ ... bytes.fromhex('A861')),
+ ... (bytes.fromhex('0A 05 0000 FF00'), bytes.fromhex('8D41')),
+ ... (bytes.fromhex('01 06 2F4D 1388'), bytes.fromhex('1C5F')),
+ ... (bytes.fromhex('05 0F 0006 0006 02 1700'),
+ ... bytes.fromhex('DB3E')),
+ ... (bytes.fromhex('05 0F 0006 0006'), bytes.fromhex('344C')),
+ ... (bytes.fromhex('01 10 2B01 0002 04 0004 93E0'),
+ ... bytes.fromhex('F42B')), # error in manual
+ ... (bytes.fromhex('01 10 2B01 0002'),
+ ... bytes.fromhex('19EC')), # error in manual
+ ... (bytes.fromhex('01 17 2710 0002 2AF8 0002 04 0000 1388'),
+ ... bytes.fromhex('964D')), # error in manual
+ ... (bytes.fromhex('01 17 04 0000 1388'),
+ ... bytes.fromhex('F471'))]
+
+ Now, we test the crc function against this list:
+ >>> crc_zero = bytes.fromhex('0000')
+ >>> for (message, expected) in tests:
+ ... result = crc(message)
+ ... assert result == expected, (f"crc({message.hex()}) ="
+ ... f" {result.hex()} !="
+ ... f" {expected.hex()}")
+ ... crc_message = message + expected
+ ... crc_result = crc(crc_message)
+ ... assert crc_result == crc_zero, (f"crc({crc_message.hex()}) = "
+ ... f" {crc_result.hex()} !="
+ ... f" {zero.hex()}")
+
+ Since the CRC of a whole message with its CRC appended is always zero,
+ we can check the CRC of incoming messages on the fly and just check for
+ it being zero at the end of transmission.
+ This is done in the plugin classes below and the crc function itself is
+ only used for constructing messages to be sent.
"""
- global _crc_table
- if not _crc_table:
- for crc in range(256):
- for bit in range(8):
- lsb = crc & 1
- crc >>= 1
- if lsb:
- crc ^= 0xA001
- _crc_table.append(crc)
crc = 0xFFFF
for byte in message:
crc ^= byte
- table_value = _crc_table[crc & 0xFF]
+ dict_key = crc & 0xFF
crc >>= 8
- crc ^= table_value
- return bytes([crc & 0xFF, crc >> 8])
+ crc ^= _crc_dict[dict_key]
+ return bytes((crc & 0xFF, crc >> 8))
+
+
+_crc_dict: Dict[int, int] = {}
+
+for dict_key in range(256):
+ dict_value = dict_key
+ for bit in range(8):
+ lsb = dict_value & 1
+ dict_value >>= 1
+ if lsb:
+ dict_value ^= 0xA001
+ _crc_dict[dict_key] = dict_value
-class Modbus(BasePlugin):
- """… plugin.
+class ModbusMaster(BasePlugin):
+ """Modbus-RTU master plugin.
- Do this and that.
+ >>> import controlpi
+ >>> asyncio.run(controlpi.test(
+ ... {"Test Master": {"plugin": "ModbusMaster",
+ ... "device": "/dev/pts/4",
+ ... "parity": "none"},
+ ... "Test Slave": {"plugin": "ModbusSlave",
+ ... "device": "/dev/pts/5",
+ ... "slave": 1,
+ ... "parity": "none"}},
+ ... [{"target": "Test Master", "command": "read coils",
+ ... "slave": 1, "start": 1, "quantity": 2},
+ ... {"target": "Test Master", "command": "write single coil",
+ ... "slave": 1, "address": 1, "value": False},
+ ... {"target": "Test Master", "command": "write single coil",
+ ... "slave": 1, "address": 2, "value": True},
+ ... {"target": "Test Master", "command": "read coils",
+ ... "slave": 1, "start": 1, "quantity": 2},
+ ... {"target": "Test Master", "command": "read holding registers",
+ ... "slave": 1, "start": 7, "quantity": 2},
+ ... {"target": "Test Master", "command": "write single register",
+ ... "slave": 1, "address": 7, "value": 42},
+ ... {"target": "Test Master", "command": "write single register",
+ ... "slave": 1, "address": 8, "value": 42042},
+ ... {"target": "Test Master", "command": "read holding registers",
+ ... "slave": 1, "start": 7, "quantity": 2}], 0.5))
+ ... # doctest: +NORMALIZE_WHITESPACE
"""
CONF_SCHEMA = {'properties':
{'device': {'type': 'string'},
- 'test device': {'type': 'string'},
- 'baudrate': {'type': 'integer'},
- 'slave types':
- {'type': 'object',
- 'patternProperties':
- {'^.+$':
- {'type': 'object',
- 'properties':
- {'slaves':
- {'type': 'object',
- 'patternProperties':
- {'^.+$':
- {'type': 'integer',
- 'minimum': 1,
- 'maximum': 247}}},
- 'coils':
- {'type': 'object',
- 'patternProperties':
- {'^.+$':
- {'type': 'object',
- 'properties':
- {'address':
- {'type': 'integer',
- 'minimum': 0,
- 'maximum': 65535},
- 'name':
- {'type': 'string'}},
- 'required': ['address']}}},
- 'holding registers':
- {'type': 'object',
- 'patternProperties':
- {'^.+$':
- {'type': 'object',
- 'properties':
- {'address':
- {'type': 'integer',
- 'minimum': 0,
- 'maximum': 65535},
- 'name':
- {'type': 'string'},
- 'count':
- {'type': 'integer',
- 'minimum': 1}},
- 'required': ['address']}}}}}}}},
+ 'baudrate': {'type': 'integer',
+ 'enum': [2400, 4800, 9600, 19200, 38400,
+ 57600, 115200],
+ 'default': 9600},
+ 'parity': {'type': 'string',
+ 'enum': ['even', 'odd', 'none'],
+ 'default': 'even'},
+ 'stopbits': {'type': 'integer',
+ 'minimum': 1,
+ 'maximum': 2,
+ 'default': 1},
+ 'retries': {'type': 'integer',
+ 'default': 2},
+ 'response timeout': {'type': 'number',
+ 'default': 0.2},
+ 'turnaround delay': {'type': 'number',
+ 'default': 0.1}},
'required': ['device']}
+ def process_conf(self) -> None:
+ """Process configuration and register plugin as bus client."""
+ # Default values:
+ if 'baudrate' not in self.conf:
+ self.conf['baudrate'] = 9600
+ if 'parity' not in self.conf:
+ self.conf['parity'] = 'even'
+ if 'stopbits' not in self.conf:
+ self.conf['stopbits'] = 1
+ if 'retries' not in self.conf:
+ self.conf['retries'] = 2
+ if 'response timeout' not in self.conf:
+ self.conf['response timeout'] = 0.2
+ if 'turnaround delay' not in self.conf:
+ self.conf['turnaround delay'] = 0.1
+ # Constants from serial for parity and stop bits:
+ self._parity = serial.PARITY_EVEN
+ if self.conf['parity'] == 'odd':
+ self._parity = serial.PARITY_ODD
+ elif self.conf['parity'] == 'none':
+ self._parity = serial.PARITY_NONE
+ self._stopbits = serial.STOPBITS_ONE
+ if self.conf['stopbits'] == 2:
+ self._stopbits = serial.STOPBITS_TWO
+ # 1.5 char and 3.5 char times for protocol timing
+ # (according to Modbus specification):
+ self._t15 = 0.000750
+ self._t35 = 0.001750
+ if self.conf['baudrate'] <= 19200:
+ bit_per_char = 9
+ if self.conf['parity'] != 'none':
+ bit_per_char += 1
+ bit_per_char += self.conf['stopbits']
+ seconds_per_char = bit_per_char / self.conf['baudrate']
+ self._t15 = 1.5 * seconds_per_char
+ self._t35 = 3.5 * seconds_per_char
+ # Queue for Modbus messages to be sent:
+ self._queue: asyncio.Queue = asyncio.Queue()
+ # Message templates sent and received by plugin client:
+ sends = []
+ receives = []
+ self._function_codes = {}
+ self._function_names = {}
+ # Error messages:
+ sends.append(MessageTemplate(
+ {'event': {'const': 'error'}}))
+ # 01 - Read Coils:
+ self._function_codes['read coils'] = 0x01
+ self._function_names[0x01] = 'read coils'
+ receives.append(MessageTemplate(
+ {'target': {'const': self.name},
+ 'command': {'const': self._function_names[0x01]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 2000}}))
+ sends.append(MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': self._function_names[0x01]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'values': {'type': 'array', 'items': {'type': 'boolean'}}}))
+ # 03 - Read Holding Registers:
+ self._function_codes['read holding registers'] = 0x03
+ self._function_names[0x03] = 'read holding registers'
+ receives.append(MessageTemplate(
+ {'target': {'const': self.name},
+ 'command': {'const': self._function_names[0x03]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 125}}))
+ sends.append(MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': self._function_names[0x03]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'values': {'type': 'array', 'items': {'type': 'integer'}}}))
+ # 05 - Write Single Coil:
+ self._function_codes['write single coil'] = 0x05
+ self._function_names[0x05] = 'write single coil'
+ receives.append(MessageTemplate(
+ {'target': {'const': self.name},
+ 'command': {'const': self._function_names[0x05]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'boolean'}}))
+ sends.append(MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': self._function_names[0x05]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'boolean'}}))
+ # 06 - Write Single Register:
+ self._function_codes['write single register'] = 0x06
+ self._function_names[0x06] = 'write single register'
+ receives.append(MessageTemplate(
+ {'target': {'const': self.name},
+ 'command': {'const': self._function_names[0x06]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}}))
+ sends.append(MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': self._function_names[0x06]},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}}))
+ # 08 - Diagnostic: TODO
+ # 0F - Write Multiple Coils: TODO
+ # 10 - Write Multiple Registers: TODO
+ # 17 - Read/Write Multiple Registers: TODO
+ self.bus.register(self.name, 'ModbusMaster',
+ sends, receives, self._receive)
+
async def _receive(self, message: Message) -> None:
- await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))
+ assert isinstance(message['command'], str)
+ function_code = self._function_codes[message['command']]
+ assert isinstance(message['slave'], int)
+ modbus_message = bytes([message['slave'], function_code])
+ if function_code == 0x01 or function_code == 0x03:
+ assert isinstance(message['start'], int)
+ start_hi = (message['start'] - 1) >> 8
+ start_lo = (message['start'] - 1) & 0xFF
+ assert isinstance(message['quantity'], int)
+ quantity_hi = message['quantity'] >> 8
+ quantity_lo = message['quantity'] & 0xFF
+ modbus_message += bytes([start_hi, start_lo,
+ quantity_hi, quantity_lo])
+ if function_code == 0x05 or function_code == 0x06:
+ assert isinstance(message['address'], int)
+ address_hi = (message['address'] - 1) >> 8
+ address_lo = (message['address'] - 1) & 0xFF
+ modbus_message += bytes([address_hi, address_lo])
+ if function_code == 0x05:
+ assert isinstance(message['value'], bool)
+ if message['value']:
+ modbus_message += b'\xFF\x00'
+ else:
+ modbus_message += b'\x00\x00'
+ if function_code == 0x06:
+ assert isinstance(message['value'], int)
+ value_hi = message['value'] >> 8
+ value_lo = message['value'] & 0xFF
+ modbus_message += bytes([value_hi, value_lo])
+ modbus_message += crc(modbus_message)
+ await self._queue.put(modbus_message)
+
+ async def _process_response(self, modbus_message: bytes) -> None:
+ message = Message(self.name)
+ message['event'] = 'response'
+ if len(modbus_message) < 4:
+ message['event'] = 'error'
+ message['description'] = 'Modbus message too short'
+ message['message'] = modbus_message.hex()
+ await self.bus.send(message)
+ return
+ if modbus_message[1] & 0x80:
+ message['event'] = 'error'
+ function_code = modbus_message[1] ^ 0x80
+ message['function'] = self._function_names[function_code]
+ message['slave'] = modbus_message[0]
+ if len(modbus_message) != 5:
+ message['description'] = 'Modbus exception has wrong length'
+ message['message'] = modbus_message.hex()
+ else:
+ error_code = modbus_message[2]
+ if error_code == 0x01:
+ message['description'] = 'Illegal function'
+ elif error_code == 0x02:
+ message['description'] = 'Illegal data address'
+ elif error_code == 0x03:
+ message['description'] = 'Illegal data value'
+ elif error_code == 0x04:
+ message['description'] = 'Server device failure'
+ else:
+ message['description'] = f"Error code '{error_code:02X}'"
+ await self.bus.send(message)
+ return
+ function_code = modbus_message[1]
+ message['function'] = self._function_names[function_code]
+ message['slave'] = modbus_message[0]
+ if function_code == 0x01 or function_code == 0x03:
+ length = modbus_message[2]
+ if len(modbus_message) != length + 5:
+ message['event'] = 'error'
+ message['description'] = 'Modbus response has wrong length'
+ message['message'] = modbus_message.hex()
+ await self.bus.send(message)
+ return
+ if function_code == 0x01:
+ bool_values = []
+ for i in range(length):
+ byte = modbus_message[3 + i]
+ for bit in range(8):
+ if byte & (1 << bit):
+ bool_values.append(True)
+ else:
+ bool_values.append(False)
+ message['values'] = bool_values
+ if function_code == 0x03:
+ int_values = []
+ for i in range(0, length, 2):
+ byte_hi = modbus_message[3 + i]
+ byte_lo = modbus_message[4 + i]
+ int_values.append(byte_hi * 256 + byte_lo)
+ message['values'] = int_values
+ if function_code == 0x05 or function_code == 0x06:
+ if len(modbus_message) != 8:
+ message['event'] = 'error'
+ message['description'] = 'Modbus response has wrong length'
+ message['message'] = modbus_message.hex()
+ await self.bus.send(message)
+ return
+ address_hi = modbus_message[3]
+ address_lo = modbus_message[4]
+ message['address'] = address_hi * 256 + address_lo + 1
+ if function_code == 0x05:
+ if modbus_message[5] == 0xFF and modbus_message[6] == 0x00:
+ message['value'] = True
+ elif modbus_message[5] == 0x00 and modbus_message[6] == 0x00:
+ message['value'] = False
+ else:
+ message['event'] = 'error'
+ message['description'] = 'Coil value not decodable'
+ message['message'] = modbus_message.hex()
+ await self.bus.send(message)
+ return
+ if function_code == 0x06:
+ value_hi = modbus_message[5]
+ value_lo = modbus_message[6]
+ message['value'] = value_hi * 256 + value_lo
+ await self.bus.send(message)
+
+ async def run(self) -> None:
+ """Open serial device and start loop on modbus message queue."""
+ reader, writer = await serial_asyncio.open_serial_connection(
+ url=self.conf['device'], baudrate=self.conf['baudrate'],
+ parity=self._parity, stopbits=self._stopbits)
+ while True:
+ modbus_request = await self._queue.get()
+ slave = modbus_request[0]
+ tries = 0
+ while tries < self.conf['retries']:
+ tries += 1
+ writer.write(modbus_request)
+ if slave == 0:
+ # Broadcast => just wait for delay and finish:
+ await asyncio.sleep(self.conf['turnaround delay'])
+ break
+ modbus_response = b''
+ crc = 0xFFFF
+ t35_task: asyncio.Task = asyncio.create_task(
+ asyncio.sleep(self._t35))
+ try:
+ # First byte is awaited for response timeout:
+ first_byte = await asyncio.wait_for(
+ reader.read(1), self.conf['response timeout'])
+ modbus_response += first_byte
+ crc ^= first_byte[0]
+ dict_key = crc & 0xFF
+ crc >>= 8
+ crc ^= _crc_dict[dict_key]
+ except asyncio.TimeoutError:
+ message = Message(self.name)
+ message['event'] = 'error'
+ message['description'] = (
+ f"Response time out on try {tries}.")
+ await self.bus.send(message)
+ continue
+ while True:
+ t35_task = asyncio.create_task(asyncio.sleep(self._t35))
+ try:
+ # Subsequent bytes are only awaited for the time
+ # needed to transmit 1.5 characters:
+ next_byte = await asyncio.wait_for(
+ reader.read(1), self._t15)
+ modbus_response += next_byte
+ crc ^= next_byte[0]
+ dict_key = crc & 0xFF
+ crc >>= 8
+ crc ^= _crc_dict[dict_key]
+ except asyncio.TimeoutError:
+ break
+ if crc != 0:
+ message = Message(self.name)
+ message['event'] = 'error'
+ message['description'] = (
+ f"Response CRC failed on try {tries}.")
+ message['message'] = modbus_response.hex()
+ await self.bus.send(message)
+ continue
+ if modbus_response[0] != slave:
+ message = Message(self.name)
+ message['event'] = 'error'
+ message['description'] = (
+ f"Response from wrong slave on try {tries}.")
+ message['message'] = modbus_response.hex()
+ await self.bus.send(message)
+ continue
+ await self._process_response(modbus_response)
+ await t35_task
+ break
+ self._queue.task_done()
+
+
+class ModbusSlave(BasePlugin):
+ """Modbus slave plugin.
+
+ This plugin implements a Modbus-RTU slave listening on a given serial
+ device.
+
+ We can use this to test the communication with a Modbus master plugin
+ instance.
+ """
+
+ CONF_SCHEMA = {'properties':
+ {'device': {'type': 'string'},
+ 'slave': {'type': 'integer',
+ 'minimum': 1,
+ 'maximum': 247},
+ 'baudrate': {'type': 'integer',
+ 'enum': [2400, 4800, 9600, 19200, 38400,
+ 57600, 115200],
+ 'default': 9600},
+ 'parity': {'type': 'string',
+ 'enum': ['even', 'odd', 'none'],
+ 'default': 'even'},
+ 'stopbits': {'type': 'integer',
+ 'minimum': 1,
+ 'maximum': 2,
+ 'default': 1}},
+ 'required': ['device', 'slave']}
def process_conf(self) -> None:
- """Register plugin as bus client."""
- message = Message(self.name, {'spam': self.conf['spam']})
- sends = [MessageTemplate.from_message(message)]
- receives = [MessageTemplate({'target': {'const': self.name}})]
- self.bus.register(self.name, 'Plugin', sends, receives, self._receive)
+ """Process configuration and register plugin as bus client."""
+ # Default values:
+ if 'baudrate' not in self.conf:
+ self.conf['baudrate'] = 9600
+ if 'parity' not in self.conf:
+ self.conf['parity'] = 'even'
+ if 'stopbits' not in self.conf:
+ self.conf['stopbits'] = 1
+ # Constants from serial for parity and stop bits:
+ self._parity = serial.PARITY_EVEN
+ if self.conf['parity'] == 'odd':
+ self._parity = serial.PARITY_ODD
+ elif self.conf['parity'] == 'none':
+ self._parity = serial.PARITY_NONE
+ self._stopbits = serial.STOPBITS_ONE
+ if self.conf['stopbits'] == 2:
+ self._stopbits = serial.STOPBITS_TWO
+ # 1.5 char and 3.5 char times for protocol timing
+ # (according to Modbus specification):
+ self._t15 = 0.000750
+ self._t35 = 0.001750
+ if self.conf['baudrate'] <= 19200:
+ bit_per_char = 9
+ if self.conf['parity'] != 'none':
+ bit_per_char += 1
+ bit_per_char += self.conf['stopbits']
+ seconds_per_char = bit_per_char / self.conf['baudrate']
+ self._t15 = 1.5 * seconds_per_char
+ self._t35 = 3.5 * seconds_per_char
+ # Coils and registers:
+ self._coils: Dict[int, bool] = {}
+ self._registers: Dict[int, int] = {}
+ # Message templates sent by plugin client:
+ sends = [MessageTemplate({'event': {'const': 'received'}}),
+ MessageTemplate({'event': {'const': 'sent'}})]
+ self.bus.register(self.name, 'ModbusSlave', sends, [], self._receive)
+
+ async def _receive(self, message: Message) -> None:
+ pass
+
+ async def _process_request(self, modbus_request: bytes) -> bytes:
+ message = Message(self.name)
+ message['event'] = 'received'
+ message['message'] = modbus_request.hex()
+ await self.bus.send(message)
+ if len(modbus_request) < 4:
+ return b''
+ function_code = modbus_request[1]
+ modbus_response = bytes([self.conf['slave']])
+ if function_code == 0x01 or function_code == 0x03:
+ if len(modbus_request) != 8:
+ modbus_response += bytes([function_code | 0x80, 0x03])
+ else:
+ start_hi = modbus_request[2]
+ start_lo = modbus_request[3]
+ start = start_hi * 256 + start_lo
+ quantity_hi = modbus_request[4]
+ quantity_lo = modbus_request[5]
+ quantity = quantity_hi * 256 + quantity_lo
+ if function_code == 0x01:
+ okay = True
+ for i in range(quantity):
+ if start + i not in self._coils:
+ modbus_response += bytes([0x81, 0x02])
+ okay = False
+ break
+ else:
+ if self._coils[start + i]:
+ pass
+ if function_code == 0x03:
+ pass
+ elif function_code == 0x05 or function_code == 0x06:
+ pass
+ else:
+ modbus_response += bytes([function_code | 0x80, 0x01])
+ modbus_response += crc(modbus_response)
+ message = Message(self.name)
+ message['event'] = 'sent'
+ message['message'] = modbus_response.hex()
+ await self.bus.send(message)
+ return modbus_response
async def run(self) -> None:
- """Send initial message."""
- await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))
+ """Open serial device and start loop on its reader."""
+ reader, writer = await serial_asyncio.open_serial_connection(
+ url=self.conf['device'], baudrate=self.conf['baudrate'],
+ parity=self._parity, stopbits=self._stopbits)
+ while True:
+ modbus_request = b''
+ crc = 0xFFFF
+ t35_task: asyncio.Task = asyncio.create_task(
+ asyncio.sleep(self._t35))
+ # First byte is read without timeout:
+ first_byte = await reader.read(1)
+ modbus_request += first_byte
+ crc ^= first_byte[0]
+ dict_key = crc & 0xFF
+ crc >>= 8
+ crc ^= _crc_dict[dict_key]
+ while True:
+ t35_task = asyncio.create_task(asyncio.sleep(self._t35))
+ try:
+ # Subsequent bytes are only awaited for the time
+ # needed to transmit 1.5 characters:
+ next_byte = await asyncio.wait_for(
+ reader.read(1), self._t15)
+ modbus_request += next_byte
+ crc ^= next_byte[0]
+ dict_key = crc & 0xFF
+ crc >>= 8
+ crc ^= _crc_dict[dict_key]
+ except asyncio.TimeoutError:
+ break
+ if crc != 0:
+ continue
+ if modbus_request[0] != self.conf['slave']:
+ continue
+ modbus_response = await self._process_request(modbus_request)
+ await t35_task
+ if modbus_response:
+ writer.write(modbus_response)