import os
import picamera # type: ignore
import PIL.Image # type: ignore
+import time
from controlpi import BasePlugin, Message, MessageTemplate
CONF_SCHEMA = {'properties':
{'pause': {'type': 'number'},
'keep': {'type': 'integer'},
- 'path': {'type': 'string'},
- 'iso': {'type': 'integer'},
- 'full': {'type': 'array', 'items': {'type': 'integer'},
- 'minItems': 2, 'maxItems': 2},
- 'web': {'type': 'array', 'items': {'type': 'integer'},
- 'minItems': 2, 'maxItems': 2}},
+ 'path': {'type': 'string'}},
'required': ['pause', 'keep', 'path']}
async def _receive(self, message: Message) -> None:
"""Register plugin as bus client."""
self._images: Deque[Tuple[str, str]] = collections.deque()
self._capture = False
- self._iso = 200
- if 'iso' in self.conf:
- self._iso = self.conf['iso']
- self._full = (1024, 768)
- if 'full' in self.conf:
- self._full = self.conf['full']
- self._web = (640, 480)
- if 'web' in self.conf:
- self._web = self.conf['web']
sends = [MessageTemplate({'event': {'const': 'new image'},
'image': {'type': 'string'},
'date': {'type': 'string'}}),
"""Run camera."""
loop = asyncio.get_running_loop()
executor = concurrent.futures.ThreadPoolExecutor()
- camera = picamera.PiCamera(sensor_mode=2)
- camera.iso = self._iso
- camera.framerate = 15
+ camera = picamera.PiCamera(sensor_mode=7,
+ resolution=(640, 480),
+ framerate=90)
try:
while True:
if self._capture:
web_name = 'web-'+full_name
full_path = os.path.join(self.conf['path'], full_name)
web_path = os.path.join(self.conf['path'], web_name)
+ start_direct = time.perf_counter()
+ await loop.run_in_executor(
+ executor,
+ functools.partial(
+ camera.capture,
+ full_path,
+ 'jpeg',
+ use_video_port=True))
+ end_direct = time.perf_counter()
+ print(f"Direct: {end_direct - start_direct}")
+ start_pil = time.perf_counter()
stream = io.BytesIO()
await loop.run_in_executor(
executor,
camera.capture,
stream,
'jpeg',
- resize=self._full))
+ use_video_port=True))
stream.seek(0)
full_image = await loop.run_in_executor(
executor,
await loop.run_in_executor(
executor,
functools.partial(
- full_image.save, full_path))
- web_image = await loop.run_in_executor(
- executor,
- functools.partial(
- full_image.resize,
- self._web,
- PIL.Image.ANTIALIAS))
- await loop.run_in_executor(
- executor,
- functools.partial(
- web_image.save, web_path))
+ full_image.save, web_path))
stream.close()
+ end_pil = time.perf_counter()
+ print(f"PIL: {end_pil - start_pil}")
self._images.append((full_name, iso_time))
await self.bus.send(Message(self.name,
{'event': 'new image',