From 40a89f8186bea2302d0ae78003af4fa73580ff27 Mon Sep 17 00:00:00 2001 From: Sebastian Brix Date: Mon, 16 Jul 2018 14:36:46 +0200 Subject: [PATCH] rewrote the store library --- supcon/cache.py | 126 +++++++--------------- supcon/store.py | 275 +++++++++++++++++++++++++++++++++++------------- tests/store.py | 134 +++++++++++++++++++++-- 3 files changed, 362 insertions(+), 173 deletions(-) diff --git a/supcon/cache.py b/supcon/cache.py index 6778395..d97a757 100644 --- a/supcon/cache.py +++ b/supcon/cache.py @@ -2,15 +2,12 @@ An EventCache provides a cache for locally sourced events on a supcon bus ''' -import os import asyncio - -from typing import Sequence, Callable - -import msgpack +import functools import supcon.intf import supcon.util +import supcon.store EventCacheIntf = supcon.intf.DInterface.load({ 'name': 'supcon.EventCache', @@ -43,15 +40,14 @@ class LocalEventCache(supcon.intf.Implementation): events on the bus. ''' - def __init__(self, local: supcon.intf.Node, loadEvents: Callable, saveEvents: Callable): + def __init__(self, local: supcon.intf.Node, store: supcon.store.ValueStoreFactory, maxEvents = 1000): supcon.intf.Implementation.__init__(self, EventCacheIntf) self.__local = local - self.__loadEvents = loadEvents - self.__saveEvents = saveEvents + self.__store = store([]) + self.__maxEvents = maxEvents self.__watches = {} - self.__events = self.__loadEvents() self.setCallCb('events', self.__onEventsCall) @@ -74,24 +70,26 @@ class LocalEventCache(supcon.intf.Implementation): def __onEvent(self, args, event): number = 0 - if self.__events: - number = self.__events[-1][0] + 1 + if self.__store.value: + number = self.__store.value[-1][0] + 1 _, path, intf, name = event - self.__events.append((number, path, intf, name, args)) - self.__events = self.__saveEvents(self.__events) + def _updater(events): + events = events + ((number, path, intf, name, args),) + return events[-self.__maxEvents:] + self.__store.update(_updater) self.fire('event', {'number': number, 'path': path, 'intf': intf, 'name': name, 'args': args}) - def __onEventsCall(self, next=None): + def __onEventsCall(self, next=0): last = None first = None - if self.__events: - last = self.__events[-1][0] - first = self.__events[0][0] + if self.__store.value: + last = self.__store.value[-1][0] + first = self.__store.value[0][0] index = next - first if next >= first else 0 - return {'last': last, 'first': first, 'events': self.__events[index:]} + return {'last': last, 'first': first, 'events': self.__store.value[index:]} def MemoryLocalEventCache(local: supcon.intf.Node, maxEvents=1000) -> LocalEventCache: @@ -99,15 +97,7 @@ def MemoryLocalEventCache(local: supcon.intf.Node, maxEvents=1000) -> LocalEvent Creates a LocalEventCache that stores the cached events in memory. ''' - def _loadEvents() -> Sequence: - return [] - - def _saveEvents(events: Sequence) -> Sequence: - if len(events) > maxEvents: - events = events[-maxEvents:] - return events - - return LocalEventCache(local, _loadEvents, _saveEvents) + return LocalEventCache(local, supcon.store.MemoryValueStore, maxEvents) def MsgpackLocalEventCache(local: supcon.intf.Node, filename: str, maxEvents=1000) \ @@ -115,27 +105,10 @@ def MsgpackLocalEventCache(local: supcon.intf.Node, filename: str, maxEvents=100 ''' Creates a LocalEventCache that stores the cached events in the filesystem. ''' - def _loadEvents() -> Sequence: - try: - with open(filename, 'rb') as fp: - return msgpack.unpackb(fp.read(), raw=False) - except FileNotFoundError: - return [] - - def _saveEvents(events: Sequence) -> Sequence: - if len(events) > maxEvents: - events = events[-maxEvents:] - with open(filename + '.wrt', 'wb') as fp: - fp.write(msgpack.packb(events, use_bin_type=True)) - try: - os.remove(filename) - except FileNotFoundError: - pass - os.rename(filename + '.wrt', filename) - return events - - return LocalEventCache(local, _loadEvents, _saveEvents) + access = supcon.store.SimpleFileAccess(filename) + store = functools.partial(supcon.store.FileValueStore, access) + return LocalEventCache(local, store, maxEvents) class RemoteEventCache(supcon.util.EventEmitterMixin): @@ -145,20 +118,18 @@ class RemoteEventCache(supcon.util.EventEmitterMixin): ''' def __init__(self, local: supcon.intf.Node, node: str, path: str, \ - loadNext: Callable, saveNext: Callable, loop: asyncio.AbstractEventLoop = None): + store: supcon.store.ValueStoreFactory, loop: asyncio.AbstractEventLoop = None): self.__local = local self.__node = node self.__path = path - self.__loadNext = loadNext - self.__saveNext = saveNext self.__loop = loop if loop else asyncio.get_event_loop() self.__intfH = None self.__eventH = None - self.__next = self.__loadNext() + self.__store = store(0) def setup(self): ''' Starts watching for events on the bus ''' @@ -186,33 +157,32 @@ class RemoteEventCache(supcon.util.EventEmitterMixin): self.__loop.create_task(self.__fetchEvents()) def __onEvent(self, args, _): - if args['number'] == self.__next: - self.__next += 1 - self.__saveNext(self.__next) + if args['number'] == self.__store.value: + self.__store.update((1).__add__) self._emit((args['path'], args['intf'], args['name']), **args['args']) else: self.__loop.create_task(self.__fetchEvents()) async def __fetchEvents(self): method = (self.__node, self.__path, EventCacheIntf.name, 'events') - inArgs = {'next': self.__next} + inArgs = {'next': self.__store.value} outArgs = await self.__local.call(*method, inArgs) - if outArgs['first'] > self.__next: - self._emit('missing', amount=outArgs['first'] - self.__next) - self.__next = outArgs['first'] + if outArgs['first'] > self.__store.value: + amount = outArgs['first'] - self.__store.value + self.__store.update((outArgs['first']).__add__(0)) + self._emit('missing', amount=amount) for number, path, intf, event, args in outArgs['events']: - if number < self.__next: + if number < self.__store.value: continue - if number > self.__next: + if number > self.__store.value: break - self.__next += 1 - self.__saveNext(self.__next) + self.__store.update((1).__add__) self._emit((path, intf, event), **args) - if self.__next - 1 < outArgs['last']: + if self.__store.value - 1 < outArgs['last']: self.__loop.create_task(self.__fetchEvents()) @@ -222,13 +192,7 @@ def MemoryRemoteEventCache(local: supcon.intf.Node, node: str, path: str, Creates a RemoteEventCache that stores the sequence number of the last observed event in memory. ''' - def _loadNext() -> int: - return 0 - - def _saveNext(value: int): - pass - - return RemoteEventCache(local, node, path, _loadNext, _saveNext, loop) + return RemoteEventCache(local, node, path, supcon.store.MemoryValueStore, loop) def MsgpackRemoteEventCache(local: supcon.intf.Node, node: str, path: str, filename: str, @@ -238,21 +202,7 @@ def MsgpackRemoteEventCache(local: supcon.intf.Node, node: str, path: str, filen filesystem. ''' - def _loadNext() -> int: - try: - with open(filename, 'rb') as fp: - return msgpack.unpackb(fp.read(), raw=False) - except FileNotFoundError: - return 0 - - def _saveNext(value: int): - with open(filename + '.wrt', 'wb') as fp: - fp.write(msgpack.packb(value, use_bin_type=True)) - try: - os.remove(filename) - except FileNotFoundError: - pass - os.rename(filename + '.wrt', filename) - return True - - return RemoteEventCache(local, node, path, _loadNext, _saveNext, loop) + access = supcon.store.SimpleFileAccess(filename) + store = functools.partial(supcon.store.FileValueStore, access) + + return RemoteEventCache(local, node, path, store, loop) diff --git a/supcon/store.py b/supcon/store.py index cbde0ae..1179f86 100644 --- a/supcon/store.py +++ b/supcon/store.py @@ -1,117 +1,240 @@ +""" +The Module provides ABCs and classes to reliably store and access values on a storage medium. +""" -from typing import Iterable, Mapping, Any +__all__ = ( + 'FileFormatInterface', 'MessagePackFileFormat', 'FileAccessInterface', 'SimpleFileAccess', + 'EventualFileAccess', 'ValueStoreInterface', 'MemoryValueStore', 'FileValueStore', + 'ValueStoreFactory' +) +from typing import Callable, Mapping, Any + +import os import abc import asyncio +import traceback + import msgpack -class Store(abc.ABC): - async def keys(self) -> Iterable[str]: - raise NotImplementedError() +class FileFormatInterface(abc.ABC): + ''' + The ABC FileFormatInterface encodes and decodes arbitrary values into and from bytes objects. + Implementations of this ABC are used by the class SimpleFileAccess. + ''' - async def load(self, key: str) -> Any: - return (await self.loadAll([key]))[key] + @abc.abstractmethod + def encode(self, value: Any) -> bytes: + '''Encodes the given value into a bytes object.''' + raise NotImplementedError() - async def loadAll(self, keys: Iterable[str]) -> Mapping[str, Any]: + @abc.abstractmethod + def decode(self, value: bytes) -> Any: + '''Decodes the given bytes object into a value.''' raise NotImplementedError() - async def save(self, key: str, val: Any) -> 'Store': - return await self.saveAll({key: val}) - async def saveAll(self, vals: Mapping[str, Any]) -> 'Store': +class MessagePackFileFormat(FileFormatInterface): + ''' + The MessagePackFileFormat encodes and decodes arbitrary values into and from bytes objects + with the msgpack library + ''' + + def __init__(self, encodeOpts: Mapping = None, decodeOpts: Mapping = None): + ''' + The given encodeOpts and decodeOpts get passed as keyword arguments to msgpack.packb and + msgpack.unpackb. The default encodeOpts value is {'use_bin_type': True}. The default + decodeOpts value is {'raw': False} + ''' + self.__encodeOpts = encodeOpts if encodeOpts is not None else { + 'use_bin_type': True + } + self.__decodeOpts = decodeOpts if decodeOpts is not None else { + 'raw': False, + 'use_list': False + } + + def encode(self, value: Any) -> bytes: + return msgpack.packb(value, **self.__encodeOpts) + + def decode(self, value: bytes) -> Any: + return msgpack.unpackb(value, **self.__decodeOpts) + + +class FileAccessInterface(abc.ABC): + ''' + The ABC FileAccessInterface loads and saves arbitrary values from and to a storage medium. + ''' + + @abc.abstractmethod + def load(self) -> Any: + ''' Loads a value from a storage medium ''' raise NotImplementedError() - async def clear(self, key: str) -> 'Store': - return await self.clearAll([key]) - - async def clearAll(self, keys: Iterable[str]) -> 'Store': + @abc.abstractmethod + def save(self, value: Any) -> None: + ''' Saves the given value to a storage medium ''' raise NotImplementedError() -async def readFile(name: str) -> bytes: - def inExecutor(): - with open(name, 'r+b') as fd: - return fd.read() +class SimpleFileAccess(FileAccessInterface): + ''' + The class SimpleFileAccess loads and stores arbitrary values from and to the filesystem. It + stores first a temporary file and renames this file. + ''' - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, inExecutor) + def __init__(self, filename: str, format: FileFormatInterface = None): + ''' + Initializes the SimpleFileAccess instance. The filename denotes a location in the filesystem + to store values and format specifies the file format for encoding an decoding. If format is + not given the MessagePackFileFormat is used. + ''' + self.__filename = filename + self.__format = format if format is not None else MessagePackFileFormat() -async def writeFile(name: str, data: bytes): - def inExecutor(): - with open(name, 'w+b') as fd: - return fd.write(data) + def load(self) -> Any: + try: + with open(self.__filename + '.wrt', 'rb') as fp: + return self.__format.decode(fp.read()) + except Exception: + with open(self.__filename, 'rb') as fp: + return self.__format.decode(fp.read()) - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, inExecutor) + def save(self, value: Any) -> None: + with open(self.__filename + '.wrt', 'wb') as fp: + fp.write(self.__format.encode(value)) + fp.flush() + os.fsync(fp.fileno()) + os.rename(self.__filename + '.wrt', self.__filename) + fd = os.open(os.path.dirname(self.__filename), os.O_DIRECTORY) + os.fsync(fd) + os.close(fd) -class FileStore(Store): - def __init__(self, name): - self.__name = name +class EventualFileAccess(FileAccessInterface): + ''' + Wraps an implementation of FileAccessInterface and stores the value eventually. + ''' - self.__data = None - self.__lock = asyncio.Lock() + def __init__(self, access: FileAccessInterface, wait=15, loop: asyncio.AbstractEventLoop = None): + self.__access = access + self.__wait = wait + self.__loop = loop if loop is not None else asyncio.get_event_loop() - @property - def name(self): - return self.__name + self.__task = False + self.__todo = False + self.__value = None - async def __read(self): - await self.__lock.acquire() + def load(self) -> Any: + return self.__value if self.__todo else self.__access.load() - try: - data = await readFile(self.__name) - data = msgpack.unpackb(data, encoding='utf-8') - self.__data = data - except FileNotFoundError: - self.__data = {} - finally: - self.__lock.release() + def save(self, value: Any) -> None: + self.__value = value + self.__todo = True + + self.__loop.call_soon(self.__start) + + def __start(self): + if self.__task or not self.__todo: + return + + self.__task = True + self.__loop.call_soon(self.__store) - async def __write(self): - await self.__lock.acquire() + def __store(self): + if not self.__todo: + return try: - data = msgpack.packb(self.__data, use_bin_type=True) - await writeFile(self.__name, data) - finally: - self.__lock.release() + self.__access.save(self.__value) + except Exception: + traceback.print_exc() - async def keys(self): - if self.__data is None: - await self.__read() + self.__value = None + self.__todo = False - return self.__data.keys() + self.__loop.call_later(self.__wait, self.__clear) - async def loadAll(self, keys: Iterable[str]) -> Mapping[str, Any]: - if self.__data is None: - await self.__read() + def __clear(self): + if not self.__task: + return - vals = {} - for key in keys: - vals[key] = self.__data[key] if key in self.__data else None - return vals + self.__task = False + if self.__todo: + self.__loop.call_soon(self.__start) - async def saveAll(self, vals: Mapping[str, Any]) -> Store: - if self.__data is None: - await self.__read() - for key, val in vals.items(): - self.__data[key] = val +class ValueStoreInterface(abc.ABC): + """ + The ABC ValueStoreInterface defines methods to store and update arbitrary values. A stored value + must never be mutated! Hint: Use immutable data types! + """ - await self.__write() - return self + @abc.abstractproperty + def value(self) -> Any: + """ + Returns the value stored by the ValueStore. The returned value must never be mutated! Hint: Use + immutable data types! + """ + raise NotImplementedError() + + @abc.abstractproperty + def update(self, updater: Callable[[Any], Any]) -> Any: + """ + Updates the value stored by the ValueStore. The given updater callable gets called with the + current value in the ValueStore. The updater callable must construct and return a NEW updated + value. It MUST NOT mutate the given value! Hint: Use immutable data types! The ValueStore stores + and returns the new value. + """ + raise NotImplementedError() + + +class MemoryValueStore(ValueStoreInterface): + """ + The MemoryValueStores stores the Value in the main memory. + """ + + def __init__(self, initial: Any = None): + self.__value = initial + + @property + def value(self) -> Any: + return self.__value + + def update(self, updater: Callable[[Any], Any]) -> Any: + self.__value = updater(self.__value) + return self.__value + + +class FileValueStore(ValueStoreInterface): + """ + The FileValueStore stores the value via the given FileAccessInterface + """ + + def __init__(self, access: FileAccessInterface, initial: Any = None): + if isinstance(access, str): + access = SimpleFileAccess(access) + self.__access = access + + try: + self.__value = self.__access.load() + except Exception: + #traceback.print_exc() + self.__value = initial + + @property + def value(self) -> Any: + return self.__value - async def clearAll(self, keys: Iterable[str]) -> Store: - if self.__data is None: - await self.__read() + def update(self, updater: Callable[[Any], Any]) -> Any: + value = updater(self.__value) + self.__access.save(value) + self.__value = value + return value - for key in keys: - if key in self.__data: - del self.__data[key] - await self.__write() - return self \ No newline at end of file +# A function that returns a ValueStore given some initial value +ValueStoreFactory = Callable[[Any], ValueStoreInterface] \ No newline at end of file diff --git a/tests/store.py b/tests/store.py index 9de2085..130fd97 100644 --- a/tests/store.py +++ b/tests/store.py @@ -2,16 +2,132 @@ import os import asyncio +import tempfile +import unittest -from supcon.store import FileStore +import msgpack -fs = FileStore(os.path.dirname(__file__) + '/store.msg') +import supcon.store -async def testFileStore(fs): - print(await fs.load('a')) - await fs.save('a', 'abc') - print(await fs.load('a')) -loop = asyncio.get_event_loop() -loop.run_until_complete(testFileStore(fs)) -loop.close() +class TestMessagePackFileFormat(unittest.TestCase): + + def test_encode(self): + value = 'lalala' + + f = supcon.store.MessagePackFileFormat() + self.assertEqual(f.encode(value), msgpack.packb(value, use_bin_type=True)) + + def test_decode(self): + value = 'lalala' + value = msgpack.packb(value, use_bin_type=True) + + f = supcon.store.MessagePackFileFormat() + self.assertEqual(f.decode(value), msgpack.unpackb(value, raw=False)) + + +class TestSimpleFileAccess(unittest.TestCase): + + def test_saveload(self): + (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-') + os.close(handle) + + access = supcon.store.SimpleFileAccess(filename) + + decoded = 'lalala' + encoded = msgpack.packb(decoded, use_bin_type=True) + + access.save(decoded) + with open(filename, 'rb') as fp: + self.assertEqual(fp.read(), encoded) + + self.assertEqual(access.load(), decoded) + + os.remove(filename) + + +class TestEventualFileAccess(unittest.TestCase): + + def test_saveload(self): + (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-') + os.close(handle) + + simple = supcon.store.SimpleFileAccess(filename) + access = supcon.store.EventualFileAccess(simple, 1.5) + + async def runIt(): + access.save('random1') + self.assertEqual(access.load(), 'random1') + + await asyncio.sleep(0.5) # first save + + self.assertEqual(simple.load(), 'random1') + + access.save('random2') + self.assertEqual(simple.load(), 'random1') + self.assertEqual(access.load(), 'random2') + + await asyncio.sleep(0.5) # no save + + self.assertEqual(simple.load(), 'random1') + self.assertEqual(access.load(), 'random2') + + await asyncio.sleep(1) # second save + + self.assertEqual(simple.load(), 'random2') + self.assertEqual(access.load(), 'random2') + + await asyncio.sleep(1.5) # no save + + self.assertEqual(simple.load(), 'random2') + self.assertEqual(access.load(), 'random2') + + loop = asyncio.get_event_loop() + loop.run_until_complete(runIt()) + + os.remove(filename) + + +class TestMemoryValueStore(unittest.TestCase): + + def test_update(self): + store = supcon.store.MemoryValueStore(1) + self.assertEqual(1, store.value) + + store.update(lambda v: v + 1) + self.assertEqual(2, store.value) + + store.update(lambda v: v + 5) + self.assertEqual(7, store.value) + + store.update(lambda v: -1) + self.assertEqual(-1, store.value) + + +class TestFileValueStore(unittest.TestCase): + + def test_update(self): + (handle, filename) = tempfile.mkstemp('.msg', 'TestSimpleFileAccess-') + os.close(handle) + + access = supcon.store.SimpleFileAccess(filename) + store = supcon.store.FileValueStore(access, 1) + self.assertEqual(1, store.value) + + store.update(lambda v: v + 1) + self.assertEqual(2, store.value) + self.assertEqual(2, access.load()) + + store.update(lambda v: v + 5) + self.assertEqual(7, store.value) + self.assertEqual(7, access.load()) + + store.update(lambda v: -1) + self.assertEqual(-1, store.value) + self.assertEqual(-1, access.load()) + + os.remove(filename) + + +if __name__ == '__main__': + unittest.main() -- 2.34.1