CONF_SCHEMA = {'properties':
{'pause': {'type': 'number'},
'keep': {'type': 'integer'},
- 'path': {'type': 'string'}},
+ 'path': {'type': 'string'},
+ 'iso': {'enum': [0, 100, 200, 320, 400, 500, 640, 800]},
+ 'shutter speed': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 11111},
+ 'exposure mode': {'enum': ['off', 'auto', 'night',
+ 'nightpreview', 'backlight',
+ 'spotlight', 'sports', 'snow',
+ 'beach', 'verylong',
+ 'fixedfps', 'antishake',
+ 'fireworks']},
+ 'exposure compensation': {'type': 'integer',
+ 'minimum': -25,
+ 'maximum': 25},
+ 'brightness': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 100},
+ 'sharpness': {'type': 'integer',
+ 'minimum': -100,
+ 'maximum': 100},
+ 'contrast': {'type': 'integer',
+ 'minimum': -100,
+ 'maximum': 100},
+ 'saturation': {'type': 'integer',
+ 'minimum': -100,
+ 'maximum': 100},
+ 'awb mode': {'enum': ['off', 'auto', 'sunlight', 'cloudy',
+ 'shade', 'tungsten', 'fluorescent',
+ 'incandescent', 'flash',
+ 'horizon']}},
'required': ['pause', 'keep', 'path']}
async def _receive(self, message: Message) -> None:
"""Register plugin as bus client."""
self._images: Deque[Tuple[str, str]] = collections.deque()
self._capture = False
+ self._camera = picamera.PiCamera(sensor_mode=7,
+ resolution=(640, 480),
+ framerate=90)
+ if 'iso' not in self.conf:
+ self.conf['iso'] = 0
+ if 'shutter speed' not in self.conf:
+ self.conf['shutter speed'] = 0
+ if 'exposure mode' not in self.conf:
+ self.conf['exposure mode'] = 'auto'
+ if 'exposure compensation' not in self.conf:
+ self.conf['exposure compensation'] = 0
+ if 'brightness' not in self.conf:
+ self.conf['brightness'] = 50
+ if 'sharpness' not in self.conf:
+ self.conf['sharpness'] = 0
+ if 'contrast' not in self.conf:
+ self.conf['contrast'] = 0
+ if 'saturation' not in self.conf:
+ self.conf['saturation'] = 0
+ if 'awb mode' not in self.conf:
+ self.conf['awb mode'] = 'auto'
sends = [MessageTemplate({'event': {'const': 'new image'},
'image': {'type': 'string'},
'date': {'type': 'string'}}),
MessageTemplate({'image': {'type': 'string'},
- 'date': {'type': 'string'}})]
+ 'date': {'type': 'string'}}),
+ MessageTemplate({'event': {'const': 'new iso'},
+ 'iso': {'type': 'integer'}}),
+ MessageTemplate({'iso': {'type': 'integer'}}),
+ MessageTemplate({'event': {'const': 'new shutter speed'},
+ 'shutter speed': {'type': 'integer'}}),
+ MessageTemplate({'shutter speed': {'type': 'integer'}}),
+ MessageTemplate({'event': {'const': 'new exposure mode'},
+ 'exposure mode': {'type': 'string'}}),
+ MessageTemplate({'exposure mode': {'type': 'string'}}),
+ MessageTemplate({'event': {'const':
+ 'new exposure compensation'},
+ 'exposure compensation':
+ {'type': 'integer'}}),
+ MessageTemplate({'exposure compensation':
+ {'type': 'integer'}}),
+ MessageTemplate({'event': {'const': 'new brightness'},
+ 'brightness': {'type': 'integer'}}),
+ MessageTemplate({'brightness': {'type': 'integer'}}),
+ MessageTemplate({'event': {'const': 'new sharpness'},
+ 'sharpness': {'type': 'integer'}}),
+ MessageTemplate({'sharpness': {'type': 'integer'}}),
+ MessageTemplate({'event': {'const': 'new contrast'},
+ 'contrast': {'type': 'integer'}}),
+ MessageTemplate({'contrast': {'type': 'integer'}}),
+ MessageTemplate({'event': {'const': 'new saturation'},
+ 'saturation': {'type': 'integer'}}),
+ MessageTemplate({'saturation': {'type': 'integer'}}),
+ MessageTemplate({'event': {'const': 'new awb mode'},
+ 'awb mode': {'type': 'string'}}),
+ MessageTemplate({'awb mode': {'type': 'string'}})]
receives = [MessageTemplate({'target': {'const': self.name},
'command': {'const': 'get image'}}),
MessageTemplate({'target': {'const': self.name},
"""Run camera."""
loop = asyncio.get_running_loop()
executor = concurrent.futures.ThreadPoolExecutor()
- camera = picamera.PiCamera(sensor_mode=7,
- resolution=(640, 480),
- framerate=90)
try:
while True:
if self._capture:
- camera.exposure_mode = 'sports'
- camera.awb_mode = 'auto'
- camera.start_preview()
+ self._camera.start_preview()
await asyncio.sleep(2)
while self._capture:
while len(self._images) >= self.conf['keep']:
await loop.run_in_executor(
executor,
functools.partial(
- camera.capture,
+ self._camera.capture,
stream,
'jpeg',
use_video_port=True))
'image': file_name,
'date': iso_time}))
await asyncio.sleep(self.conf['pause'])
- camera.stop_preview()
+ self._camera.stop_preview()
await asyncio.sleep(2)
except asyncio.CancelledError:
- camera.stop_preview()
- camera.close()
+ self._camera.stop_preview()
+ self._camera.close()
while len(self._images) > 0:
file_name = self._images.popleft()[0]
file_path = os.path.join(self.conf['path'], file_name)