self._stopbits = serial.STOPBITS_ONE
if self.conf['stopbits'] == 2:
self._stopbits = serial.STOPBITS_TWO
- # 1.5 char and 3.5 char times for protocol timing
- # (according to Modbus specification):
- self._t15 = 0.001000
- self._t35 = 0.001750
- if self.conf['baudrate'] <= 19200:
- bit_per_char = 9
- if self.conf['parity'] != 'none':
- bit_per_char += 1
- bit_per_char += self.conf['stopbits']
- seconds_per_char = bit_per_char / self.conf['baudrate']
- self._t15 = 2.0 * seconds_per_char
- self._t35 = 3.5 * seconds_per_char
# Queue for Modbus messages to be sent:
self._queue: asyncio.Queue = asyncio.Queue()
# Message templates sent and received by plugin client:
url=self.conf['device'], baudrate=self.conf['baudrate'],
parity=self._parity, stopbits=self._stopbits)
while True:
- modbus_request = await self._queue.get()
- slave = modbus_request[0]
+ request = await self._queue.get()
+ slave = request[0]
+ function_code = request[1]
+ # Read bytes from serial (normally there shouldn't be any):
+ unexpected_message = b''
+ while True:
+ try:
+ unexpected_message += await asyncio.wait_for(
+ reader.read(1), 0.001)
+ except asyncio.TimeoutError:
+ break
+ if unexpected_message:
+ message = Message(self.name)
+ message['event'] = 'error'
+ message['message'] = unexpected_message.hex()
+ message['description'] = "Unexpected message on Modbus."
+ await self.bus.send(message)
+ # Retry self.conf['retries'] times:
tries = 0
while tries <= self.conf['retries']:
tries += 1
- writer.write(modbus_request)
+ writer.write(request)
if slave == 0:
# Broadcast => just wait for delay and finish:
await asyncio.sleep(self.conf['turnaround delay'])
break
- modbus_response = b''
+ # Read response:
+ response = b''
crc = CRC()
- t35_task: asyncio.Task = asyncio.create_task(
- asyncio.sleep(self._t35))
- try:
- first_byte = await asyncio.wait_for(
- reader.read(1), self.conf['response timeout'])
- modbus_response += first_byte
- crc.update(first_byte[0])
- except asyncio.TimeoutError:
- message = Message(self.name)
- message['event'] = 'error'
- message['description'] = (
- f"Response time out on try {tries}.")
- await self.bus.send(message)
- continue
+ expected_length = 5
+ length = 0
while True:
- t35_task = asyncio.create_task(asyncio.sleep(self._t35))
try:
- next_byte = await asyncio.wait_for(
- reader.read(1), self._t15)
- modbus_response += next_byte
- crc.update(next_byte[0])
+ response += await asyncio.wait_for(
+ reader.read(1), self.conf['response timeout'])
+ crc.update(response[length])
except asyncio.TimeoutError:
break
+ length += 1
+ # Update expected length based on function:
+ if (length == 2 and
+ (response[1] == 0x05 or response[1] == 0x06 or
+ response[1] == 0x08 or response[1] == 0x0F or
+ response[1] == 0x10)):
+ expected_length = 8
+ if (length == 3 and
+ (response[1] == 0x01 or response[1] == 0x03 or
+ response[1] == 0x17)):
+ expected_length = response[2] + 5
+ # Break at expected length:
+ if length == expected_length:
+ break
+ # Process possible errors leading to retry:
+ message = Message(self.name)
+ message['event'] = 'error'
+ message['message'] = response.hex()
+ if length < expected_length:
+ message['description'] = (
+ f"Response timed out on try {tries}.")
+ await self.bus.send(message)
+ continue
if not crc:
- message = Message(self.name)
- message['event'] = 'error'
message['description'] = (
f"Response CRC failed on try {tries}.")
- message['message'] = modbus_response.hex()
await self.bus.send(message)
continue
- if modbus_response[0] != slave:
- message = Message(self.name)
- message['event'] = 'error'
+ if response[0] != slave:
message['description'] = (
f"Response from wrong slave on try {tries}.")
- message['message'] = modbus_response.hex()
await self.bus.send(message)
continue
- await self._process_response(modbus_response)
- await t35_task
+ if response[1] & 0x7F != function_code:
+ message['description'] = (
+ f"Response for wrong function on try {tries}.")
+ await self.bus.send(message)
+ continue
+ # Process response from correct slave for correct funtion
+ # with valid CRC:
+ await self._process_response(response)
break
self._queue.task_done()
self._stopbits = serial.STOPBITS_ONE
if 'stopbits' in self.conf and self.conf['stopbits'] == 2:
self._stopbits = serial.STOPBITS_TWO
- # 1.5 char and 3.5 char times for protocol timing
- # (according to Modbus specification):
- self._t15 = 0.001000
- self._t35 = 0.001750
- if self._baudrate <= 19200:
- bit_per_char = 11
- if (self._parity == serial.PARITY_NONE and
- self._stopbits == serial.STOPBITS_ONE):
- bit_per_char = 10
- seconds_per_char = bit_per_char / self._baudrate
- self._t15 = 2.0 * seconds_per_char
- self._t35 = 3.5 * seconds_per_char
# Coils and registers:
self._coils: Dict[int, bool] = {}
self._registers: Dict[int, int] = {}
url=self._device, baudrate=self._baudrate,
parity=self._parity, stopbits=self._stopbits)
while True:
- modbus_request = b''
+ # Read request:
+ request = b''
crc = CRC()
- first_byte = await reader.read(1)
- t35_task: asyncio.Task = asyncio.create_task(
- asyncio.sleep(self._t35))
- modbus_request += first_byte
- crc.update(first_byte[0])
+ expected_length = 8
+ length = 0
while True:
- try:
- next_byte = await asyncio.wait_for(
- reader.read(1), self._t15)
- t35_task = asyncio.create_task(asyncio.sleep(self._t35))
- modbus_request += next_byte
- crc.update(next_byte[0])
- except asyncio.TimeoutError:
+ request += await reader.read(1)
+ crc.update(request[length])
+ length += 1
+ # Update expected length based on function:
+ if length == 2:
+ if request[1] == 0x0F or request[1] == 0x10:
+ expected_length = 9
+ if request[1] == 0x17:
+ expected_length = 13
+ if length == 7 and (request[1] == 0x0F or request[1] == 0x10):
+ expected_length += request[6]
+ if length == 11 and request[1] == 0x17:
+ expected_length += request[10]
+ if length == expected_length:
break
if not crc:
await self.bus.send(Message(self.name, {'event': 'crc error',
- 'message': modbus_request.hex()}))
+ 'message': request.hex()}))
continue
await self.bus.send(Message(self.name, {'event': 'received',
- 'message': modbus_request.hex()}))
- modbus_response = await self._process_request(modbus_request)
- if modbus_response:
- modbus_response += bytes(CRC(modbus_response))
- writer.write(modbus_response)
- await t35_task
+ 'message': request.hex()}))
+ response = await self._process_request(request)
+ if response:
+ response += bytes(CRC(response))
+ writer.write(response)
await self.bus.send(Message(self.name, {'event': 'sent',
- 'message': modbus_response.hex()}))
+ 'message': response.hex()}))