""" serial_port.py — Minimal 8N1 serial port using only the Python standard library. Replaces pyserial for TrueNAS environments where pip is unavailable. """ import fcntl import os import select import termios import time from typing import Optional class SerialPort: BAUD_MAP = { 9600: termios.B9600, 19200: termios.B19200, 38400: termios.B38400, 57600: termios.B57600, 115200: termios.B115200, } def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0): self.port = port self.baudrate = baudrate self.timeout = timeout self._fd: Optional[int] = None self._saved_attrs = None # ── Open / close ────────────────────────────────────────────────────────── def open(self): try: self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as e: raise OSError(f"Cannot open {self.port}: {e}") from e # Switch back to blocking mode now that O_NOCTTY is set flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) self._saved_attrs = termios.tcgetattr(self._fd) # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc attrs = list(termios.tcgetattr(self._fd)) baud = self.BAUD_MAP.get(self.baudrate, termios.B115200) attrs[0] = termios.IGNBRK # iflag attrs[1] = 0 # oflag attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag attrs[3] = 0 # lflag (raw) attrs[4] = baud # ispeed attrs[5] = baud # ospeed attrs[6][termios.VMIN] = 0 attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255) termios.tcsetattr(self._fd, termios.TCSANOW, attrs) termios.tcflush(self._fd, termios.TCIOFLUSH) def close(self): if self._fd is not None: try: if self._saved_attrs: termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs) os.close(self._fd) except OSError: pass finally: self._fd = None @property def is_open(self) -> bool: return self._fd is not None # ── Read / write ────────────────────────────────────────────────────────── def write(self, data: bytes): if self._fd is None: raise OSError("Port is not open") os.write(self._fd, data) def read_chunk(self, size: int = 4096) -> bytes: if self._fd is None: raise OSError("Port is not open") try: return os.read(self._fd, size) except OSError: return b"" def read_until_quiet(self, quiet_period: float = 0.5, timeout: Optional[float] = None) -> str: """ Read until no new bytes arrive for `quiet_period` seconds, or until `timeout` (default: self.timeout) seconds have elapsed. Pass a longer timeout for operations like curl that take more time. """ output = b"" deadline = time.monotonic() + (timeout if timeout is not None else self.timeout) last_rx = time.monotonic() while True: now = time.monotonic() if now >= deadline: break if output and (now - last_rx) >= quiet_period: break wait = min(deadline - now, quiet_period) ready, _, _ = select.select([self._fd], [], [], wait) if ready: chunk = self.read_chunk() if chunk: output += chunk last_rx = time.monotonic() return output.decode("utf-8", errors="replace") def send_line(self, cmd: str = "", delay: float = 0.3): self.write((cmd + "\r\n").encode("utf-8")) time.sleep(delay) # ── Context manager ─────────────────────────────────────────────────────── def __enter__(self): self.open() return self def __exit__(self, *_): self.close()