An EventCache provides a cache for locally sourced events on a supcon bus
'''
-import os
import asyncio
-
-from typing import Sequence, Callable
-
-import msgpack
+import functools
import supcon.intf
import supcon.util
+import supcon.store
EventCacheIntf = supcon.intf.DInterface.load({
'name': 'supcon.EventCache',
events on the bus.
'''
- def __init__(self, local: supcon.intf.Node, loadEvents: Callable, saveEvents: Callable):
+ def __init__(self, local: supcon.intf.Node, store: supcon.store.ValueStoreFactory, maxEvents = 1000):
supcon.intf.Implementation.__init__(self, EventCacheIntf)
self.__local = local
- self.__loadEvents = loadEvents
- self.__saveEvents = saveEvents
+ self.__store = store([])
+ self.__maxEvents = maxEvents
self.__watches = {}
- self.__events = self.__loadEvents()
self.setCallCb('events', self.__onEventsCall)
def __onEvent(self, args, event):
number = 0
- if self.__events:
- number = self.__events[-1][0] + 1
+ if self.__store.value:
+ number = self.__store.value[-1][0] + 1
_, path, intf, name = event
- self.__events.append((number, path, intf, name, args))
- self.__events = self.__saveEvents(self.__events)
+ def _updater(events):
+ events = events + ((number, path, intf, name, args),)
+ return events[-self.__maxEvents:]
+ self.__store.update(_updater)
self.fire('event', {'number': number, 'path': path, 'intf': intf, 'name': name, 'args': args})
- def __onEventsCall(self, next=None):
+ def __onEventsCall(self, next=0):
last = None
first = None
- if self.__events:
- last = self.__events[-1][0]
- first = self.__events[0][0]
+ if self.__store.value:
+ last = self.__store.value[-1][0]
+ first = self.__store.value[0][0]
index = next - first if next >= first else 0
- return {'last': last, 'first': first, 'events': self.__events[index:]}
+ return {'last': last, 'first': first, 'events': self.__store.value[index:]}
def MemoryLocalEventCache(local: supcon.intf.Node, maxEvents=1000) -> LocalEventCache:
Creates a LocalEventCache that stores the cached events in memory.
'''
- def _loadEvents() -> Sequence:
- return []
-
- def _saveEvents(events: Sequence) -> Sequence:
- if len(events) > maxEvents:
- events = events[-maxEvents:]
- return events
-
- return LocalEventCache(local, _loadEvents, _saveEvents)
+ return LocalEventCache(local, supcon.store.MemoryValueStore, maxEvents)
def MsgpackLocalEventCache(local: supcon.intf.Node, filename: str, maxEvents=1000) \
'''
Creates a LocalEventCache that stores the cached events in the filesystem.
'''
- def _loadEvents() -> Sequence:
- try:
- with open(filename, 'rb') as fp:
- return msgpack.unpackb(fp.read(), raw=False)
- except FileNotFoundError:
- return []
-
- def _saveEvents(events: Sequence) -> Sequence:
- if len(events) > maxEvents:
- events = events[-maxEvents:]
- with open(filename + '.wrt', 'wb') as fp:
- fp.write(msgpack.packb(events, use_bin_type=True))
- try:
- os.remove(filename)
- except FileNotFoundError:
- pass
- os.rename(filename + '.wrt', filename)
- return events
-
- return LocalEventCache(local, _loadEvents, _saveEvents)
+ access = supcon.store.SimpleFileAccess(filename)
+ store = functools.partial(supcon.store.FileValueStore, access)
+ return LocalEventCache(local, store, maxEvents)
class RemoteEventCache(supcon.util.EventEmitterMixin):
'''
def __init__(self, local: supcon.intf.Node, node: str, path: str, \
- loadNext: Callable, saveNext: Callable, loop: asyncio.AbstractEventLoop = None):
+ store: supcon.store.ValueStoreFactory, loop: asyncio.AbstractEventLoop = None):
self.__local = local
self.__node = node
self.__path = path
- self.__loadNext = loadNext
- self.__saveNext = saveNext
self.__loop = loop if loop else asyncio.get_event_loop()
self.__intfH = None
self.__eventH = None
- self.__next = self.__loadNext()
+ self.__store = store(0)
def setup(self):
''' Starts watching for events on the bus '''
self.__loop.create_task(self.__fetchEvents())
def __onEvent(self, args, _):
- if args['number'] == self.__next:
- self.__next += 1
- self.__saveNext(self.__next)
+ if args['number'] == self.__store.value:
+ self.__store.update((1).__add__)
self._emit((args['path'], args['intf'], args['name']), **args['args'])
else:
self.__loop.create_task(self.__fetchEvents())
async def __fetchEvents(self):
method = (self.__node, self.__path, EventCacheIntf.name, 'events')
- inArgs = {'next': self.__next}
+ inArgs = {'next': self.__store.value}
outArgs = await self.__local.call(*method, inArgs)
- if outArgs['first'] > self.__next:
- self._emit('missing', amount=outArgs['first'] - self.__next)
- self.__next = outArgs['first']
+ if outArgs['first'] > self.__store.value:
+ amount = outArgs['first'] - self.__store.value
+ self.__store.update((outArgs['first']).__add__(0))
+ self._emit('missing', amount=amount)
for number, path, intf, event, args in outArgs['events']:
- if number < self.__next:
+ if number < self.__store.value:
continue
- if number > self.__next:
+ if number > self.__store.value:
break
- self.__next += 1
- self.__saveNext(self.__next)
+ self.__store.update((1).__add__)
self._emit((path, intf, event), **args)
- if self.__next - 1 < outArgs['last']:
+ if self.__store.value - 1 < outArgs['last']:
self.__loop.create_task(self.__fetchEvents())
Creates a RemoteEventCache that stores the sequence number of the last observed event in memory.
'''
- def _loadNext() -> int:
- return 0
-
- def _saveNext(value: int):
- pass
-
- return RemoteEventCache(local, node, path, _loadNext, _saveNext, loop)
+ return RemoteEventCache(local, node, path, supcon.store.MemoryValueStore, loop)
def MsgpackRemoteEventCache(local: supcon.intf.Node, node: str, path: str, filename: str,
filesystem.
'''
- def _loadNext() -> int:
- try:
- with open(filename, 'rb') as fp:
- return msgpack.unpackb(fp.read(), raw=False)
- except FileNotFoundError:
- return 0
-
- def _saveNext(value: int):
- with open(filename + '.wrt', 'wb') as fp:
- fp.write(msgpack.packb(value, use_bin_type=True))
- try:
- os.remove(filename)
- except FileNotFoundError:
- pass
- os.rename(filename + '.wrt', filename)
- return True
-
- return RemoteEventCache(local, node, path, _loadNext, _saveNext, loop)
+ access = supcon.store.SimpleFileAccess(filename)
+ store = functools.partial(supcon.store.FileValueStore, access)
+
+ return RemoteEventCache(local, node, path, store, loop)
+"""
+The Module provides ABCs and classes to reliably store and access values on a storage medium.
+"""
-from typing import Iterable, Mapping, Any
+__all__ = (
+ 'FileFormatInterface', 'MessagePackFileFormat', 'FileAccessInterface', 'SimpleFileAccess',
+ 'EventualFileAccess', 'ValueStoreInterface', 'MemoryValueStore', 'FileValueStore',
+ 'ValueStoreFactory'
+)
+from typing import Callable, Mapping, Any
+
+import os
import abc
import asyncio
+import traceback
+
import msgpack
-class Store(abc.ABC):
- async def keys(self) -> Iterable[str]:
- raise NotImplementedError()
+class FileFormatInterface(abc.ABC):
+ '''
+ The ABC FileFormatInterface encodes and decodes arbitrary values into and from bytes objects.
+ Implementations of this ABC are used by the class SimpleFileAccess.
+ '''
- async def load(self, key: str) -> Any:
- return (await self.loadAll([key]))[key]
+ @abc.abstractmethod
+ def encode(self, value: Any) -> bytes:
+ '''Encodes the given value into a bytes object.'''
+ raise NotImplementedError()
- async def loadAll(self, keys: Iterable[str]) -> Mapping[str, Any]:
+ @abc.abstractmethod
+ def decode(self, value: bytes) -> Any:
+ '''Decodes the given bytes object into a value.'''
raise NotImplementedError()
- async def save(self, key: str, val: Any) -> 'Store':
- return await self.saveAll({key: val})
- async def saveAll(self, vals: Mapping[str, Any]) -> 'Store':
+class MessagePackFileFormat(FileFormatInterface):
+ '''
+ The MessagePackFileFormat encodes and decodes arbitrary values into and from bytes objects
+ with the msgpack library
+ '''
+
+ def __init__(self, encodeOpts: Mapping = None, decodeOpts: Mapping = None):
+ '''
+ The given encodeOpts and decodeOpts get passed as keyword arguments to msgpack.packb and
+ msgpack.unpackb. The default encodeOpts value is {'use_bin_type': True}. The default
+ decodeOpts value is {'raw': False}
+ '''
+ self.__encodeOpts = encodeOpts if encodeOpts is not None else {
+ 'use_bin_type': True
+ }
+ self.__decodeOpts = decodeOpts if decodeOpts is not None else {
+ 'raw': False,
+ 'use_list': False
+ }
+
+ def encode(self, value: Any) -> bytes:
+ return msgpack.packb(value, **self.__encodeOpts)
+
+ def decode(self, value: bytes) -> Any:
+ return msgpack.unpackb(value, **self.__decodeOpts)
+
+
+class FileAccessInterface(abc.ABC):
+ '''
+ The ABC FileAccessInterface loads and saves arbitrary values from and to a storage medium.
+ '''
+
+ @abc.abstractmethod
+ def load(self) -> Any:
+ ''' Loads a value from a storage medium '''
raise NotImplementedError()
- async def clear(self, key: str) -> 'Store':
- return await self.clearAll([key])
-
- async def clearAll(self, keys: Iterable[str]) -> 'Store':
+ @abc.abstractmethod
+ def save(self, value: Any) -> None:
+ ''' Saves the given value to a storage medium '''
raise NotImplementedError()
-async def readFile(name: str) -> bytes:
- def inExecutor():
- with open(name, 'r+b') as fd:
- return fd.read()
+class SimpleFileAccess(FileAccessInterface):
+ '''
+ The class SimpleFileAccess loads and stores arbitrary values from and to the filesystem. It
+ stores first a temporary file and renames this file.
+ '''
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(None, inExecutor)
+ def __init__(self, filename: str, format: FileFormatInterface = None):
+ '''
+ Initializes the SimpleFileAccess instance. The filename denotes a location in the filesystem
+ to store values and format specifies the file format for encoding an decoding. If format is
+ not given the MessagePackFileFormat is used.
+ '''
+ self.__filename = filename
+ self.__format = format if format is not None else MessagePackFileFormat()
-async def writeFile(name: str, data: bytes):
- def inExecutor():
- with open(name, 'w+b') as fd:
- return fd.write(data)
+ def load(self) -> Any:
+ try:
+ with open(self.__filename + '.wrt', 'rb') as fp:
+ return self.__format.decode(fp.read())
+ except Exception:
+ with open(self.__filename, 'rb') as fp:
+ return self.__format.decode(fp.read())
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(None, inExecutor)
+ def save(self, value: Any) -> None:
+ with open(self.__filename + '.wrt', 'wb') as fp:
+ fp.write(self.__format.encode(value))
+ fp.flush()
+ os.fsync(fp.fileno())
+ os.rename(self.__filename + '.wrt', self.__filename)
+ fd = os.open(os.path.dirname(self.__filename), os.O_DIRECTORY)
+ os.fsync(fd)
+ os.close(fd)
-class FileStore(Store):
- def __init__(self, name):
- self.__name = name
+class EventualFileAccess(FileAccessInterface):
+ '''
+ Wraps an implementation of FileAccessInterface and stores the value eventually.
+ '''
- self.__data = None
- self.__lock = asyncio.Lock()
+ def __init__(self, access: FileAccessInterface, wait=15, loop: asyncio.AbstractEventLoop = None):
+ self.__access = access
+ self.__wait = wait
+ self.__loop = loop if loop is not None else asyncio.get_event_loop()
- @property
- def name(self):
- return self.__name
+ self.__task = False
+ self.__todo = False
+ self.__value = None
- async def __read(self):
- await self.__lock.acquire()
+ def load(self) -> Any:
+ return self.__value if self.__todo else self.__access.load()
- try:
- data = await readFile(self.__name)
- data = msgpack.unpackb(data, encoding='utf-8')
- self.__data = data
- except FileNotFoundError:
- self.__data = {}
- finally:
- self.__lock.release()
+ def save(self, value: Any) -> None:
+ self.__value = value
+ self.__todo = True
+
+ self.__loop.call_soon(self.__start)
+
+ def __start(self):
+ if self.__task or not self.__todo:
+ return
+
+ self.__task = True
+ self.__loop.call_soon(self.__store)
- async def __write(self):
- await self.__lock.acquire()
+ def __store(self):
+ if not self.__todo:
+ return
try:
- data = msgpack.packb(self.__data, use_bin_type=True)
- await writeFile(self.__name, data)
- finally:
- self.__lock.release()
+ self.__access.save(self.__value)
+ except Exception:
+ traceback.print_exc()
- async def keys(self):
- if self.__data is None:
- await self.__read()
+ self.__value = None
+ self.__todo = False
- return self.__data.keys()
+ self.__loop.call_later(self.__wait, self.__clear)
- async def loadAll(self, keys: Iterable[str]) -> Mapping[str, Any]:
- if self.__data is None:
- await self.__read()
+ def __clear(self):
+ if not self.__task:
+ return
- vals = {}
- for key in keys:
- vals[key] = self.__data[key] if key in self.__data else None
- return vals
+ self.__task = False
+ if self.__todo:
+ self.__loop.call_soon(self.__start)
- async def saveAll(self, vals: Mapping[str, Any]) -> Store:
- if self.__data is None:
- await self.__read()
- for key, val in vals.items():
- self.__data[key] = val
+class ValueStoreInterface(abc.ABC):
+ """
+ The ABC ValueStoreInterface defines methods to store and update arbitrary values. A stored value
+ must never be mutated! Hint: Use immutable data types!
+ """
- await self.__write()
- return self
+ @abc.abstractproperty
+ def value(self) -> Any:
+ """
+ Returns the value stored by the ValueStore. The returned value must never be mutated! Hint: Use
+ immutable data types!
+ """
+ raise NotImplementedError()
+
+ @abc.abstractproperty
+ def update(self, updater: Callable[[Any], Any]) -> Any:
+ """
+ Updates the value stored by the ValueStore. The given updater callable gets called with the
+ current value in the ValueStore. The updater callable must construct and return a NEW updated
+ value. It MUST NOT mutate the given value! Hint: Use immutable data types! The ValueStore stores
+ and returns the new value.
+ """
+ raise NotImplementedError()
+
+
+class MemoryValueStore(ValueStoreInterface):
+ """
+ The MemoryValueStores stores the Value in the main memory.
+ """
+
+ def __init__(self, initial: Any = None):
+ self.__value = initial
+
+ @property
+ def value(self) -> Any:
+ return self.__value
+
+ def update(self, updater: Callable[[Any], Any]) -> Any:
+ self.__value = updater(self.__value)
+ return self.__value
+
+
+class FileValueStore(ValueStoreInterface):
+ """
+ The FileValueStore stores the value via the given FileAccessInterface
+ """
+
+ def __init__(self, access: FileAccessInterface, initial: Any = None):
+ if isinstance(access, str):
+ access = SimpleFileAccess(access)
+ self.__access = access
+
+ try:
+ self.__value = self.__access.load()
+ except Exception:
+ #traceback.print_exc()
+ self.__value = initial
+
+ @property
+ def value(self) -> Any:
+ return self.__value
- async def clearAll(self, keys: Iterable[str]) -> Store:
- if self.__data is None:
- await self.__read()
+ def update(self, updater: Callable[[Any], Any]) -> Any:
+ value = updater(self.__value)
+ self.__access.save(value)
+ self.__value = value
+ return value
- for key in keys:
- if key in self.__data:
- del self.__data[key]
- await self.__write()
- return self
\ No newline at end of file
+# A function that returns a ValueStore given some initial value
+ValueStoreFactory = Callable[[Any], ValueStoreInterface]
\ No newline at end of file
import os
import asyncio
+import tempfile
+import unittest
-from supcon.store import FileStore
+import msgpack
-fs = FileStore(os.path.dirname(__file__) + '/store.msg')
+import supcon.store
-async def testFileStore(fs):
- print(await fs.load('a'))
- await fs.save('a', 'abc')
- print(await fs.load('a'))
-loop = asyncio.get_event_loop()
-loop.run_until_complete(testFileStore(fs))
-loop.close()
+class TestMessagePackFileFormat(unittest.TestCase):
+
+ def test_encode(self):
+ value = 'lalala'
+
+ f = supcon.store.MessagePackFileFormat()
+ self.assertEqual(f.encode(value), msgpack.packb(value, use_bin_type=True))
+
+ def test_decode(self):
+ value = 'lalala'
+ value = msgpack.packb(value, use_bin_type=True)
+
+ f = supcon.store.MessagePackFileFormat()
+ self.assertEqual(f.decode(value), msgpack.unpackb(value, raw=False))
+
+
+class TestSimpleFileAccess(unittest.TestCase):
+
+ def test_saveload(self):
+ (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-')
+ os.close(handle)
+
+ access = supcon.store.SimpleFileAccess(filename)
+
+ decoded = 'lalala'
+ encoded = msgpack.packb(decoded, use_bin_type=True)
+
+ access.save(decoded)
+ with open(filename, 'rb') as fp:
+ self.assertEqual(fp.read(), encoded)
+
+ self.assertEqual(access.load(), decoded)
+
+ os.remove(filename)
+
+
+class TestEventualFileAccess(unittest.TestCase):
+
+ def test_saveload(self):
+ (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-')
+ os.close(handle)
+
+ simple = supcon.store.SimpleFileAccess(filename)
+ access = supcon.store.EventualFileAccess(simple, 1.5)
+
+ async def runIt():
+ access.save('random1')
+ self.assertEqual(access.load(), 'random1')
+
+ await asyncio.sleep(0.5) # first save
+
+ self.assertEqual(simple.load(), 'random1')
+
+ access.save('random2')
+ self.assertEqual(simple.load(), 'random1')
+ self.assertEqual(access.load(), 'random2')
+
+ await asyncio.sleep(0.5) # no save
+
+ self.assertEqual(simple.load(), 'random1')
+ self.assertEqual(access.load(), 'random2')
+
+ await asyncio.sleep(1) # second save
+
+ self.assertEqual(simple.load(), 'random2')
+ self.assertEqual(access.load(), 'random2')
+
+ await asyncio.sleep(1.5) # no save
+
+ self.assertEqual(simple.load(), 'random2')
+ self.assertEqual(access.load(), 'random2')
+
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(runIt())
+
+ os.remove(filename)
+
+
+class TestMemoryValueStore(unittest.TestCase):
+
+ def test_update(self):
+ store = supcon.store.MemoryValueStore(1)
+ self.assertEqual(1, store.value)
+
+ store.update(lambda v: v + 1)
+ self.assertEqual(2, store.value)
+
+ store.update(lambda v: v + 5)
+ self.assertEqual(7, store.value)
+
+ store.update(lambda v: -1)
+ self.assertEqual(-1, store.value)
+
+
+class TestFileValueStore(unittest.TestCase):
+
+ def test_update(self):
+ (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-')
+ os.close(handle)
+
+ access = supcon.store.SimpleFileAccess(filename)
+ store = supcon.store.FileValueStore(access, 1)
+ self.assertEqual(1, store.value)
+
+ store.update(lambda v: v + 1)
+ self.assertEqual(2, store.value)
+ self.assertEqual(2, access.load())
+
+ store.update(lambda v: v + 5)
+ self.assertEqual(7, store.value)
+ self.assertEqual(7, access.load())
+
+ store.update(lambda v: -1)
+ self.assertEqual(-1, store.value)
+ self.assertEqual(-1, access.load())
+
+ os.remove(filename)
+
+
+if __name__ == '__main__':
+ unittest.main()