rewrote the store library 0.1.0-alpha.1
authorSebastian Brix <sebastian.brix@graph-it.com>
Mon, 16 Jul 2018 12:36:46 +0000 (14:36 +0200)
committerSebastian Brix <sebastian.brix@graph-it.com>
Mon, 16 Jul 2018 12:36:46 +0000 (14:36 +0200)
supcon/cache.py
supcon/store.py
tests/store.py

index 67783950d5a245d36f61afc04091ea4f8b8f04fc..d97a757a8e0e0db86a1e7042b62864a9a7a02191 100644 (file)
@@ -2,15 +2,12 @@
 An EventCache provides a cache for locally sourced events on a supcon bus
 '''
 
-import os
 import asyncio
-
-from typing import Sequence, Callable
-
-import msgpack
+import functools
 
 import supcon.intf
 import supcon.util
+import supcon.store
 
 EventCacheIntf = supcon.intf.DInterface.load({
   'name': 'supcon.EventCache',
@@ -43,15 +40,14 @@ class LocalEventCache(supcon.intf.Implementation):
   events on the bus.
   '''
 
-  def __init__(self, local: supcon.intf.Node, loadEvents: Callable, saveEvents: Callable):
+  def __init__(self, local: supcon.intf.Node, store: supcon.store.ValueStoreFactory, maxEvents = 1000):
     supcon.intf.Implementation.__init__(self, EventCacheIntf)
 
     self.__local = local
-    self.__loadEvents = loadEvents
-    self.__saveEvents = saveEvents
+    self.__store = store([])
+    self.__maxEvents = maxEvents
 
     self.__watches = {}
-    self.__events = self.__loadEvents()
 
     self.setCallCb('events', self.__onEventsCall)
 
@@ -74,24 +70,26 @@ class LocalEventCache(supcon.intf.Implementation):
 
   def __onEvent(self, args, event):
     number = 0
-    if self.__events:
-      number = self.__events[-1][0] + 1
+    if self.__store.value:
+      number = self.__store.value[-1][0] + 1
 
     _, path, intf, name = event
-    self.__events.append((number, path, intf, name, args))
-    self.__events = self.__saveEvents(self.__events)
+    def _updater(events):
+      events = events + ((number, path, intf, name, args),)
+      return events[-self.__maxEvents:]
+    self.__store.update(_updater)
 
     self.fire('event', {'number': number, 'path': path, 'intf': intf, 'name': name, 'args': args})
 
-  def __onEventsCall(self, next=None):
+  def __onEventsCall(self, next=0):
     last = None
     first = None
-    if self.__events:
-      last = self.__events[-1][0]
-      first = self.__events[0][0]
+    if self.__store.value:
+      last = self.__store.value[-1][0]
+      first = self.__store.value[0][0]
 
     index = next - first if next >= first else 0
-    return {'last': last, 'first': first, 'events': self.__events[index:]}
+    return {'last': last, 'first': first, 'events': self.__store.value[index:]}
 
 
 def MemoryLocalEventCache(local: supcon.intf.Node, maxEvents=1000) -> LocalEventCache:
@@ -99,15 +97,7 @@ def MemoryLocalEventCache(local: supcon.intf.Node, maxEvents=1000) -> LocalEvent
   Creates a LocalEventCache that stores the cached events in memory.
   '''
 
-  def _loadEvents() -> Sequence:
-    return []
-
-  def _saveEvents(events: Sequence) -> Sequence:
-    if len(events) > maxEvents:
-      events = events[-maxEvents:]
-    return events
-
-  return LocalEventCache(local, _loadEvents, _saveEvents)
+  return LocalEventCache(local, supcon.store.MemoryValueStore, maxEvents)
 
 
 def MsgpackLocalEventCache(local: supcon.intf.Node, filename: str, maxEvents=1000) \
@@ -115,27 +105,10 @@ def MsgpackLocalEventCache(local: supcon.intf.Node, filename: str, maxEvents=100
   '''
   Creates a LocalEventCache that stores the cached events in the filesystem.
   '''
-  def _loadEvents() -> Sequence:
-    try:
-      with open(filename, 'rb') as fp:
-        return msgpack.unpackb(fp.read(), raw=False)
-    except FileNotFoundError:
-      return []
-
-  def _saveEvents(events: Sequence) -> Sequence:
-    if len(events) > maxEvents:
-      events = events[-maxEvents:]
 
-    with open(filename + '.wrt', 'wb') as fp:
-      fp.write(msgpack.packb(events, use_bin_type=True))
-    try:
-      os.remove(filename)
-    except FileNotFoundError:
-      pass
-    os.rename(filename + '.wrt', filename)
-    return events
-
-  return LocalEventCache(local, _loadEvents, _saveEvents)
+  access = supcon.store.SimpleFileAccess(filename)
+  store = functools.partial(supcon.store.FileValueStore, access)
+  return LocalEventCache(local, store, maxEvents)
 
 
 class RemoteEventCache(supcon.util.EventEmitterMixin):
@@ -145,20 +118,18 @@ class RemoteEventCache(supcon.util.EventEmitterMixin):
   '''
 
   def __init__(self, local: supcon.intf.Node, node: str, path: str, \
-    loadNext: Callable, saveNext: Callable, loop: asyncio.AbstractEventLoop = None):
+    store: supcon.store.ValueStoreFactory, loop: asyncio.AbstractEventLoop = None):
 
     self.__local = local
     self.__node = node
     self.__path = path
-    self.__loadNext = loadNext
-    self.__saveNext = saveNext
 
     self.__loop = loop if loop else asyncio.get_event_loop()
 
     self.__intfH = None
     self.__eventH = None
 
-    self.__next = self.__loadNext()
+    self.__store = store(0)
 
   def setup(self):
     ''' Starts watching for events on the bus '''
@@ -186,33 +157,32 @@ class RemoteEventCache(supcon.util.EventEmitterMixin):
       self.__loop.create_task(self.__fetchEvents())
 
   def __onEvent(self, args, _):
-    if args['number'] == self.__next:
-      self.__next += 1
-      self.__saveNext(self.__next)
+    if args['number'] == self.__store.value:
+      self.__store.update((1).__add__)
       self._emit((args['path'], args['intf'], args['name']), **args['args'])
     else:
       self.__loop.create_task(self.__fetchEvents())
 
   async def __fetchEvents(self):
     method = (self.__node, self.__path, EventCacheIntf.name, 'events')
-    inArgs = {'next': self.__next}
+    inArgs = {'next': self.__store.value}
     outArgs = await self.__local.call(*method, inArgs)
 
-    if outArgs['first'] > self.__next:
-      self._emit('missing', amount=outArgs['first'] - self.__next)
-      self.__next = outArgs['first']
+    if outArgs['first'] > self.__store.value:
+      amount = outArgs['first'] - self.__store.value
+      self.__store.update((outArgs['first']).__add__(0))
+      self._emit('missing', amount=amount)
 
     for number, path, intf, event, args in outArgs['events']:
-      if number < self.__next:
+      if number < self.__store.value:
         continue
-      if number > self.__next:
+      if number > self.__store.value:
         break
 
-      self.__next += 1
-      self.__saveNext(self.__next)
+      self.__store.update((1).__add__)
       self._emit((path, intf, event), **args)
 
-    if self.__next - 1 < outArgs['last']:
+    if self.__store.value - 1 < outArgs['last']:
       self.__loop.create_task(self.__fetchEvents())
 
 
@@ -222,13 +192,7 @@ def MemoryRemoteEventCache(local: supcon.intf.Node, node: str, path: str,
   Creates a RemoteEventCache that stores the sequence number of the last observed event in memory.
   '''
 
-  def _loadNext() -> int:
-    return 0
-
-  def _saveNext(value: int):
-    pass
-
-  return RemoteEventCache(local, node, path, _loadNext, _saveNext, loop)
+  return RemoteEventCache(local, node, path, supcon.store.MemoryValueStore, loop)
 
 
 def MsgpackRemoteEventCache(local: supcon.intf.Node, node: str, path: str, filename: str,
@@ -238,21 +202,7 @@ def MsgpackRemoteEventCache(local: supcon.intf.Node, node: str, path: str, filen
   filesystem.
   '''
 
-  def _loadNext() -> int:
-    try:
-      with open(filename, 'rb') as fp:
-        return msgpack.unpackb(fp.read(), raw=False)
-    except FileNotFoundError:
-      return 0
-
-  def _saveNext(value: int):
-    with open(filename + '.wrt', 'wb') as fp:
-      fp.write(msgpack.packb(value, use_bin_type=True))
-    try:
-      os.remove(filename)
-    except FileNotFoundError:
-      pass
-    os.rename(filename + '.wrt', filename)
-    return True
-
-  return RemoteEventCache(local, node, path, _loadNext, _saveNext, loop)
+  access = supcon.store.SimpleFileAccess(filename)
+  store = functools.partial(supcon.store.FileValueStore, access)
+
+  return RemoteEventCache(local, node, path, store, loop)
index cbde0ae48f77fef9ead4326eb9f66a299350ab92..1179f8636a4080ba2382ed32ff25cf5dea3bf363 100644 (file)
+"""
+The Module provides ABCs and classes to reliably store and access values on a storage medium.
+"""
 
-from typing import Iterable, Mapping, Any
+__all__ = (
+  'FileFormatInterface', 'MessagePackFileFormat', 'FileAccessInterface', 'SimpleFileAccess',
+  'EventualFileAccess', 'ValueStoreInterface', 'MemoryValueStore', 'FileValueStore',
+  'ValueStoreFactory'
+)
 
+from typing import Callable, Mapping, Any
+
+import os
 import abc
 import asyncio
+import traceback
+
 import msgpack
 
-class Store(abc.ABC):
 
-  async def keys(self) -> Iterable[str]:
-    raise NotImplementedError()
+class FileFormatInterface(abc.ABC):
+  '''
+  The ABC FileFormatInterface encodes and decodes arbitrary values into and from bytes objects.
+  Implementations of this ABC are used by the class SimpleFileAccess.
+  '''
 
-  async def load(self, key: str) -> Any:
-    return (await self.loadAll([key]))[key]
+  @abc.abstractmethod
+  def encode(self, value: Any) -> bytes:
+    '''Encodes the given value into a bytes object.'''
+    raise NotImplementedError()
 
-  async def loadAll(self, keys: Iterable[str]) -> Mapping[str, Any]:
+  @abc.abstractmethod
+  def decode(self, value: bytes) -> Any:
+    '''Decodes the given bytes object into a value.'''
     raise NotImplementedError()
 
-  async def save(self, key: str, val: Any) -> 'Store':
-    return await self.saveAll({key: val})
 
-  async def saveAll(self, vals: Mapping[str, Any]) -> 'Store':
+class MessagePackFileFormat(FileFormatInterface):
+  '''
+  The MessagePackFileFormat encodes and decodes arbitrary values into and from bytes objects
+  with the msgpack library
+  '''
+
+  def __init__(self, encodeOpts: Mapping = None, decodeOpts: Mapping = None):
+    '''
+    The given encodeOpts and decodeOpts get passed as keyword arguments to msgpack.packb and
+    msgpack.unpackb. The default encodeOpts value is {'use_bin_type': True}. The default
+    decodeOpts value is {'raw': False}
+    '''
+    self.__encodeOpts = encodeOpts if encodeOpts is not None else {
+      'use_bin_type': True
+    }
+    self.__decodeOpts = decodeOpts if decodeOpts is not None else {
+      'raw': False,
+      'use_list': False
+    }
+
+  def encode(self, value: Any) -> bytes:
+    return msgpack.packb(value, **self.__encodeOpts)
+
+  def decode(self, value: bytes) -> Any:
+    return msgpack.unpackb(value, **self.__decodeOpts)
+
+
+class FileAccessInterface(abc.ABC):
+  '''
+  The ABC FileAccessInterface loads and saves arbitrary values from and to a storage medium.
+  '''
+
+  @abc.abstractmethod
+  def load(self) -> Any:
+    ''' Loads a value from a storage medium '''
     raise NotImplementedError()
 
-  async def clear(self, key: str) -> 'Store':
-    return await self.clearAll([key])
-
-  async def clearAll(self, keys: Iterable[str]) -> 'Store':
+  @abc.abstractmethod
+  def save(self, value: Any) -> None:
+    ''' Saves the given value to a storage medium '''
     raise NotImplementedError()
 
 
-async def readFile(name: str) -> bytes:
-  def inExecutor():
-    with open(name, 'r+b') as fd:
-      return fd.read()
+class SimpleFileAccess(FileAccessInterface):
+  '''
+  The class SimpleFileAccess loads and stores arbitrary values from and to the filesystem. It
+  stores first a temporary file and renames this file.
+  '''
 
-  loop = asyncio.get_event_loop()
-  return await loop.run_in_executor(None, inExecutor)
+  def __init__(self, filename: str, format: FileFormatInterface = None):
+    '''
+    Initializes the SimpleFileAccess instance. The filename denotes a location in the filesystem
+    to store values and format specifies the file format for encoding an decoding. If format is
+    not given the MessagePackFileFormat is used.
+    '''
 
+    self.__filename = filename
+    self.__format = format if format is not None else MessagePackFileFormat()
 
-async def writeFile(name: str, data: bytes):
-  def inExecutor():
-    with open(name, 'w+b') as fd:
-      return fd.write(data)
+  def load(self) -> Any:
+    try:
+      with open(self.__filename + '.wrt', 'rb') as fp:
+        return self.__format.decode(fp.read())
+    except Exception:
+      with open(self.__filename, 'rb') as fp:
+        return self.__format.decode(fp.read())
 
-  loop = asyncio.get_event_loop()
-  return await loop.run_in_executor(None, inExecutor)
+  def save(self, value: Any) -> None:
+    with open(self.__filename + '.wrt', 'wb') as fp:
+      fp.write(self.__format.encode(value))
+      fp.flush()
+      os.fsync(fp.fileno())
 
+    os.rename(self.__filename + '.wrt', self.__filename)
+    fd = os.open(os.path.dirname(self.__filename), os.O_DIRECTORY)
+    os.fsync(fd)
+    os.close(fd)
 
-class FileStore(Store):
 
-  def __init__(self, name):
-    self.__name = name
+class EventualFileAccess(FileAccessInterface):
+  '''
+  Wraps an implementation of FileAccessInterface and stores the value eventually.
+  '''
 
-    self.__data = None
-    self.__lock = asyncio.Lock()
+  def __init__(self, access: FileAccessInterface, wait=15, loop: asyncio.AbstractEventLoop = None):
+    self.__access = access
+    self.__wait = wait
+    self.__loop = loop if loop is not None else asyncio.get_event_loop()
 
-  @property
-  def name(self):
-    return self.__name
+    self.__task = False
+    self.__todo = False
+    self.__value = None
 
-  async def __read(self):
-    await self.__lock.acquire()
+  def load(self) -> Any:
+    return self.__value if self.__todo else self.__access.load()
 
-    try:
-      data = await readFile(self.__name)
-      data = msgpack.unpackb(data, encoding='utf-8')
-      self.__data = data
-    except FileNotFoundError:
-      self.__data = {}
-    finally:
-      self.__lock.release()
+  def save(self, value: Any) -> None:
+    self.__value = value
+    self.__todo = True
+
+    self.__loop.call_soon(self.__start)
+
+  def __start(self):
+    if self.__task or not self.__todo:
+      return
+
+    self.__task = True
+    self.__loop.call_soon(self.__store)
 
-  async def __write(self):
-    await self.__lock.acquire()
+  def __store(self):
+    if not self.__todo:
+      return
 
     try:
-      data = msgpack.packb(self.__data, use_bin_type=True)
-      await writeFile(self.__name, data)
-    finally:
-      self.__lock.release()
+      self.__access.save(self.__value)
+    except Exception:
+      traceback.print_exc()
 
-  async def keys(self):
-    if self.__data is None:
-      await self.__read()
+    self.__value = None
+    self.__todo = False
 
-    return self.__data.keys()
+    self.__loop.call_later(self.__wait, self.__clear)
 
-  async def loadAll(self, keys: Iterable[str]) -> Mapping[str, Any]:
-    if self.__data is None:
-      await self.__read()
+  def __clear(self):
+    if not self.__task:
+      return
 
-    vals = {}
-    for key in keys:
-      vals[key] = self.__data[key] if key in self.__data else None
-    return vals
+    self.__task = False
+    if self.__todo:
+      self.__loop.call_soon(self.__start)
 
-  async def saveAll(self, vals: Mapping[str, Any]) -> Store:
-    if self.__data is None:
-      await self.__read()
 
-    for key, val in vals.items():
-      self.__data[key] = val
+class ValueStoreInterface(abc.ABC):
+  """
+  The ABC ValueStoreInterface defines methods to store and update arbitrary values. A stored value
+  must never be mutated! Hint: Use immutable data types!
+  """
 
-    await self.__write()
-    return self
+  @abc.abstractproperty
+  def value(self) -> Any:
+    """
+    Returns the value stored by the ValueStore. The returned value must never be mutated! Hint: Use
+    immutable data types!
+    """
+    raise NotImplementedError()
+
+  @abc.abstractproperty
+  def update(self, updater: Callable[[Any], Any]) -> Any:
+    """
+    Updates the value stored by the ValueStore. The given updater callable gets called with the
+    current value in the ValueStore. The updater callable must construct and return a NEW updated
+    value. It MUST NOT mutate the given value! Hint: Use immutable data types! The ValueStore stores
+    and returns the new value.
+    """
+    raise NotImplementedError()
+
+
+class MemoryValueStore(ValueStoreInterface):
+  """
+  The MemoryValueStores stores the Value in the main memory.
+  """
+
+  def __init__(self, initial: Any = None):
+    self.__value = initial
+
+  @property
+  def value(self) -> Any:
+    return self.__value
+
+  def update(self, updater: Callable[[Any], Any]) -> Any:
+    self.__value = updater(self.__value)
+    return self.__value
+
+
+class FileValueStore(ValueStoreInterface):
+  """
+  The FileValueStore stores the value via the given FileAccessInterface
+  """
+
+  def __init__(self, access: FileAccessInterface, initial: Any = None):
+    if isinstance(access, str):
+      access = SimpleFileAccess(access)
+    self.__access = access
+
+    try:
+      self.__value = self.__access.load()
+    except Exception:
+      #traceback.print_exc()
+      self.__value = initial
+
+  @property
+  def value(self) -> Any:
+    return self.__value
 
-  async def clearAll(self, keys: Iterable[str]) -> Store:
-    if self.__data is None:
-      await self.__read()
+  def update(self, updater: Callable[[Any], Any]) -> Any:
+    value = updater(self.__value)
+    self.__access.save(value)
+    self.__value = value
+    return value
 
-    for key in keys:
-      if key in self.__data:
-        del self.__data[key]
 
-    await self.__write()
-    return self
\ No newline at end of file
+# A function that returns a ValueStore given some initial value
+ValueStoreFactory = Callable[[Any], ValueStoreInterface]
\ No newline at end of file
index 9de208567ef77549d93a7752cb28e18459022809..130fd978ef9277320a6ad99b8cf289fb283a3933 100644 (file)
 
 import os
 import asyncio
+import tempfile
+import unittest
 
-from supcon.store import FileStore
+import msgpack
 
-fs = FileStore(os.path.dirname(__file__) + '/store.msg')
+import supcon.store
 
-async def testFileStore(fs):
-  print(await fs.load('a'))
-  await fs.save('a', 'abc')
-  print(await fs.load('a'))
 
-loop = asyncio.get_event_loop()
-loop.run_until_complete(testFileStore(fs))
-loop.close()
+class TestMessagePackFileFormat(unittest.TestCase):
+
+  def test_encode(self):
+    value = 'lalala'
+
+    f = supcon.store.MessagePackFileFormat()
+    self.assertEqual(f.encode(value), msgpack.packb(value, use_bin_type=True))
+
+  def test_decode(self):
+    value = 'lalala'
+    value = msgpack.packb(value, use_bin_type=True)
+
+    f = supcon.store.MessagePackFileFormat()
+    self.assertEqual(f.decode(value), msgpack.unpackb(value, raw=False))
+
+
+class TestSimpleFileAccess(unittest.TestCase):
+
+  def test_saveload(self):
+    (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-')
+    os.close(handle)
+
+    access = supcon.store.SimpleFileAccess(filename)
+
+    decoded = 'lalala'
+    encoded = msgpack.packb(decoded, use_bin_type=True)
+
+    access.save(decoded)
+    with open(filename, 'rb') as fp:
+      self.assertEqual(fp.read(), encoded)
+
+    self.assertEqual(access.load(), decoded)
+
+    os.remove(filename)
+
+
+class TestEventualFileAccess(unittest.TestCase):
+
+  def test_saveload(self):
+    (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-')
+    os.close(handle)
+
+    simple = supcon.store.SimpleFileAccess(filename)
+    access = supcon.store.EventualFileAccess(simple, 1.5)
+
+    async def runIt():
+      access.save('random1')
+      self.assertEqual(access.load(), 'random1')
+
+      await asyncio.sleep(0.5) # first save
+
+      self.assertEqual(simple.load(), 'random1')
+
+      access.save('random2')
+      self.assertEqual(simple.load(), 'random1')
+      self.assertEqual(access.load(), 'random2')
+
+      await asyncio.sleep(0.5) # no save
+
+      self.assertEqual(simple.load(), 'random1')
+      self.assertEqual(access.load(), 'random2')
+
+      await asyncio.sleep(1) # second save
+
+      self.assertEqual(simple.load(), 'random2')
+      self.assertEqual(access.load(), 'random2')
+
+      await asyncio.sleep(1.5) # no save
+
+      self.assertEqual(simple.load(), 'random2')
+      self.assertEqual(access.load(), 'random2')
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(runIt())
+
+    os.remove(filename)
+
+
+class TestMemoryValueStore(unittest.TestCase):
+
+  def test_update(self):
+    store = supcon.store.MemoryValueStore(1)
+    self.assertEqual(1, store.value)
+
+    store.update(lambda v: v + 1)
+    self.assertEqual(2, store.value)
+
+    store.update(lambda v: v + 5)
+    self.assertEqual(7, store.value)
+
+    store.update(lambda v: -1)
+    self.assertEqual(-1, store.value)
+
+
+class TestFileValueStore(unittest.TestCase):
+
+  def test_update(self):
+    (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-')
+    os.close(handle)
+
+    access = supcon.store.SimpleFileAccess(filename)
+    store = supcon.store.FileValueStore(access, 1)
+    self.assertEqual(1, store.value)
+
+    store.update(lambda v: v + 1)
+    self.assertEqual(2, store.value)
+    self.assertEqual(2, access.load())
+
+    store.update(lambda v: v + 5)
+    self.assertEqual(7, store.value)
+    self.assertEqual(7, access.load())
+
+    store.update(lambda v: -1)
+    self.assertEqual(-1, store.value)
+    self.assertEqual(-1, access.load())
+
+    os.remove(filename)
+
+
+if __name__ == '__main__':
+  unittest.main()