Files
es24n-conf-windows/modules/serial_port.py
scott a6a0f1b246 Initial commit — bootstrapped from es24n-conf (TrueNAS/Linux edition)
Starting point for Windows packaging. Serial backend will be replaced
with pyserial; PyInstaller used to produce a standalone .exe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 15:44:28 -04:00

127 lines
4.6 KiB
Python

"""
serial_port.py — Minimal 8N1 serial port using only the Python standard library.
Replaces pyserial for TrueNAS environments where pip is unavailable.
"""
import fcntl
import os
import select
import termios
import time
from typing import Optional
class SerialPort:
    """Minimal raw 8N1 serial port built on ``os``/``termios``/``select``.

    The port is configured with 8 data bits, no parity, one stop bit, no
    input/output processing and modem-control lines ignored (``CLOCAL``).
    Instances can be used as context managers::

        with SerialPort("/dev/ttyUSB0") as port:
            port.send_line("help")
            print(port.read_until_quiet())
    """

    # Supported baud rates mapped to their termios speed constants.
    BAUD_MAP = {
        9600: termios.B9600,
        19200: termios.B19200,
        38400: termios.B38400,
        57600: termios.B57600,
        115200: termios.B115200,
    }

    def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
        """Store the settings; the device is not touched until ``open()``.

        Args:
            port: Path of the serial device node.
            baudrate: Line speed; values missing from ``BAUD_MAP`` fall back
                to 115200 when the port is opened.
            timeout: Default read timeout in seconds.
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self._fd: Optional[int] = None
        self._saved_attrs = None

    # ── Open / close ──────────────────────────────────────────────────────────

    def open(self):
        """Open the device and switch it to raw 8N1 mode.

        Raises:
            OSError: If the device node cannot be opened.
            termios.error: If the line settings cannot be read or applied.
                The descriptor is closed again before the error propagates,
                so a failed ``open()`` leaves the port closed.
        """
        try:
            # O_NONBLOCK makes the open() call itself non-blocking; the
            # descriptor is switched back to blocking mode right below.
            fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as e:
            raise OSError(f"Cannot open {self.port}: {e}") from e

        try:
            # Restore blocking mode for all subsequent reads and writes.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

            saved = termios.tcgetattr(fd)

            # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc.
            # Copy the cc list too so editing it cannot alter `saved`.
            attrs = list(saved)
            attrs[6] = list(saved[6])
            baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
            attrs[0] = termios.IGNBRK                                # iflag
            attrs[1] = 0                                             # oflag
            attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL  # cflag
            attrs[3] = 0                                             # lflag (raw)
            attrs[4] = baud                                          # ispeed
            attrs[5] = baud                                          # ospeed
            # VMIN=0 with VTIME>0: read() returns as soon as data arrives or
            # after VTIME tenths of a second (the field is capped at 255).
            attrs[6][termios.VMIN] = 0
            attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255)
            termios.tcsetattr(fd, termios.TCSANOW, attrs)
            termios.tcflush(fd, termios.TCIOFLUSH)
        except BaseException:
            # Do not leak the descriptor when configuration fails.
            os.close(fd)
            raise

        self._fd = fd
        self._saved_attrs = saved

    def close(self):
        """Restore the original line settings and close the device.

        Best-effort and idempotent: errors while restoring settings or
        closing are suppressed, and the descriptor is always released.
        """
        fd = self._fd
        if fd is None:
            return
        self._fd = None
        if self._saved_attrs:
            try:
                termios.tcsetattr(fd, termios.TCSADRAIN, self._saved_attrs)
            except (OSError, termios.error):
                # termios.error is not an OSError subclass; catch both so a
                # failed restore can never skip the os.close() below.
                pass
        try:
            os.close(fd)
        except OSError:
            pass

    @property
    def is_open(self) -> bool:
        """True while a device descriptor is held."""
        return self._fd is not None

    # ── Read / write ──────────────────────────────────────────────────────────

    def write(self, data: bytes):
        """Write all of ``data`` to the port.

        Raises:
            OSError: If the port is not open or the write fails.
        """
        if self._fd is None:
            raise OSError("Port is not open")
        # os.write() may accept fewer bytes than offered; keep going until
        # the whole buffer has been handed to the driver.
        pending = memoryview(data)
        while len(pending):
            sent = os.write(self._fd, pending)
            pending = pending[sent:]

    def read_chunk(self, size: int = 4096) -> bytes:
        """Read up to ``size`` bytes; return ``b""`` on timeout or read error.

        Raises:
            OSError: If the port is not open.
        """
        if self._fd is None:
            raise OSError("Port is not open")
        try:
            return os.read(self._fd, size)
        except OSError:
            # Deliberate best-effort: a failed read is reported as "no data".
            return b""

    def read_until_quiet(self, quiet_period: float = 0.5,
                         timeout: Optional[float] = None) -> str:
        """
        Read until no new bytes arrive for `quiet_period` seconds,
        or until `timeout` (default: self.timeout) seconds have elapsed.
        Pass a longer timeout for operations like curl that take more time.

        Returns the received bytes decoded as UTF-8, with undecodable
        sequences replaced.

        Raises:
            OSError: If the port is not open.
        """
        if self._fd is None:
            raise OSError("Port is not open")

        received = bytearray()
        limit = timeout if timeout is not None else self.timeout
        deadline = time.monotonic() + limit
        last_rx = time.monotonic()
        while True:
            now = time.monotonic()
            if now >= deadline:
                break
            # The quiet timer only applies once something has been received.
            if received and (now - last_rx) >= quiet_period:
                break
            wait = min(deadline - now, quiet_period)
            ready, _, _ = select.select([self._fd], [], [], wait)
            if ready:
                chunk = self.read_chunk()
                if chunk:
                    received += chunk
                    last_rx = time.monotonic()
        return received.decode("utf-8", errors="replace")

    def send_line(self, cmd: str = "", delay: float = 0.3):
        """Send ``cmd`` followed by CR LF, then pause for ``delay`` seconds."""
        self.write((cmd + "\r\n").encode("utf-8"))
        time.sleep(delay)

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self):
        """Open the port and return this instance."""
        self.open()
        return self

    def __exit__(self, *_):
        """Close the port; exceptions from the ``with`` body still propagate."""
        self.close()