Switch from Observer to polling
authorBenjamin Braatz <bb@bbraatz.eu>
Mon, 22 Aug 2022 18:10:13 +0000 (20:10 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Mon, 22 Aug 2022 18:39:02 +0000 (20:39 +0200)
controlpi_plugins/nfc.py

index 62870c11517ea3cdc116c68f9be8b1e2c021a9c4..dbd601cdb146206ce70ab9c6e9d5ae55a0e808b5 100644 (file)
@@ -1,39 +1,14 @@
 """ControlPi Plugin for NFC Card Reader."""
 import asyncio
+import time
 
-from smartcard.CardMonitoring import CardMonitor, CardObserver  # type: ignore
+import smartcard.System  # type: ignore
+import smartcard.Exceptions  # type: ignore
 
 from controlpi import BasePlugin, Message, MessageTemplate
 from controlpi.baseplugin import JSONSchema
 
 
-class ControlPiCardObserver(CardObserver):
-    """Card Observer that calls ControlPi plugin on changes."""
-
-    def __init__(self, plugin, loop):
-        """Register the plugin to be called back and the event loop."""
-        self._plugin = plugin
-        self._loop = loop
-        super().__init__()
-
-    def update(self, observable, handlers):
-        """Get the ID of the NFC card and call back plugin."""
-        added, removed = handlers
-        for card in added:
-            connection = card.createConnection()
-            connection.connect()
-            data, sw1, sw2 = connection.transmit([0xFF, 0xCA,
-                                                  0x00, 0x00, 0x00])
-            identifier = bytes(data).hex()
-            self._loop.call_soon_thread_safe(
-                    self._plugin._set_state, True, identifier
-            )
-        for card in removed:
-            self._loop.call_soon_thread_safe(
-                    self._plugin._set_state, False, ""
-            )
-
-
 class NFCReader(BasePlugin):
     """ControlPi plugin for NFC card reader."""
 
@@ -77,12 +52,45 @@ class NFCReader(BasePlugin):
                                                      'state': state,
                                                      'card': card}))
 
-    def _monitor_card(self, loop) -> None:
-        self._monitor = CardMonitor()
-        self._observer = ControlPiCardObserver(self, loop)
-        self._monitor.addObserver(self._observer)
+    def _poll_reader(self, loop) -> None:
+        while loop.is_running():
+            card = ""
+            loop.call_soon_threadsafe(
+                    self._set_state, False, "")
+            readers = smartcard.System.readers()
+            while not readers and loop.is_running():
+                time.sleep(10)
+                readers = smartcard.System.readers()
+            reader = readers[0]
+            connection = reader.createConnection()
+            while loop.is_running():
+                time.sleep(0.5)
+                try:
+                    connection.connect()
+                    data, sw1, sw2 = connection.transmit([0xFF, 0xCA, 0x00,
+                                                          0x00, 0x00])
+                    identifier = bytes(data).hex()
+                    if identifier != card:
+                        if card:
+                            loop.call_soon_threadsafe(
+                                    self._set_state, False, "")
+                        card = identifier
+                        if card:
+                            loop.call_soon_threadsafe(
+                                    self._set_state, True, card)
+                except smartcard.Exceptions.NoCardException:
+                    if card:
+                        card = ""
+                        loop.call_soon_threadsafe(
+                                self._set_state, False, "")
+                except smartcard.Exceptions.CardConnectionException:
+                    if card:
+                        card = ""
+                        loop.call_soon_threadsafe(
+                                self._set_state, False, "")
+                    break
 
     async def run(self) -> None:
         """Run no code proactively."""
         loop = asyncio.get_running_loop()
-        loop.run_in_executor(None, self._monitor_card, loop)
+        loop.run_in_executor(None, self._poll_reader, loop)