message['event'] = 'error'
if len(response) != 5:
message['message'] = response.hex()
- message['description'] = 'Modbus exception has wrong length'
+ message['description'] = 'Modbus exception has wrong length.'
else:
message['slave'] = response[0]
function_code = response[1] ^ 0x80
{'event': {'const': 'response'},
'function': {'const': 'read coils'},
'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'values': {'type': 'array', 'items': {'type': 'boolean'}}})
+ 'read': {'type': 'object',
+ 'patternProperties':
+ {'^[1-9][0-9]*$': {'type': 'boolean'}},
+ 'additionalProperties': False}})
def __init__(self, message: Message) -> None:
"""Initialise function based on received message."""
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ length = response[2]
+ read = {}
+ for i in range(length):
+ byte = response[3 + i]
+ for bit in range(8):
+ address = self._start + 8*i + bit
+ if address < self._start + self._quantity:
+ if byte & 2**bit:
+ read[str(address)] = True
+ else:
+ read[str(address)] = False
+ message['read'] = read
+ return message
class ReadHoldingRegisters(ModbusFunction):
{'event': {'const': 'response'},
'function': {'const': 'read holding registers'},
'slave': {'type': 'integer', 'minimum': 1, 'maximum': 247},
- 'values': {'type': 'array', 'items': {'type': 'integer'}}})
+ 'read': {'type': 'object',
+ 'patternProperties':
+ {'^[1-9][0-9]*$':
+ {'type': 'integer', 'minimum': 0, 'maximum': 65535}},
+ 'additionalProperties': False}})
def __init__(self, message: Message) -> None:
"""Initialise function based on received message."""
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ length = response[2]
+ read = {}
+ for i in range(length):
+ byte = response[3 + i]
+ if i % 2 == 0:
+ address = self._start + i / 2
+ read[str(int(address))] = byte * 256
+ else:
+ address = self._start + (i - 1) / 2
+ read[str(int(address))] += byte
+ message['read'] = read
+ return message
class WriteSingleCoil(ModbusFunction):
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ message['address'] = response[2] * 256 + response[3] + 1
+ if response[4] == 0xFF and response[5] == 0x00:
+ message['value'] = True
+ elif response[4] == 0x00 and response[5] == 0x00:
+ message['value'] = False
+ else:
+ message['event'] = 'error'
+ message['description'] = 'Coil value not decodable.'
+ message['message'] = response.hex()
+ return message
class WriteSingleRegister(ModbusFunction):
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ message['address'] = response[2] * 256 + response[3] + 1
+ message['value'] = response[4] * 256 + response[5]
+ return message
class Diagnostic(ModbusFunction):
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ message['data'] = response[2] * 256 + response[3]
+ return message
class WriteMultipleCoils(ModbusFunction):
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ start = response[2] * 256 + response[3] + 1
+ if start != self._start:
+ message['event'] = 'error'
+ message['description'] = 'Start address does not match.'
+ message['message'] = response.hex()
+ return message
+ quantity = response[4] * 256 + response[5]
+ if quantity != len(self._values):
+ message['event'] = 'error'
+ message['description'] = 'Quantitiy does not match.'
+ message['message'] = response.hex()
+ return message
+ written = {}
+ for i in range(len(self._values)):
+ written[str(self._start + i)] = self._values[i]
+ message['written'] = written
+ return message
class WriteMultipleRegisters(ModbusFunction):
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ start = response[2] * 256 + response[3] + 1
+ if start != self._start:
+ message['event'] = 'error'
+ message['description'] = 'Start address does not match.'
+ message['message'] = response.hex()
+ return message
+ quantity = response[4] * 256 + response[5]
+ if quantity != len(self._values):
+ message['event'] = 'error'
+ message['description'] = 'Quantitiy does not match.'
+ message['message'] = response.hex()
+ return message
+ written = {}
+ for i in range(len(self._values)):
+ written[str(self._start + i)] = self._values[i]
+ message['written'] = written
+ return message
class ReadWriteMultipleRegisters(ModbusFunction):
{'event': 'response',
'slave': self._slave,
'function': self.FUNCTION_NAME})
- raise NotImplementedError
+ length = response[2]
+ read = {}
+ for i in range(length):
+ byte = response[3 + i]
+ if i % 2 == 0:
+ address = self._read_start + i / 2
+ read[str(int(address))] = byte * 256
+ else:
+ address = self._read_start + (i - 1) / 2
+ read[str(int(address))] += byte
+ message['read'] = read
+ written = {}
+ for i in range(len(self._write_values)):
+ written[str(self._write_start + i)] = self._write_values[i]
+ message['written'] = written
+ return message
class ModbusMaster(BasePlugin):
In order to demonstrate this without accessing a real hardware serial
connection, we connect two pseudo ttys:
>>> import pty, os, controlpi
- >>> master_controller, master = pty.openpty()
- >>> slave_controller, slave = pty.openpty()
>>> def relay(source, target):
... content = os.read(source, 2147483647)
... os.write(target, content)
>>> def relay_setup():
+ ... master_controller, master = pty.openpty()
+ ... slave_controller, slave = pty.openpty()
... loop = asyncio.get_event_loop()
... loop.add_reader(master_controller, relay,
... master_controller, slave_controller)
... loop.add_reader(slave_controller, relay,
... slave_controller, master_controller)
-
-
+ ... return master, slave
>>> async def test():
- ... relay_setup()
+ ... master, slave = relay_setup()
... await controlpi.test(
... {"Test Master": {"plugin": "ModbusMaster",
- ... "device": os.ttyname(master)},
+ ... "device": os.ttyname(master),
+ ... "parity": "none"},
... "Test Slave": {"plugin": "ModbusSlave",
... "device": os.ttyname(slave),
+ ... "parity": "none",
... "slave": 1}},
- ... [{"target": "Test Master", "command": "read coils",
- ... "slave": 1, "start": 1, "quantity": 2},
- ... {"target": "Test Master", "command": "write single coil",
+ ... [{"target": "Test Master", "command": "write single coil",
... "slave": 1, "address": 1, "value": False},
... {"target": "Test Master", "command": "write single coil",
... "slave": 1, "address": 2, "value": True},
+ ... {"target": "Test Master", "command": "write multiple coils",
+ ... "slave": 1, "start": 3, "values": [False, True]},
... {"target": "Test Master", "command": "read coils",
- ... "slave": 1, "start": 1, "quantity": 2},
- ... {"target": "Test Master", "command": "read holding registers",
- ... "slave": 1, "start": 7, "quantity": 2},
- ... {"target": "Test Master", "command": "write single register",
- ... "slave": 1, "address": 7, "value": 42},
- ... {"target": "Test Master", "command": "write single register",
- ... "slave": 1, "address": 8, "value": 42042},
- ... {"target": "Test Master", "command": "read holding registers",
- ... "slave": 1, "start": 7, "quantity": 2}], 0.1)
+ ... "slave": 1, "start": 1, "quantity": 4}], 0.1)
>>> asyncio.run(test())
... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
test(): {'sender': '', 'event': 'registered',
'client': 'Test Master', 'plugin': 'ModbusMaster', ...}
test(): {'sender': '', 'event': 'registered',
'client': 'Test Slave', 'plugin': 'ModbusSlave', ...}
- test(): {'sender': 'test()', 'target': 'Test Master',
- 'command': 'read coils', 'slave': 1,
- 'start': 1, 'quantity': 2}
test(): {'sender': 'test()', 'target': 'Test Master',
'command': 'write single coil', 'slave': 1,
'address': 1, 'value': False}
'command': 'write single coil', 'slave': 1,
'address': 2, 'value': True}
test(): {'sender': 'test()', 'target': 'Test Master',
- 'command': 'read coils', 'slave': 1,
- 'start': 1, 'quantity': 2}
+ 'command': 'write multiple coils', 'slave': 1,
+ 'start': 3, 'values': [False, True]}
test(): {'sender': 'test()', 'target': 'Test Master',
- 'command': 'read holding registers', 'slave': 1,
- 'start': 7, 'quantity': 2}
+ 'command': 'read coils', 'slave': 1,
+ 'start': 1, 'quantity': 4}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '010500000000cdca'}
+ test(): {'sender': 'Test Slave',
+ 'event': 'sent', 'message': '010500000000cdca'}
+ test(): {'sender': 'Test Master', 'event': 'response',
+ 'slave': 1, 'function': 'write single coil',
+ 'address': 1, 'value': False}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '01050001ff00ddfa'}
+ test(): {'sender': 'Test Slave', 'event':
+ 'sent', 'message': '01050001ff00ddfa'}
+ test(): {'sender': 'Test Master', 'event': 'response',
+ 'slave': 1, 'function': 'write single coil',
+ 'address': 2, 'value': True}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '010f00020002020200e7da'}
+ test(): {'sender': 'Test Slave',
+ 'event': 'sent', 'message': '010f0002000275ca'}
+ test(): {'sender': 'Test Master', 'event': 'response',
+ 'slave': 1, 'function': 'write multiple coils',
+ 'written': {'3': False, '4': True}}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '0101000000043dc9'}
+ test(): {'sender': 'Test Slave',
+ 'event': 'sent', 'message': '0101010ad18f'}
+ test(): {'sender': 'Test Master', 'event': 'response',
+ 'slave': 1, 'function': 'read coils',
+ 'read': {'1': False, '2': True, '3': False, '4': True}}
+
+ >>> async def test():
+ ... master, slave = relay_setup()
+ ... await controlpi.test(
+ ... {"Test Master": {"plugin": "ModbusMaster",
+ ... "device": os.ttyname(master),
+ ... "parity": "none"},
+ ... "Test Slave": {"plugin": "ModbusSlave",
+ ... "device": os.ttyname(slave),
+ ... "parity": "none",
+ ... "slave": 1}},
+ ... [{"target": "Test Master", "command": "write single register",
+ ... "slave": 1, "address": 7, "value": 42},
+ ... {"target": "Test Master", "command": "write single register",
+ ... "slave": 1, "address": 8, "value": 42042},
+ ... {"target": "Test Master",
+ ... "command": "write multiple registers",
+ ... "slave": 1, "start": 9, "values": [42, 42042]},
+ ... {"target": "Test Master", "command": "read holding registers",
+ ... "slave": 1, "start": 7, "quantity": 4}], 0.1)
+ >>> asyncio.run(test())
+ ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test Master', 'plugin': 'ModbusMaster', ...}
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test Slave', 'plugin': 'ModbusSlave', ...}
test(): {'sender': 'test()', 'target': 'Test Master',
'command': 'write single register', 'slave': 1,
'address': 7, 'value': 42}
test(): {'sender': 'test()', 'target': 'Test Master',
'command': 'write single register', 'slave': 1,
'address': 8, 'value': 42042}
+ test(): {'sender': 'test()', 'target': 'Test Master',
+ 'command': 'write multiple registers', 'slave': 1,
+ 'start': 9, 'values': [42, 42042]}
test(): {'sender': 'test()', 'target': 'Test Master',
'command': 'read holding registers', 'slave': 1,
- 'start': 7, 'quantity': 2}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '010100000002bdcb'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '018102c191'}
- test(): {'sender': 'Test Master', 'event': 'error',
- 'function': 'read coils', 'slave': 1,
- 'description': 'Illegal data address'}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '010500000000cdca'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '010500000000cdca'}
+ 'start': 7, 'quantity': 4}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '01060006002ae814'}
+ test(): {'sender': 'Test Slave',
+ 'event': 'sent', 'message': '01060006002ae814'}
test(): {'sender': 'Test Master', 'event': 'response',
- 'function': 'write single coil', 'slave': 1, 'address': 1,
- 'value': False}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '01050001ff00ddfa'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '01050001ff00ddfa'}
- test(): {'sender': 'Test Master', 'event': 'response',
- 'function': 'write single coil', 'slave': 1, 'address': 2,
- 'value': True}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '010100000002bdcb'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '01010102d049'}
- test(): {'sender': 'Test Master', 'event': 'response',
- 'function': 'read coils', 'slave': 1,
- 'values': [False, True, False, False, False, False, False, False]}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '010300060002240a'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '018302c0f1'}
- test(): {'sender': 'Test Master', 'event': 'error',
- 'function': 'read holding registers', 'slave': 1,
- 'description': 'Illegal data address'}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '01060006002ae814'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '01060006002ae814'}
+ 'slave': 1, 'function': 'write single register',
+ 'address': 7, 'value': 42}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '01060007a43ac2d8'}
+ test(): {'sender': 'Test Slave',
+ 'event': 'sent', 'message': '01060007a43ac2d8'}
test(): {'sender': 'Test Master', 'event': 'response',
- 'function': 'write single register', 'slave': 1, 'address': 7,
- 'value': 42}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '01060007a43ac2d8'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '01060007a43ac2d8'}
+ 'slave': 1, 'function': 'write single register',
+ 'address': 8, 'value': 42042}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '01100008000204002aa43a2912'}
+ test(): {'sender': 'Test Slave',
+ 'event': 'sent', 'message': '011000080002c00a'}
test(): {'sender': 'Test Master', 'event': 'response',
- 'function': 'write single register', 'slave': 1, 'address': 8,
- 'value': 42042}
- test(): {'sender': 'Test Slave', 'event': 'received',
- 'message': '010300060002240a'}
- test(): {'sender': 'Test Slave', 'event': 'sent',
- 'message': '010304002aa43a2128'}
+ 'slave': 1, 'function': 'write multiple registers',
+ 'written': {'9': 42, '10': 42042}}
+ test(): {'sender': 'Test Slave',
+ 'event': 'received', 'message': '010300060004a408'}
+ test(): {'sender': 'Test Slave',
+ 'event': 'sent', 'message': '010308002aa43a002aa43a042f'}
test(): {'sender': 'Test Master', 'event': 'response',
- 'function': 'read holding registers', 'slave': 1,
- 'values': [42, 42042]}
+ 'slave': 1, 'function': 'read holding registers',
+ 'read': {'7': 42, '8': 42042, '9': 42, '10': 42042}}
"""
CONF_SCHEMA = {'properties':
self._stopbits = serial.STOPBITS_TWO
# Queue for Modbus messages to be sent:
self._queue: asyncio.Queue = asyncio.Queue()
- # Message templates sent and received by plugin client:
+ # Process all implemented Modbus functions:
+ self._modbus_functions = {}
sends = []
receives = []
- self._modbus_functions = {}
- # Error messages:
- sends.append(MessageTemplate(
- {'event': {'const': 'error'}}))
- # Process all implemented Modbus functions:
- for cls in ModbusFunction.__subclasses__():
+ classes: List[Type[ModbusFunction]] = ModbusFunction.__subclasses__()
+ for cls in classes:
self._modbus_functions[cls.FUNCTION_NAME] = cls
sends.append(cls.send_template())
receives.append(cls.receive_template(self.name))
+ # Error messages:
+ sends.append(MessageTemplate(
+ {'event': {'const': 'error'}}))
self.bus.register(self.name, 'ModbusMaster',
sends, receives, self._receive)
async def _receive(self, message: Message) -> None:
await self._queue.put(message)
- async def _process_response(self, modbus_message: bytes) -> None:
- if function_code == 0x01 or function_code == 0x03:
- length = modbus_message[2]
- if len(modbus_message) != length + 5:
- message['event'] = 'error'
- message['description'] = 'Modbus response has wrong length'
- message['message'] = modbus_message.hex()
- await self.bus.send(message)
- return
- if function_code == 0x01:
- bool_values = []
- for i in range(length):
- byte = modbus_message[3 + i]
- for bit in range(8):
- if byte & 2**bit:
- bool_values.append(True)
- else:
- bool_values.append(False)
- message['values'] = bool_values
- if function_code == 0x03:
- int_values = []
- for i in range(0, length, 2):
- int_values.append(modbus_message[3 + i]*256 +
- modbus_message[3 + i + 1])
- message['values'] = int_values
- if function_code == 0x05 or function_code == 0x06:
- if len(modbus_message) != 8:
- message['event'] = 'error'
- message['description'] = 'Modbus response has wrong length'
- message['message'] = modbus_message.hex()
- await self.bus.send(message)
- return
- message['address'] = modbus_message[2]*256 + modbus_message[3] + 1
- if function_code == 0x05:
- if modbus_message[4] == 0xFF and modbus_message[5] == 0x00:
- message['value'] = True
- elif modbus_message[4] == 0x00 and modbus_message[5] == 0x00:
- message['value'] = False
- else:
- message['event'] = 'error'
- message['description'] = 'Coil value not decodable'
- message['message'] = modbus_message.hex()
- await self.bus.send(message)
- return
- if function_code == 0x06:
- message['value'] = modbus_message[4] * 256 + modbus_message[5]
- await self.bus.send(message)
-
async def run(self) -> None:
"""Open serial device and start loop on modbus message queue."""
reader, writer = await serial_asyncio.open_serial_connection(
url=self.conf['device'], baudrate=self.conf['baudrate'],
parity=self._parity, stopbits=self._stopbits)
while True:
- # Initialise ModbusFunction instance from message:
message = await self._queue.get()
- cls = self._modbus_functions[message['command']]
- function = cls(message)
- request_pdu = function.get_request()
- request_pdu += bytes(CRC(request_pdu))
- # TODO: In specific classes:
- slave = request_pdu[0]
- function_code = request_pdu[1]
# Read bytes from serial (normally there shouldn't be any):
unexpected_pdu = b''
while True:
'message': unexpected_pdu.hex(),
'description':
"Unexpected message on Modbus."}))
+ # Initialise ModbusFunction instance from message:
+ cls = self._modbus_functions[message['command']]
+ function = cls(message)
+ request_pdu = function.get_request()
+ request_pdu += bytes(CRC(request_pdu))
# Retry self.conf['retries'] times:
tries = 0
while tries <= self.conf['retries']:
data_length = modbus_request[6]
if len(modbus_request) != 9 + data_length:
return bytes((slave, function_code | 0x80, 0x03))
- data = []
if function_code == 0x0F:
+ coil_data = []
for i in range(data_length):
for bit in range(8):
if modbus_request[7 + i] & 2**bit:
- data.append(True)
+ coil_data.append(True)
else:
- data.append(False)
+ coil_data.append(False)
+ if len(coil_data) < quantity:
+ return bytes((slave, function_code | 0x80, 0x03))
+ for i in range(quantity):
+ self._coils[start + i] = coil_data[i]
elif function_code == 0x10:
- hi = True
+ register_data = []
for i in range(data_length):
- if hi:
- data.append(modbus_request[7 + i]*256)
- hi = False
+ if i % 2 == 0:
+ register_data.append(modbus_request[7 + i] * 256)
else:
- data[-1] += modbus_request[7 + i]
- hi = True
- if len(data) < quantity:
- return bytes((slave, function_code | 0x80, 0x03))
- if function_code == 0x0F:
- for i in range(quantity):
- self._coils[start + i] = data[i]
- elif function_code == 0x10:
+ register_data[-1] += modbus_request[7 + i]
+ if len(register_data) != quantity:
+ return bytes((slave, function_code | 0x80, 0x03))
for i in range(quantity):
- self._registers[start + i] = data[i]
+ self._registers[start + i] = register_data[i]
return modbus_request[:6]
elif function_code == 0x17:
if len(modbus_request) < 13:
if len(modbus_request) != 13 + data_length:
return bytes((slave, function_code | 0x80, 0x03))
write_data = []
- hi = True
for i in range(data_length):
- if hi:
- write_data.append(modbus_request[11 + i]*256)
- hi = False
+ if i % 2 == 0:
+ write_data.append(modbus_request[11 + i] * 256)
else:
write_data[-1] += modbus_request[11 + i]
- hi = True
if len(write_data) != write_quantity:
return bytes((slave, function_code | 0x80, 0x03))
for i in range(write_quantity):