+import functools
+import asyncio
import serialio
from gpin import PCF8574Output, PCF8574Input, GPIOInputPin
from gmodbus.transport import SerialClient
from gmodbus.hitachi import SJP1Fu
-async def process_configuration(conf, queues):
+async def pin_handler(queues, pin_name, pin):
+ in_queue = asyncio.Queue()
+ queues.append(in_queue)
+ while True:
+ event = await in_queue.get()
+ if event['name'] == 'setpin' and event['pin'] == pin_name:
+ pin.value = event['value']
+ elif event['name'] == 'getpin' and event['pin'] == pin_name:
+ for out_queue in queues:
+ await out_queue.put({'name': 'pinstate', 'pin': pin_name,
+ 'value': pin.value, 'changed': False})
- def callback_factory(pin_name):
- settable = pins[pin_name].settable
- def callback(value):
- for queue in queues:
- queue.put_nowait({'event': 'pinstate', 'pin': pin_name,
- 'settable': settable, 'value': value,
- 'changed': True})
- return callback
- pins = {}
+def pin_callback(queues, pin_name, value):
+ for queue in queues:
+ queue.put_nowait({'name': 'pinstate', 'pin': pin_name,
+ 'value': value, 'changed': True})
+
+
+async def process_card_conf(card_conf, queues):
+ tasks = []
+ card = None
+ if card_conf['type'] == 'output':
+ card = PCF8574Output(card_conf['address'])
+ elif card_conf['type'] == 'input':
+ card = PCF8574Input(card_conf['address'],
+ GPIOInputPin(card_conf['interrupt pin'],
+ up=True))
+ if card is not None:
+ for i in range(8):
+ pin = card.getPin(i)
+ pin_names = card_conf['pins'][i]
+ for pin_name in pin_names:
+ pin.on('change', functools.partial(pin_callback, queues, pin_name))
+ tasks.append(asyncio.create_task(pin_handler(queues, pin_name, pin)))
+ return tasks
+
+
+async def fu_handler(queues):
+ in_queue = asyncio.Queue()
+ queues.append(in_queue)
+ while True:
+ event = await in_queue.get()
+ if event['name'] == 'setparameters':
+ await fu.set_parameters()
+ elif event['name'] == 'setfrequency':
+ await fu.set_frequency(event['frequency'])
+ elif event['name'] == 'getfrequency':
+ frequency = await fu.get_frequency()
+ for out_queue in queues:
+ await out_queue.put({'name': 'frequency', 'frequency': frequency})
+ elif event['name'] == 'startinverter':
+ await fu.start_inverter()
+ elif event['name'] == 'stopinverter':
+ await fu.stop_inverter()
+ elif event['name'] == 'getinverter':
+ active = await fu.inverter_active
+ for out_queue in queues:
+ await out_queue.put({'name': 'inverterstate', 'active': active})
+
+
+async def process_modbus_conf(modbus_conf, queues):
+ #port = SerialPort(modbus_conf['serial device'])
+ port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}")
+ await port.open()
+ client = SerialClient(port, modbus_conf['slave id'])
+ fu = SJP1Fu(client)
+ return asyncio.create_task(fu_handler(queues))
+
+
+async def process_conf(conf, queues):
+ tasks = []
if 'i/o cards' in conf:
for card_conf in conf['i/o cards']:
- card = None
- if card_conf['type'] == 'output':
- card = PCF8574Output(card_conf['address'])
- elif card_conf['type'] == 'input':
- card = PCF8574Input(card_conf['address'],
- GPIOInputPin(card_conf['interrupt pin'],
- up=True))
- if card is not None:
- for i in range(8):
- pin = card.getPin(i)
- pin_names = card_conf['pins'][i]
- for pin_name in pin_names:
- pins[pin_name] = pin
- pin.on('change', callback_factory(pin_name))
+ tasks.extend(await process_card_conf(card_conf, queues))
pins['T1-1'].value = True
fu = None
if 'modbus' in conf:
- modbus_conf = conf['modbus']
- #port = SerialPort(modbus_conf['serial device'])
- port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}")
- await port.open()
- client = SerialClient(port, modbus_conf['slave id'])
- fu = SJP1Fu(client)
- return (pins, fu)
+ tasks.append(await process_modbus_conf(conf['modbus'], queues))
+ return tasks
import json
import asyncio
-from .config import process_configuration
+from .config import process_conf
from .websocket import setup_websocket
-async def setup():
- pins = {}
- fu = None
+async def main():
queues = []
with open(sys.argv[1]) as json_data:
conf = json.load(json_data)
- (pins, fu) = await process_configuration(conf, queues)
- await setup_websocket(pins, fu, queues, sys.argv[2])
+ tasks = await process_conf(conf, queues)
+ await setup_websocket(queues, sys.argv[2])
+ done, pending = await asyncio.wait(tasks)
+ for task in pending:
+ task.cancel()
if __name__ == '__main__':
- asyncio.get_event_loop().run_until_complete(setup())
try:
- asyncio.get_event_loop().run_forever()
+ asyncio.run(main())
except KeyboardInterrupt:
pass
from http import HTTPStatus
-async def process_command(command, pins, fu, queue):
- if command['command'] == 'setpin':
- if command['pin'] in pins and pins[command['pin']].settable:
- pins[command['pin']].value = command['value']
- elif command['command'] == 'getpin':
- if command['pin'] in pins:
- pin = pins[command['pin']]
- await queue.put({'event': 'pinstate', 'pin': command['pin'],
- 'settable': pin.settable, 'value': pin.value,
- 'changed': False})
- elif command['command'] == 'getallpins':
- for pin_name in pins:
- pin = pins[pin_name]
- await queue.put({'event': 'pinstate', 'pin': pin_name,
- 'settable': pin.settable, 'value': pin.value,
- 'changed': False})
- elif command['command'] == 'setparameters':
- pass
- #await fu.set_parameters()
- elif command['command'] == 'setfrequency':
- await fu.set_frequency(command['value'])
- elif command['command'] == 'getfrequency':
- frequency = await fu.get_frequency()
- await queue.put({'event': 'frequency', 'frequency': frequency})
- elif command['command'] == 'startinverter':
- await fu.start_inverter()
- elif command['command'] == 'stopinverter':
- await fu.stop_inverter()
- elif command['command'] == 'getinverter':
- active = await fu.inverter_active
- await queue.put({'event': 'inverterstate', 'active': active})
-
-
-async def command_handler(websocket, path, pins, fu, queue):
+async def command_handler(websocket, queues):
async for message in websocket:
command = json.loads(message)
- await process_command(command, pins, fu, queue)
+ for queue in queues:
+ await queue.put(command)
-async def event_handler(websocket, path, pins, fu, queue):
+async def event_handler(websocket, queues):
+ queue = asyncio.Queue()
+ queues.append(queue)
while True:
event = await queue.get()
message = json.dumps(event)
await websocket.send(message)
+ # To move:
if event['event'] == 'pinstate':
okay = ( pins['Zentralschmierung'].value and
pins['Ölpumpe'].value and
#await fu.set_parameters()
await fu.set_frequency(25)
await fu.start_inverter()
- await asyncio.sleep(1)
+ await asyncio.sleep(0.15)
await fu.stop_inverter()
else:
#await fu.set_parameters()
#await fu.set_parameters()
await fu.set_frequency(-25)
await fu.start_inverter()
- await asyncio.sleep(1)
+ await asyncio.sleep(0.15)
await fu.stop_inverter()
queue.task_done()
-async def handler(pins, fu, queues, websocket, path):
- queue = asyncio.Queue()
- queues.append(queue)
- command_task = asyncio.create_task(command_handler(websocket, path,
- pins, fu, queue))
- event_task = asyncio.create_task(event_handler(websocket, path,
- pins, fu, queue))
+async def handler(queues, websocket, path):
+ address = websocket.remote_address
+ print(f"Websocket to {address[0]}:{address[1]} opened.")
+ command_task = asyncio.create_task(command_handler(websocket, queues))
+ event_task = asyncio.create_task(event_handler(websocket, queues))
done, pending = await asyncio.wait(
[command_task, event_task],
- return_when=asyncio.FIRST_COMPLETED,
+ return_when=asyncio.FIRST_EXCEPTION,
)
+ for task in done:
+ if task.exception():
+ print(f"For websocket to {address[0]}:{address[1]}, "
+ f"task {task} got exception {task.exception()}")
for task in pending:
task.cancel()
queues.remove(queue)
+ print(f"Websocket to {address[0]}:{address[1]} closed.")
async def process_request(server_root, path, request_headers):
return ip
-async def setup_websocket(pins, fu, queues, server_root):
- parameterised_handler = functools.partial(handler, pins, fu, queues)
+async def setup_websocket(queues, server_root):
+ parameterised_handler = functools.partial(handler, queues)
parameterised_process_request = functools.partial(process_request, server_root)
hostname = await get_ip()
await websockets.serve(parameterised_handler, hostname, 80,
var ws = new WebSocket("ws://" + window.location.host)
ws.addEventListener('message', function (event) {
var data = JSON.parse(event.data)
- switch (data.event) {
+ switch (data.name) {
case 'pinstate':
var pin = document.getElementById(data.pin)
if (pin != null) {
}
})
ws.addEventListener('open', function (event) {
- ws.send(JSON.stringify({command: 'getpin', pin: 'Notaus'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Stop'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Tippen'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Schmierung'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Vorlauf'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Rücklauf'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Öllampe'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Zentralschmierung'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Ölpumpe'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Schwungrad'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Takt'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Öldruck'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Bruchplatte'}))
- ws.send(JSON.stringify({command: 'getpin', pin: 'Bruchplatte Einzug'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Notaus'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Stop'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Tippen'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Schmierung'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Vorlauf'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Rücklauf'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Öllampe'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Zentralschmierung'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Ölpumpe'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Schwungrad'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Takt'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Öldruck'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Bruchplatte'}))
+ ws.send(JSON.stringify({name: 'getpin', pin: 'Bruchplatte Einzug'}))
})
</script>
<h1>ControlPi</h1>