import logging
from typing import Optional

import serial
from prometheus_async.aio import time

from barcode_server.barcode import BarcodeEvent
from barcode_server.notifier import BarcodeNotifier
from barcode_server.stats import SERIAL_NOTIFIER_TIME

LOGGER = logging.getLogger(__name__)


class SerialNotifier(BarcodeNotifier):
    """Notifier that forwards scanned barcodes to one or two serial ports.

    Input devices are bound to ports in order of first appearance: the first
    distinct input device seen is routed to port A, the second distinct
    device (if a second port was configured) is routed to port B.
    """

    def __init__(self, pathA: str, pathB: Optional[str]):
        """Open the serial port(s) that barcodes are written to.

        :param pathA: path of the first serial port (always opened)
        :param pathB: path of the second serial port, or ``None`` to run
            with a single port
        """
        super().__init__()
        self.pathA = pathA
        self.pathB = pathB

        # Paths of the input devices bound to port A / port B. They are
        # learned lazily from the first events that arrive.
        self.inputDevicePathA = None
        self.inputDevicePathB = None

        self.usbA = serial.Serial(pathA, 9600, timeout=2)
        # Always define usbB so later code can test it instead of hitting an
        # AttributeError when no second port was configured.
        self.usbB = None
        if pathB is not None:
            try:
                self.usbB = serial.Serial(pathB, 9600, timeout=2)
            except Exception:
                # Do not leak the already opened port A when port B fails.
                self.usbA.close()
                raise

    @time(SERIAL_NOTIFIER_TIME)
    async def _send_event(self, event: BarcodeEvent):
        """Write the barcode of ``event`` to the port bound to its device.

        Events from devices that are bound to no port (a third device, or a
        second device when only one port is configured) are dropped.

        NOTE(review): the pySerial write below is a synchronous call inside a
        coroutine - confirm that blocking the event loop here is acceptable.

        :param event: the barcode event to forward
        """
        device_path = event.input_device.path

        if self.inputDevicePathA is None:
            self.inputDevicePathA = device_path
        elif (
            self.inputDevicePathB is None
            and self.usbB is not None
            and device_path != self.inputDevicePathA
        ):
            # Only a *different* device may claim slot B; otherwise a second
            # scan from device A would occupy it and lock out the real
            # second device.
            self.inputDevicePathB = device_path

        if device_path == self.inputDevicePathA:
            port, port_path = self.usbA, self.pathA
        elif self.usbB is not None and device_path == self.inputDevicePathB:
            port, port_path = self.usbB, self.pathB
        else:
            LOGGER.debug(
                "No serial port bound to input device %s, dropping barcode",
                device_path,
            )
            return

        port.write(event.barcode.encode())
        LOGGER.debug("Notified %s: %s: %s", port_path, device_path, event.barcode)