"""ControlPi Plugin for NFC Card Reader."""
import asyncio
+import time
-from smartcard.CardMonitoring import CardMonitor, CardObserver # type: ignore
+import smartcard.System # type: ignore
+import smartcard.Exceptions # type: ignore
from controlpi import BasePlugin, Message, MessageTemplate
from controlpi.baseplugin import JSONSchema
-class ControlPiCardObserver(CardObserver):
- """Card Observer that calls ControlPi plugin on changes."""
-
- def __init__(self, plugin, loop):
- """Register the plugin to be called back and the event loop."""
- self._plugin = plugin
- self._loop = loop
- super().__init__()
-
- def update(self, observable, handlers):
- """Get the ID of the NFC card and call back plugin."""
- added, removed = handlers
- for card in added:
- connection = card.createConnection()
- connection.connect()
- data, sw1, sw2 = connection.transmit([0xFF, 0xCA,
- 0x00, 0x00, 0x00])
- identifier = bytes(data).hex()
- self._loop.call_soon_thread_safe(
- self._plugin._set_state, True, identifier
- )
- for card in removed:
- self._loop.call_soon_thread_safe(
- self._plugin._set_state, False, ""
- )
-
-
class NFCReader(BasePlugin):
"""ControlPi plugin for NFC card reader."""
'state': state,
'card': card}))
- def _monitor_card(self, loop) -> None:
- self._monitor = CardMonitor()
- self._observer = ControlPiCardObserver(self, loop)
- self._monitor.addObserver(self._observer)
+ def _poll_reader(self, loop) -> None:
+ while loop.is_running():
+ card = ""
+ loop.call_soon_threadsafe(
+ self._set_state, False, "")
+ readers = smartcard.System.readers()
+ while not readers and loop.is_running():
+ time.sleep(10)
+ readers = smartcard.System.readers()
+ reader = readers[0]
+ connection = reader.createConnection()
+ while loop.is_running():
+ time.sleep(0.5)
+ try:
+ connection.connect()
+ data, sw1, sw2 = connection.transmit([0xFF, 0xCA, 0x00,
+ 0x00, 0x00])
+ identifier = bytes(data).hex()
+ if identifier != card:
+ if card:
+ loop.call_soon_threadsafe(
+ self._set_state, False, "")
+ card = identifier
+ if card:
+ loop.call_soon_threadsafe(
+ self._set_state, True, card)
+ except smartcard.Exceptions.NoCardException:
+ if card:
+ card = ""
+ loop.call_soon_threadsafe(
+ self._set_state, False, "")
+ except smartcard.Exceptions.CardConnectionException:
+ if card:
+ card = ""
+ loop.call_soon_threadsafe(
+ self._set_state, False, "")
+ break
async def run(self) -> None:
"""Run no code proactively."""
loop = asyncio.get_running_loop()
- loop.run_in_executor(None, self._monitor_card, loop)
+ loop.run_in_executor(None, self._poll_reader, loop)