class UniPi(BasePlugin):
CONF_SCHEMA = {}
+ SYS_DIR = '/sys/bus/platform/devices/unipi_plc'
def process_conf(self) -> None:
"""Search all input and output pins and initialise."""
self._paths: Dict[str, str] = {}
self._states: Dict[str, bool] = {}
self._counters: Dict[str, int] = {}
- for path in glob.glob('/sys/bus/platform/devices/unipi_plc/io_group?/di_?_??'):
+ for path in glob.glob(self.SYS_DIR + '/io_group?/di_?_??'):
client = path[46:53]
self._paths[client] = path
with open(f"{path}/{client[0:2]}_value", 'r') as sysfs_file:
([MessageTemplate({'target':
{'const': client},
'command':
- {'const': 'reset counter'}})],
+ {'const':
+ 'reset counter'}})],
self._reset_counter)])
- for path in glob.glob('/sys/bus/platform/devices/unipi_plc/io_group?/do_?_??'):
+ for path in (glob.glob(self.SYS_DIR + '/io_group?/do_?_??') +
+ glob.glob(self.SYS_DIR + '/io_group?/ro_?_??')):
client = path[46:53]
self._paths[client] = path
with open(f"{path}/{client[0:2]}_value", 'r') as sysfs_file:
# Get state from sysfs file:
old_state = self._states[client]
new_state = False
- async with aiofiles.open(f"{path}/{client[0:2]}_value", 'r') as sysfs_file:
+ async with aiofiles.open(f"{path}/{client[0:2]}_value",
+ 'r') as sysfs_file:
file_state = await sysfs_file.read()
if file_state[0] == '1':
new_state = True
client = message['target']
path = self._paths[client]
# Write new state to sysfs file:
- async with aiofiles.open(f"{path}/{client[0:2]}_value", 'w') as sysfs_file:
+ async with aiofiles.open(f"{path}/{client[0:2]}_value",
+ 'w') as sysfs_file:
if message['new state']:
await sysfs_file.write('1')
else:
# Get state from sysfs file:
old_state = self._states[client]
new_state = False
- async with aiofiles.open(f"{path}/{client[0:2]}_value", 'r') as sysfs_file:
+ async with aiofiles.open(f"{path}/{client[0:2]}_value",
+ 'r') as sysfs_file:
file_state = await sysfs_file.read()
if file_state[0] == '1':
new_state = True
# Get state from sysfs file:
old_state = self._states[client]
new_state = False
- async with aiofiles.open(f"{path}/{client[0:2]}_value", 'r') as sysfs_file:
+ async with aiofiles.open(f"{path}/{client[0:2]}_value",
+ 'r') as sysfs_file:
file_state = await sysfs_file.read()
if file_state[0] == '1':
new_state = True
# Get counter from sysfs file:
old_counter = self._counters[client]
new_counter = 0
- async with aiofiles.open(f"{path}/counter", 'r') as sysfs_file:
+ async with aiofiles.open(f"{path}/counter",
+ 'r') as sysfs_file:
file_counter = await sysfs_file.read()
new_counter = int(file_counter)
self._counters[client] = new_counter
if old_counter != new_counter:
await self.bus.send(Message(client, {'event': 'changed',
- 'counter': new_counter}))
+ 'counter':
+ new_counter}))