…
"""
+from abc import ABC, abstractmethod
import asyncio
import serial # type: ignore
import serial_asyncio # type: ignore
from controlpi import BasePlugin, Message, MessageTemplate
-from typing import Dict
+from typing import List, Dict, Union, Type
class CRC:
CRC.precompute()
+class ModbusFunction(ABC):
+ """Abstract base class for Modbus functions."""
+
+ FUNCTION_CODE = 0x00
+
+ FUNCTION_NAME = ''
+
+ @staticmethod
+ @abstractmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ These messages are received on the message bus, processed and sent
+ to slaves on the Modbus.
+ """
+ raise NotImplementedError
+
+ @staticmethod
+ @abstractmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ These messages are constructed from replies received from slaves on
+ the Modbus and then sent to the message bus.
+ """
+ raise NotImplementedError
+
+ @abstractmethod
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ self._name = ''
+ self._slave = 0
+ raise NotImplementedError
+
+ @abstractmethod
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ raise NotImplementedError
+
+ def broadcast(self) -> bool:
+ """Determine if this is a broadcast."""
+ return self._slave == 0
+
+ def process_error(self, response: bytes) -> Message:
+ """Process error PDU to message."""
+ message = Message(self._name)
+ message['event'] = 'error'
+ if len(response) != 5:
+ message['message'] = response.hex()
+ message['description'] = 'Modbus exception has wrong length'
+ else:
+ message['slave'] = response[0]
+ function_code = response[1] ^ 0x80
+ if function_code != self.FUNCTION_CODE:
+ message['message'] = response.hex()
+ message['description'] = (
+ "Modbus exception for wrong function code"
+ f" '{function_code}'.")
+ else:
+ message['function'] = self.FUNCTION_NAME
+ error_code = response[2]
+ if error_code == 0x01:
+ message['description'] = 'Illegal function'
+ elif error_code == 0x02:
+ message['description'] = 'Illegal data address'
+ elif error_code == 0x03:
+ message['description'] = 'Illegal data value'
+ elif error_code == 0x04:
+ message['description'] = 'Server device failure'
+ else:
+ message['description'] = f"Error code '{error_code:02X}'"
+ return message
+
+ @abstractmethod
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ raise NotImplementedError
+
+
+class ReadCoils(ModbusFunction):
+ """ModbusFunction for 0x01 Read Coils."""
+
+ FUNCTION_CODE = 0x01
+
+ FUNCTION_NAME = 'read coils'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are the start address and the quantity of coils to read
+ (up to 2000).
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'read coils'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 2000}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are a mapping of read coil addresses to Boolean values.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'read coils'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'values': {'type': 'array', 'items': {'type': 'boolean'}}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['start'], int)
+ self._start: int = message['start']
+ assert isinstance(message['quantity'], int)
+ self._quantity: int = message['quantity']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x01])
+ start_hi = (self._start - 1) >> 8
+ start_lo = (self._start - 1) & 0xFF
+ request_pdu += bytes([start_hi, start_lo])
+ quantity_hi = self._quantity >> 8
+ quantity_lo = self._quantity & 0xFF
+ request_pdu += bytes([quantity_hi, quantity_lo])
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
+class ReadHoldingRegisters(ModbusFunction):
+ """ModbusFunction for 0x03 Read Holding Registers."""
+
+ FUNCTION_CODE = 0x03
+
+ FUNCTION_NAME = 'read holding registers'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are the start address and the quantity of registers to
+ read (up to 125).
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'read holding registers'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 125}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are a mapping of read register addresses to integers.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'read holding registers'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'values': {'type': 'array', 'items': {'type': 'integer'}}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['start'], int)
+ self._start: int = message['start']
+ assert isinstance(message['quantity'], int)
+ self._quantity: int = message['quantity']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x03])
+ start_hi = (self._start - 1) >> 8
+ start_lo = (self._start - 1) & 0xFF
+ request_pdu += bytes([start_hi, start_lo])
+ quantity_hi = self._quantity >> 8
+ quantity_lo = self._quantity & 0xFF
+ request_pdu += bytes([quantity_hi, quantity_lo])
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
+class WriteSingleCoil(ModbusFunction):
+ """ModbusFunction for 0x05 Write Single Coil."""
+
+ FUNCTION_CODE = 0x05
+
+ FUNCTION_NAME = 'write single coil'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are the address and Boolean value to be written.
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'write single coil'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'boolean'}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are the address and the written Boolean value.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'write single coil'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'boolean'}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['address'], int)
+ self._address: int = message['address']
+ assert isinstance(message['value'], bool)
+ self._value: bool = message['value']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x05])
+ address_hi = (self._address - 1) >> 8
+ address_lo = (self._address - 1) & 0xFF
+ request_pdu += bytes([address_hi, address_lo])
+ if self._value:
+ request_pdu += b'\xFF\x00'
+ else:
+ request_pdu += b'\x00\x00'
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
+class WriteSingleRegister(ModbusFunction):
+ """ModbusFunction for 0x06 Write Single Register."""
+
+ FUNCTION_CODE = 0x06
+
+ FUNCTION_NAME = 'write single register'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are the address and the integer to be written.
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'write single register'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are the address and the written integer.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'write single register'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['address'], int)
+ self._address: int = message['address']
+ assert isinstance(message['value'], int)
+ self._value: int = message['value']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x06])
+ address_hi = (self._address - 1) >> 8
+ address_lo = (self._address - 1) & 0xFF
+ request_pdu += bytes([address_hi, address_lo])
+ value_hi = self._value >> 8
+ value_lo = self._value & 0xFF
+ request_pdu += bytes([value_hi, value_lo])
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
+class Diagnostic(ModbusFunction):
+ """ModbusFunction for 0x08 Diagnostic."""
+
+ FUNCTION_CODE = 0x08
+
+ FUNCTION_NAME = 'diagnostic'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are 32 bit of data to be echoed by the slave.
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'diagnostic'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'data': {'type': 'integer', 'minimum': 0, 'maximum': 65535}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are the echoed data.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'diagnostic'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'data': {'type': 'integer', 'minimum': 0, 'maximum': 65535}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['data'], int)
+ self._data: int = message['data']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x08])
+ request_pdu += b'\x00\x00'
+ data_hi = self._data >> 8
+ data_lo = self._data & 0xFF
+ request_pdu += bytes([data_hi, data_lo])
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
+class WriteMultipleCoils(ModbusFunction):
+ """ModbusFunction for 0x0F Write Multiple Coils."""
+
+ FUNCTION_CODE = 0x0F
+
+ FUNCTION_NAME = 'write multiple coils'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are the start address and an array of up to 1968 Boolean
+ values to write.
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'write multiple coils'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'values': {'type': 'array', 'items': {'type': 'boolean'}}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are a mapping of written coil addresses to Boolean values.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'write multiple coils'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'written': {'type': 'object',
+ 'patternProperties':
+ {'^[1-9][0-9]*$': {'type': 'boolean'}},
+ 'additionalProperties': False}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['start'], int)
+ self._start: int = message['start']
+ assert isinstance(message['values'], list)
+ self._values: List[bool] = message['values']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x0F])
+ start_hi = (self._start - 1) >> 8
+ start_lo = (self._start - 1) & 0xFF
+ request_pdu += bytes([start_hi, start_lo])
+ quantity = len(self._values)
+ quantity_hi = quantity >> 8
+ quantity_lo = quantity & 0xFF
+ request_pdu += bytes([quantity_hi, quantity_lo])
+ data_bytes = []
+ byte = 0
+ bit = 0
+ for value in self._values:
+ if value:
+ byte += 2**bit
+ bit += 1
+ if bit == 8:
+ data_bytes.append(byte)
+ byte = 0
+ bit = 0
+ if bit > 0:
+ data_bytes.append(byte)
+ if len(data_bytes) % 2 != 0:
+ data_bytes.append(0)
+ request_pdu += bytes([len(data_bytes)])
+ request_pdu += bytes(data_bytes)
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
+class WriteMultipleRegisters(ModbusFunction):
+ """ModbusFunction for 0x10 Write Multiple Registers."""
+
+ FUNCTION_CODE = 0x10
+
+ FUNCTION_NAME = 'write multiple registers'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are the start address and an array of up to 123 integers
+ to write.
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'write multiple registers'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
+ 'values': {'type': 'array', 'items': {'type': 'integer'}}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are a mapping of written register addresses to integers.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'write multiple registers'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'written': {'type': 'object',
+ 'patternProperties':
+ {'^[1-9][0-9]*$':
+ {'type': 'integer', 'minimum': 0, 'maximum': 65535}},
+ 'additionalProperties': False}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['start'], int)
+ self._start: int = message['start']
+ assert isinstance(message['values'], list)
+ self._values: List[int] = message['values']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x10])
+ start_hi = (self._start - 1) >> 8
+ start_lo = (self._start - 1) & 0xFF
+ request_pdu += bytes([start_hi, start_lo])
+ quantity = len(self._values)
+ quantity_hi = quantity >> 8
+ quantity_lo = quantity & 0xFF
+ request_pdu += bytes([quantity_hi, quantity_lo])
+ data_bytes = []
+ for value in self._values:
+ value_hi = value >> 8
+ data_bytes.append(value_hi)
+ value_lo = value & 0xFF
+ data_bytes.append(value_lo)
+ request_pdu += bytes([len(data_bytes)])
+ request_pdu += bytes(data_bytes)
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
+class ReadWriteMultipleRegisters(ModbusFunction):
+ """ModbusFunction for 0x17 Read/Write Multiple Registers."""
+
+ FUNCTION_CODE = 0x17
+
+ FUNCTION_NAME = 'read/write multiple registers'
+
+ @staticmethod
+ def receive_template(name: str) -> MessageTemplate:
+ """Template for received messages.
+
+ Parameters are the read start address, the quantity of registers to
+ read (up to 125), the write start address and an array of up to 121
+ integers to write.
+ """
+ return MessageTemplate(
+ {'target': {'const': name},
+ 'command': {'const': 'write single register'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'read start': {'type': 'integer',
+ 'minimum': 1, 'maximum': 65536},
+ 'read quantity': {'type': 'integer',
+ 'minimum': 1, 'maximum': 125},
+ 'write start': {'type': 'integer',
+ 'minimum': 1, 'maximum': 65536},
+ 'write values': {'type': 'array', 'items': {'type': 'integer'}}})
+
+ @staticmethod
+ def send_template() -> MessageTemplate:
+ """Template for sent messages.
+
+ Parameters are a mapping of read register addresses to integers and
+ a mapping of written register addresses to integers.
+ """
+ return MessageTemplate(
+ {'event': {'const': 'response'},
+ 'function': {'const': 'write single register'},
+ 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
+ 'read': {'type': 'object',
+ 'patternProperties':
+ {'^[1-9][0-9]*$':
+ {'type': 'integer', 'minimum': 0, 'maximum': 65535}},
+ 'additionalProperties': False},
+ 'written': {'type': 'object',
+ 'patternProperties':
+ {'^[1-9][0-9]*$':
+ {'type': 'integer', 'minimum': 0, 'maximum': 65535}},
+ 'additionalProperties': False}})
+
+ def __init__(self, message: Message) -> None:
+ """Initialise function based on received message."""
+ assert isinstance(message['target'], str)
+ self._name: str = message['target']
+ assert isinstance(message['slave'], int)
+ self._slave: int = message['slave']
+ assert isinstance(message['read start'], int)
+ self._read_start: int = message['read start']
+ assert isinstance(message['read quantity'], int)
+ self._read_quantity: int = message['read quantity']
+ assert isinstance(message['write start'], int)
+ self._write_start: int = message['write start']
+ assert isinstance(message['write values'], list)
+ self._write_values: List[int] = message['write values']
+
+ def get_request(self) -> bytes:
+ """Construct request PDU for function."""
+ request_pdu = bytes([self._slave, 0x17])
+ read_start_hi = (self._read_start - 1) >> 8
+ read_start_lo = (self._read_start - 1) & 0xFF
+ request_pdu += bytes([read_start_hi, read_start_lo])
+ read_quantity_hi = self._read_quantity >> 8
+ read_quantity_lo = self._read_quantity & 0xFF
+ request_pdu += bytes([read_quantity_hi, read_quantity_lo])
+ write_start_hi = (self._write_start - 1) >> 8
+ write_start_lo = (self._write_start - 1) & 0xFF
+ request_pdu += bytes([write_start_hi, write_start_lo])
+ write_quantity = len(self._write_values)
+ write_quantity_hi = write_quantity >> 8
+ write_quantity_lo = write_quantity & 0xFF
+ request_pdu += bytes([write_quantity_hi, write_quantity_lo])
+ data_bytes = []
+ for value in self._write_values:
+ value_hi = value >> 8
+ data_bytes.append(value_hi)
+ value_lo = value & 0xFF
+ data_bytes.append(value_lo)
+ request_pdu += bytes([len(data_bytes)])
+ request_pdu += bytes(data_bytes)
+ return request_pdu
+
+ def process_response(self, response: bytes) -> Message:
+ """Process response PDU to message."""
+ if response[1] & 0x80:
+ return self.process_error(response)
+ if response[0] != self._slave:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response from wrong slave."})
+ if response[1] != self.FUNCTION_CODE:
+ return Message(self._name,
+ {'event': 'error',
+ 'message': response.hex(),
+ 'description': "Response for wrong function."})
+ message = Message(self._name,
+ {'event': 'response',
+ 'slave': self._slave,
+ 'function': self.FUNCTION_NAME})
+ raise NotImplementedError
+
+
class ModbusMaster(BasePlugin):
"""Modbus-RTU master plugin.
... {"target": "Test Master", "command": "read holding registers",
... "slave": 1, "start": 7, "quantity": 2}], 0.1)
>>> asyncio.run(test())
- ... # doctest: +NORMALIZE_WHITESPACE
+ ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
test(): {'sender': '', 'event': 'registered',
- 'client': 'Test Master', 'plugin': 'ModbusMaster',
- 'sends': [{'event': {'const': 'error'}},
- {'event': {'const': 'response'},
- 'function': {'const': 'read coils'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'values': {'type': 'array',
- 'items': {'type': 'boolean'}}},
- {'event': {'const': 'response'},
- 'function': {'const': 'read holding registers'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'values': {'type': 'array',
- 'items': {'type': 'integer'}}},
- {'event': {'const': 'response'},
- 'function': {'const': 'write single coil'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer',
- 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'boolean'}},
- {'event': {'const': 'response'},
- 'function': {'const': 'write single register'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer',
- 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'integer',
- 'minimum': 0, 'maximum': 65535}}],
- 'receives': [{'target': {'const': 'Test Master'},
- 'command': {'const': 'read coils'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'start': {'type': 'integer',
- 'minimum': 1, 'maximum': 65536},
- 'quantity': {'type': 'integer',
- 'minimum': 1, 'maximum': 2000}},
- {'target': {'const': 'Test Master'},
- 'command': {'const': 'read holding registers'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'start': {'type': 'integer',
- 'minimum': 1, 'maximum': 65536},
- 'quantity': {'type': 'integer',
- 'minimum': 1, 'maximum': 125}},
- {'target': {'const': 'Test Master'},
- 'command': {'const': 'write single coil'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer',
- 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'boolean'}},
- {'target': {'const': 'Test Master'},
- 'command': {'const': 'write single register'},
- 'slave': {'type': 'integer',
- 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer',
- 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'integer',
- 'minimum': 0, 'maximum': 65535}}]}
+ 'client': 'Test Master', 'plugin': 'ModbusMaster', ...}
test(): {'sender': '', 'event': 'registered',
- 'client': 'Test Slave', 'plugin': 'ModbusSlave',
- 'sends': [{'event': {'const': 'received'}},
- {'event': {'const': 'crc error'}},
- {'event': {'const': 'sent'}}],
- 'receives': []}
+ 'client': 'Test Slave', 'plugin': 'ModbusSlave', ...}
test(): {'sender': 'test()', 'target': 'Test Master',
'command': 'read coils', 'slave': 1,
'start': 1, 'quantity': 2}
# Message templates sent and received by plugin client:
sends = []
receives = []
- self._function_codes = {}
- self._function_names = {}
+ self._modbus_functions = {}
# Error messages:
sends.append(MessageTemplate(
{'event': {'const': 'error'}}))
- # 01 - Read Coils:
- self._function_codes['read coils'] = 0x01
- self._function_names[0x01] = 'read coils'
- receives.append(MessageTemplate(
- {'target': {'const': self.name},
- 'command': {'const': self._function_names[0x01]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
- 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 2000}}))
- sends.append(MessageTemplate(
- {'event': {'const': 'response'},
- 'function': {'const': self._function_names[0x01]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'values': {'type': 'array', 'items': {'type': 'boolean'}}}))
- # 03 - Read Holding Registers:
- self._function_codes['read holding registers'] = 0x03
- self._function_names[0x03] = 'read holding registers'
- receives.append(MessageTemplate(
- {'target': {'const': self.name},
- 'command': {'const': self._function_names[0x03]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'start': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
- 'quantity': {'type': 'integer', 'minimum': 1, 'maximum': 125}}))
- sends.append(MessageTemplate(
- {'event': {'const': 'response'},
- 'function': {'const': self._function_names[0x03]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'values': {'type': 'array', 'items': {'type': 'integer'}}}))
- # 05 - Write Single Coil:
- self._function_codes['write single coil'] = 0x05
- self._function_names[0x05] = 'write single coil'
- receives.append(MessageTemplate(
- {'target': {'const': self.name},
- 'command': {'const': self._function_names[0x05]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'boolean'}}))
- sends.append(MessageTemplate(
- {'event': {'const': 'response'},
- 'function': {'const': self._function_names[0x05]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'boolean'}}))
- # 06 - Write Single Register:
- self._function_codes['write single register'] = 0x06
- self._function_names[0x06] = 'write single register'
- receives.append(MessageTemplate(
- {'target': {'const': self.name},
- 'command': {'const': self._function_names[0x06]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}}))
- sends.append(MessageTemplate(
- {'event': {'const': 'response'},
- 'function': {'const': self._function_names[0x06]},
- 'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'address': {'type': 'integer', 'minimum': 1, 'maximum': 65536},
- 'value': {'type': 'integer', 'minimum': 0, 'maximum': 65535}}))
- # 08 - Diagnostic: TODO
- # 0F - Write Multiple Coils: TODO
- # 10 - Write Multiple Registers: TODO
- # 17 - Read/Write Multiple Registers: TODO
+ # Process all implemented Modbus functions:
+ for cls in ModbusFunction.__subclasses__():
+ self._modbus_functions[cls.FUNCTION_NAME] = cls
+ sends.append(cls.send_template())
+ receives.append(cls.receive_template(self.name))
self.bus.register(self.name, 'ModbusMaster',
sends, receives, self._receive)
async def _receive(self, message: Message) -> None:
- assert isinstance(message['command'], str)
- function_code = self._function_codes[message['command']]
- assert isinstance(message['slave'], int)
- modbus_message = bytes([message['slave'], function_code])
- if function_code == 0x01 or function_code == 0x03:
- assert isinstance(message['start'], int)
- start_hi = (message['start'] - 1) >> 8
- start_lo = (message['start'] - 1) & 0xFF
- assert isinstance(message['quantity'], int)
- quantity_hi = message['quantity'] >> 8
- quantity_lo = message['quantity'] & 0xFF
- modbus_message += bytes([start_hi, start_lo,
- quantity_hi, quantity_lo])
- if function_code == 0x05 or function_code == 0x06:
- assert isinstance(message['address'], int)
- address_hi = (message['address'] - 1) >> 8
- address_lo = (message['address'] - 1) & 0xFF
- modbus_message += bytes([address_hi, address_lo])
- if function_code == 0x05:
- assert isinstance(message['value'], bool)
- if message['value']:
- modbus_message += b'\xFF\x00'
- else:
- modbus_message += b'\x00\x00'
- if function_code == 0x06:
- assert isinstance(message['value'], int)
- value_hi = message['value'] >> 8
- value_lo = message['value'] & 0xFF
- modbus_message += bytes([value_hi, value_lo])
- modbus_message += bytes(CRC(modbus_message))
- await self._queue.put(modbus_message)
+ await self._queue.put(message)
async def _process_response(self, modbus_message: bytes) -> None:
- message = Message(self.name)
- message['event'] = 'response'
- if len(modbus_message) < 4:
- message['event'] = 'error'
- message['description'] = 'Modbus message too short'
- message['message'] = modbus_message.hex()
- await self.bus.send(message)
- return
- if modbus_message[1] & 0x80:
- message['event'] = 'error'
- function_code = modbus_message[1] ^ 0x80
- message['function'] = self._function_names[function_code]
- message['slave'] = modbus_message[0]
- if len(modbus_message) != 5:
- message['description'] = 'Modbus exception has wrong length'
- message['message'] = modbus_message.hex()
- else:
- error_code = modbus_message[2]
- if error_code == 0x01:
- message['description'] = 'Illegal function'
- elif error_code == 0x02:
- message['description'] = 'Illegal data address'
- elif error_code == 0x03:
- message['description'] = 'Illegal data value'
- elif error_code == 0x04:
- message['description'] = 'Server device failure'
- else:
- message['description'] = f"Error code '{error_code:02X}'"
- await self.bus.send(message)
- return
- function_code = modbus_message[1]
- message['function'] = self._function_names[function_code]
- message['slave'] = modbus_message[0]
if function_code == 0x01 or function_code == 0x03:
length = modbus_message[2]
if len(modbus_message) != length + 5:
for i in range(length):
byte = modbus_message[3 + i]
for bit in range(8):
- if byte & (1 << bit):
+ if byte & 2**bit:
bool_values.append(True)
else:
bool_values.append(False)
url=self.conf['device'], baudrate=self.conf['baudrate'],
parity=self._parity, stopbits=self._stopbits)
while True:
- request = await self._queue.get()
- slave = request[0]
- function_code = request[1]
+ # Initialise ModbusFunction instance from message:
+ message = await self._queue.get()
+ cls = self._modbus_functions[message['command']]
+ function = cls(message)
+ request_pdu = function.get_request()
+ request_pdu += bytes(CRC(request_pdu))
+ # TODO: In specific classes:
+ slave = request_pdu[0]
+ function_code = request_pdu[1]
# Read bytes from serial (normally there shouldn't be any):
- unexpected_message = b''
+ unexpected_pdu = b''
while True:
try:
- unexpected_message += await asyncio.wait_for(
+ unexpected_pdu += await asyncio.wait_for(
reader.read(1), 0.001)
except asyncio.TimeoutError:
break
- if unexpected_message:
- message = Message(self.name)
- message['event'] = 'error'
- message['message'] = unexpected_message.hex()
- message['description'] = "Unexpected message on Modbus."
- await self.bus.send(message)
+ if unexpected_pdu:
+ await self.bus.send(Message(self.name,
+ {'event': 'error',
+ 'message': unexpected_pdu.hex(),
+ 'description':
+ "Unexpected message on Modbus."}))
# Retry self.conf['retries'] times:
tries = 0
while tries <= self.conf['retries']:
tries += 1
- writer.write(request)
- if slave == 0:
- # Broadcast => just wait for delay and finish:
+ writer.write(request_pdu)
+ if function.broadcast():
await asyncio.sleep(0.1)
break
# Read response:
- response = b''
+ response_pdu = b''
crc = CRC()
expected_length = 5
length = 0
while True:
try:
- response += await asyncio.wait_for(
+ response_pdu += await asyncio.wait_for(
reader.read(1), 0.2)
- crc.update(response[length])
+ crc.update(response_pdu[length])
length += 1
except asyncio.TimeoutError:
break
# Update expected length based on function:
- if (length == 2 and
- (response[1] == 0x05 or response[1] == 0x06 or
- response[1] == 0x08 or response[1] == 0x0F or
- response[1] == 0x10)):
+ if (length == 2 and (response_pdu[1] == 0x05 or
+ response_pdu[1] == 0x06 or
+ response_pdu[1] == 0x08 or
+ response_pdu[1] == 0x0F or
+ response_pdu[1] == 0x10)):
expected_length = 8
- if (length == 3 and
- (response[1] == 0x01 or response[1] == 0x03 or
- response[1] == 0x17)):
- expected_length = response[2] + 5
+ if (length == 3 and (response_pdu[1] == 0x01 or
+ response_pdu[1] == 0x03 or
+ response_pdu[1] == 0x17)):
+ expected_length = response_pdu[2] + 5
# Break at expected length:
if length == expected_length:
break
# Process possible errors leading to retry:
message = Message(self.name)
message['event'] = 'error'
- message['message'] = response.hex()
+ message['message'] = response_pdu.hex()
if length < expected_length:
message['description'] = (
f"Response timed out on try {tries}.")
+ if not crc:
+ message['description'] += " And CRC failed."
await self.bus.send(message)
continue
if not crc:
f"Response CRC failed on try {tries}.")
await self.bus.send(message)
continue
- if response[0] != slave:
- message['description'] = (
- f"Response from wrong slave on try {tries}.")
- await self.bus.send(message)
- continue
- if response[1] & 0x7F != function_code:
- message['description'] = (
- f"Response for wrong function on try {tries}.")
- await self.bus.send(message)
- continue
- # Process response from correct slave for correct funtion
- # with valid CRC:
- await self._process_response(response)
+ # Process response:
+ message = function.process_response(response_pdu)
+ await self.bus.send(message)
break
self._queue.task_done()
elif function_code == 0x06:
value = modbus_request[4]*256 + modbus_request[5]
self._registers[address] = value
- return modbus_request[:-2]
+ return modbus_request[:6]
+ elif function_code == 0x08:
+ if len(modbus_request) != 8:
+ return bytes((slave, function_code | 0x80, 0x03))
+ if modbus_request[2] != 0 or modbus_request[3] != 0:
+ return bytes((slave, function_code | 0x80, 0x01))
+ return modbus_request[:6]
+ elif function_code == 0x0F or function_code == 0x10:
+ if len(modbus_request) < 9:
+ return bytes((slave, function_code | 0x80, 0x03))
+ start = modbus_request[2]*256 + modbus_request[3]
+ quantity = modbus_request[4]*256 + modbus_request[5]
+ data_length = modbus_request[6]
+ if len(modbus_request) != 9 + data_length:
+ return bytes((slave, function_code | 0x80, 0x03))
+ data = []
+ if function_code == 0x0F:
+ for i in range(data_length):
+ for bit in range(8):
+ if modbus_request[7 + i] & 2**bit:
+ data.append(True)
+ else:
+ data.append(False)
+ elif function_code == 0x10:
+ hi = True
+ for i in range(data_length):
+ if hi:
+ data.append(modbus_request[7 + i]*256)
+ hi = False
+ else:
+ data[-1] += modbus_request[7 + i]
+ hi = True
+ if len(data) < quantity:
+ return bytes((slave, function_code | 0x80, 0x03))
+ if function_code == 0x0F:
+ for i in range(quantity):
+ self._coils[start + i] = data[i]
+ elif function_code == 0x10:
+ for i in range(quantity):
+ self._registers[start + i] = data[i]
+ return modbus_request[:6]
+ elif function_code == 0x17:
+ if len(modbus_request) < 13:
+ return bytes((slave, function_code | 0x80, 0x03))
+ read_start = modbus_request[2]*256 + modbus_request[3]
+ read_quantity = modbus_request[4]*256 + modbus_request[5]
+ write_start = modbus_request[6]*256 + modbus_request[7]
+ write_quantity = modbus_request[8]*256 + modbus_request[9]
+ data_length = modbus_request[10]
+ if len(modbus_request) != 13 + data_length:
+ return bytes((slave, function_code | 0x80, 0x03))
+ write_data = []
+ hi = True
+ for i in range(data_length):
+ if hi:
+ write_data.append(modbus_request[11 + i]*256)
+ hi = False
+ else:
+ write_data[-1] += modbus_request[11 + i]
+ hi = True
+ if len(write_data) != write_quantity:
+ return bytes((slave, function_code | 0x80, 0x03))
+ for i in range(write_quantity):
+ self._registers[write_start + i] = write_data[i]
+ read_data = []
+ for i in range(read_quantity):
+ if read_start + i not in self._registers:
+ return bytes((slave, 0x83, 0x02))
+ read_data.append(self._registers[read_start + i] >> 8)
+ read_data.append(self._registers[read_start + i] & 0xFF)
+ return bytes([slave, function_code, len(read_data)] + read_data)
return bytes((slave, function_code | 0x80, 0x01))
async def run(self) -> None:
crc = CRC()
expected_length = 8
length = 0
+ request += await reader.read(1)
+ crc.update(request[length])
+ length += 1
while True:
try:
request += await asyncio.wait_for(