""" serial_port.py — pyserial-backed 8N1 serial port for the Windows edition. Exposes the same SerialPort interface as the TrueNAS/Linux edition so all other modules work without modification. """ import time from typing import Optional import serial class SerialPort: def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0): self.port = port self.baudrate = baudrate self.timeout = timeout self._ser: Optional[serial.Serial] = None # ── Open / close ────────────────────────────────────────────────────────── def open(self): try: self._ser = serial.Serial( port = self.port, baudrate = self.baudrate, bytesize = serial.EIGHTBITS, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, timeout = 0, # non-blocking reads; we poll manually xonxoff = False, rtscts = False, dsrdtr = False, ) except serial.SerialException as e: raise OSError(f"Cannot open {self.port}: {e}") from e def close(self): if self._ser is not None: try: self._ser.close() except Exception: pass finally: self._ser = None @property def is_open(self) -> bool: return self._ser is not None and self._ser.is_open # ── Read / write ────────────────────────────────────────────────────────── def write(self, data: bytes): if self._ser is None: raise OSError("Port is not open") self._ser.write(data) def read_chunk(self, size: int = 4096) -> bytes: if self._ser is None: raise OSError("Port is not open") waiting = self._ser.in_waiting if waiting: return self._ser.read(min(size, waiting)) return b"" def read_until_quiet(self, quiet_period: float = 0.5, timeout: Optional[float] = None) -> str: """ Read until no new bytes arrive for `quiet_period` seconds, or until `timeout` (default: self.timeout) seconds have elapsed. Pass a longer timeout for operations like curl that take more time. """ output = b"" deadline = time.monotonic() + (timeout if timeout is not None else self.timeout) last_rx = time.monotonic() while True: now = time.monotonic() if now >= deadline: break if output and (now - last_rx) >= quiet_period: break chunk = self.read_chunk() if chunk: output += chunk last_rx = time.monotonic() else: time.sleep(0.05) return output.decode("utf-8", errors="replace") def send_line(self, cmd: str = "", delay: float = 0.3): self.write((cmd + "\r\n").encode("utf-8")) time.sleep(delay) # ── Context manager ─────────────────────────────────────────────────────── def __enter__(self): self.open() return self def __exit__(self, *_): self.close()