"""Modbus implementation.
…
-
-TODO: documentation, doctests
"""
import asyncio
import serial # type: ignore
class ModbusMaster(BasePlugin):
"""Modbus-RTU master plugin.
+ The plugin receives commands for executing Modbus functions on the
+ ControlPi message bus, encodes and sends them on the Modbus and
+ translates the responses back to ControlPi messages.
+
+ In order to demonstrate this without accessing a real hardware serial
+ connection, we connect two pseudo ttys:
>>> import pty, os, controlpi
>>> master_controller, master = pty.openpty()
>>> slave_controller, slave = pty.openpty()
>>> def relay(source, target):
... content = os.read(source, 2147483647)
... os.write(target, content)
- >>> async def test():
+ >>> def relay_setup():
... loop = asyncio.get_event_loop()
... loop.add_reader(master_controller, relay,
... master_controller, slave_controller)
... loop.add_reader(slave_controller, relay,
... slave_controller, master_controller)
+
+
+
+ >>> async def test():
+ ... relay_setup()
... await controlpi.test(
... {"Test Master": {"plugin": "ModbusMaster",
- ... "device": os.ttyname(master),
- ... "parity": "none"},
+ ... "device": os.ttyname(master)},
... "Test Slave": {"plugin": "ModbusSlave",
... "device": os.ttyname(slave),
- ... "slave": 1,
- ... "parity": "none"}},
+ ... "slave": 1}},
... [{"target": "Test Master", "command": "read coils",
... "slave": 1, "start": 1, "quantity": 2},
... {"target": "Test Master", "command": "write single coil",
test(): {'sender': '', 'event': 'registered',
'client': 'Test Slave', 'plugin': 'ModbusSlave',
'sends': [{'event': {'const': 'received'}},
+ {'event': {'const': 'crc error'}},
{'event': {'const': 'sent'}}],
'receives': []}
test(): {'sender': 'test()', 'target': 'Test Master',
if function_code == 0x03:
int_values = []
for i in range(0, length, 2):
- byte_hi = modbus_message[3 + i]
- byte_lo = modbus_message[4 + i]
- int_values.append(byte_hi * 256 + byte_lo)
+ int_values.append(modbus_message[3 + i]*256 +
+ modbus_message[3 + i + 1])
message['values'] = int_values
if function_code == 0x05 or function_code == 0x06:
if len(modbus_message) != 8:
message['message'] = modbus_message.hex()
await self.bus.send(message)
return
- address_hi = modbus_message[2]
- address_lo = modbus_message[3]
- message['address'] = address_hi * 256 + address_lo + 1
+ message['address'] = modbus_message[2]*256 + modbus_message[3] + 1
if function_code == 0x05:
if modbus_message[4] == 0xFF and modbus_message[5] == 0x00:
message['value'] = True
await self.bus.send(message)
return
if function_code == 0x06:
- value_hi = modbus_message[4]
- value_lo = modbus_message[5]
- message['value'] = value_hi * 256 + value_lo
+ message['value'] = modbus_message[4] * 256 + modbus_message[5]
await self.bus.send(message)
async def run(self) -> None:
def process_conf(self) -> None:
"""Process configuration and register plugin as bus client."""
# Default values:
- if 'baudrate' not in self.conf:
- self.conf['baudrate'] = 9600
- if 'parity' not in self.conf:
- self.conf['parity'] = 'even'
- if 'stopbits' not in self.conf:
- self.conf['stopbits'] = 1
- # Constants from serial for parity and stop bits:
+ self._device = self.conf['device']
+ self._slave = self.conf['slave']
+ self._baudrate = 9600
+ if 'baudrate' in self.conf:
+ self._baudrate = self.conf['baudrate']
self._parity = serial.PARITY_EVEN
- if self.conf['parity'] == 'odd':
- self._parity = serial.PARITY_ODD
- elif self.conf['parity'] == 'none':
- self._parity = serial.PARITY_NONE
+ if 'parity' in self.conf:
+ if self.conf['parity'] == 'odd':
+ self._parity = serial.PARITY_ODD
+ elif self.conf['parity'] == 'none':
+ self._parity = serial.PARITY_NONE
self._stopbits = serial.STOPBITS_ONE
- if self.conf['stopbits'] == 2:
+ if 'stopbits' in self.conf and self.conf['stopbits'] == 2:
self._stopbits = serial.STOPBITS_TWO
# 1.5 char and 3.5 char times for protocol timing
# (according to Modbus specification):
self._t15 = 0.000750
self._t35 = 0.001750
- if self.conf['baudrate'] <= 19200:
- bit_per_char = 9
- if self.conf['parity'] != 'none':
- bit_per_char += 1
- bit_per_char += self.conf['stopbits']
- seconds_per_char = bit_per_char / self.conf['baudrate']
+ if self._baudrate <= 19200:
+ bit_per_char = 11
+ if (self._parity == serial.PARITY_NONE and
+ self._stopbits == serial.STOPBITS_ONE):
+ bit_per_char = 10
+ seconds_per_char = bit_per_char / self._baudrate
self._t15 = 1.5 * seconds_per_char
self._t35 = 3.5 * seconds_per_char
# Coils and registers:
self._registers: Dict[int, int] = {}
# Message templates sent by plugin client:
sends = [MessageTemplate({'event': {'const': 'received'}}),
+ MessageTemplate({'event': {'const': 'crc error'}}),
MessageTemplate({'event': {'const': 'sent'}})]
self.bus.register(self.name, 'ModbusSlave', sends, [], self._receive)
pass
async def _process_request(self, modbus_request: bytes) -> bytes:
- message = Message(self.name)
- message['event'] = 'received'
- message['message'] = modbus_request.hex()
- await self.bus.send(message)
if len(modbus_request) < 4:
return b''
+ if modbus_request[0] != self._slave:
+ return b''
+ slave = modbus_request[0]
function_code = modbus_request[1]
- modbus_response = bytes([self.conf['slave']])
if function_code == 0x01 or function_code == 0x03:
if len(modbus_request) != 8:
- modbus_response += bytes([function_code | 0x80, 0x03])
- else:
- start_hi = modbus_request[2]
- start_lo = modbus_request[3]
- start = start_hi * 256 + start_lo
- quantity_hi = modbus_request[4]
- quantity_lo = modbus_request[5]
- quantity = quantity_hi * 256 + quantity_lo
- if function_code == 0x01:
- okay = True
- data = []
- byte = 0
- bit = 0
- for i in range(quantity):
- if start + i not in self._coils:
- modbus_response += bytes([0x81, 0x02])
- okay = False
- break
- if self._coils[start + i]:
- byte += 2**bit
- bit += 1
- if bit == 8:
- data.append(byte)
- byte = 0
- bit = 0
- if okay:
- if bit > 0:
- data.append(byte)
- modbus_response += bytes([0x01, len(data)])
- modbus_response += bytes(data)
- if function_code == 0x03:
- okay = True
- data = []
- for i in range(quantity):
- if start + i not in self._registers:
- modbus_response += bytes([0x83, 0x02])
- okay = False
- break
- data.append(self._registers[start + i] >> 8)
- data.append(self._registers[start + i] & 0xFF)
- if okay:
- modbus_response += bytes([0x03, len(data)])
- modbus_response += bytes(data)
+ return bytes((slave, function_code | 0x80, 0x03))
+ start = modbus_request[2]*256 + modbus_request[3]
+ quantity = modbus_request[4]*256 + modbus_request[5]
+ data = []
+ if function_code == 0x01:
+ byte = 0
+ bit = 0
+ for i in range(quantity):
+ if start + i not in self._coils:
+ return bytes((slave, 0x81, 0x02))
+ if self._coils[start + i]:
+ byte += 2**bit
+ bit += 1
+ if bit == 8:
+ data.append(byte)
+ byte = 0
+ bit = 0
+ if bit > 0:
+ data.append(byte)
+ elif function_code == 0x03:
+ for i in range(quantity):
+ if start + i not in self._registers:
+ return bytes((slave, 0x83, 0x02))
+ data.append(self._registers[start + i] >> 8)
+ data.append(self._registers[start + i] & 0xFF)
+ return bytes([slave, function_code, len(data)] + data)
elif function_code == 0x05 or function_code == 0x06:
if len(modbus_request) != 8:
- modbus_response += bytes([function_code | 0x80, 0x03])
- else:
- address_hi = modbus_request[2]
- address_lo = modbus_request[3]
- address = address_hi * 256 + address_lo
- if function_code == 0x05:
- okay = True
- if (modbus_request[4] == 0xFF and
- modbus_request[5] == 0x00):
- self._coils[address] = True
- elif (modbus_request[4] == 0x00 and
- modbus_request[5] == 0x00):
- self._coils[address] = False
- else:
- modbus_response += bytes([0x85, 0x03])
- okay = False
- if okay:
- modbus_response = modbus_request[:-2]
- if function_code == 0x06:
- value_hi = modbus_request[4]
- value_lo = modbus_request[5]
- value = value_hi * 256 + value_lo
- self._registers[address] = value
- modbus_response = modbus_request[:-2]
- else:
- modbus_response += bytes([function_code | 0x80, 0x01])
- modbus_response += bytes(CRC(modbus_response))
- message = Message(self.name)
- message['event'] = 'sent'
- message['message'] = modbus_response.hex()
- await self.bus.send(message)
- return modbus_response
+ return bytes((slave, function_code | 0x80, 0x03))
+ address = modbus_request[2]*256 + modbus_request[3]
+ if function_code == 0x05:
+ if modbus_request[4] == 0xFF and modbus_request[5] == 0x00:
+ self._coils[address] = True
+ elif modbus_request[4] == 0x00 and modbus_request[5] == 0x00:
+ self._coils[address] = False
+ else:
+ return bytes((slave, 0x81, 0x03))
+ elif function_code == 0x06:
+ value = modbus_request[4]*256 + modbus_request[5]
+ self._registers[address] = value
+ return modbus_request[:-2]
+ return bytes((slave, function_code | 0x80, 0x01))
async def run(self) -> None:
"""Open serial device and start loop on its reader."""
reader, writer = await serial_asyncio.open_serial_connection(
- url=self.conf['device'], baudrate=self.conf['baudrate'],
+ url=self._device, baudrate=self._baudrate,
parity=self._parity, stopbits=self._stopbits)
while True:
modbus_request = b''
crc = CRC()
+ first_byte = await reader.read(1)
t35_task: asyncio.Task = asyncio.create_task(
asyncio.sleep(self._t35))
- # First byte is read without timeout:
- first_byte = await reader.read(1)
modbus_request += first_byte
crc.update(first_byte[0])
while True:
- t35_task = asyncio.create_task(asyncio.sleep(self._t35))
try:
- # Subsequent bytes are only awaited for the time
- # needed to transmit 1.5 characters:
next_byte = await asyncio.wait_for(
reader.read(1), self._t15)
+ t35_task = asyncio.create_task(asyncio.sleep(self._t35))
modbus_request += next_byte
crc.update(next_byte[0])
except asyncio.TimeoutError:
break
if not crc:
+ await self.bus.send(Message(self.name, {'event': 'crc error',
+ 'message': modbus_request.hex()}))
continue
- if modbus_request[0] != self.conf['slave']:
- continue
+ await self.bus.send(Message(self.name, {'event': 'received',
+ 'message': modbus_request.hex()}))
modbus_response = await self._process_request(modbus_request)
- await t35_task
if modbus_response:
+ modbus_response += bytes(CRC(modbus_response))
writer.write(modbus_response)
+ await t35_task
+ await self.bus.send(Message(self.name, {'event': 'sent',
+ 'message': modbus_response.hex()}))