48 lines
1.7 KiB
Python

import asyncio
import logging
from typing import Dict
from barcode_server.barcode import BarcodeReader, BarcodeEvent
from barcode_server.config import AppConfig
from barcode_server.const import *
from barcode_server.notifier import BarcodeNotifier
from barcode_server.notifier.serial import SerialNotifier
from barcode_server.notifier.file import FileNotifier
from barcode_server.stats import REST_TIME_DEVICES, WEBSOCKET_CLIENT_COUNT
from barcode_server.util import input_device_to_dict
LOGGER = logging.getLogger(__name__)
class Server:
def __init__(self, config: AppConfig, barcode_reader: BarcodeReader):
self.config = config
self.barcode_reader = barcode_reader
self.barcode_reader.add_listener(self.on_barcode)
self.notifiers: Dict[str, BarcodeNotifier] = {}
if config.SERIAL_PATH_A.value is not None:
serial_notifier = SerialNotifier(config.SERIAL_PATH_A.value, config.SERIAL_PATH_B.value)
self.notifiers["serial"] = serial_notifier
if config.FILE_PATH.value is not None:
file_notifier = FileNotifier(config.FILE_PATH.value)
self.notifiers["file"] = file_notifier
async def start(self):
# start detecting and reading barcode scanners
await self.barcode_reader.start()
# start notifier queue processors
for key, notifier in self.notifiers.items():
LOGGER.debug(f"Starting notifier: {key}")
await notifier.start()
# wait forever
return await asyncio.Event().wait()
async def on_barcode(self, event: BarcodeEvent):
for key, notifier in self.notifiers.items():
await notifier.add_event(event)