Files
es24n-conf-windows/modules/serial_port.py
scott 75873dd4e0 Port serial layer to pyserial; add Windows COM detection and PyInstaller spec
- serial_port.py: replace termios/fcntl/select with pyserial wrapper,
  same SerialPort interface preserved for all other modules
- workflow_serial.py: detect_serial_device() uses serial.tools.list_ports
  to enumerate COM ports; removes _fix_permissions() (not needed on Windows);
  multi-device table now shows port description
- es24n_conf.py: enable VT/ANSI colour mode on Windows 10+ via ctypes;
  update docstring for Windows edition
- requirements.txt: pyserial >= 3.5, pyinstaller >= 6.0
- es24n_conf.spec: PyInstaller single-file .exe spec

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 15:50:17 -04:00

102 lines
3.5 KiB
Python

"""
serial_port.py — pyserial-backed 8N1 serial port for the Windows edition.
Exposes the same SerialPort interface as the TrueNAS/Linux edition so all
other modules work without modification.
"""
import time
from typing import Optional
import serial
class SerialPort:
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self._ser: Optional[serial.Serial] = None
# ── Open / close ──────────────────────────────────────────────────────────
def open(self):
try:
self._ser = serial.Serial(
port = self.port,
baudrate = self.baudrate,
bytesize = serial.EIGHTBITS,
parity = serial.PARITY_NONE,
stopbits = serial.STOPBITS_ONE,
timeout = 0, # non-blocking reads; we poll manually
xonxoff = False,
rtscts = False,
dsrdtr = False,
)
except serial.SerialException as e:
raise OSError(f"Cannot open {self.port}: {e}") from e
def close(self):
if self._ser is not None:
try:
self._ser.close()
except Exception:
pass
finally:
self._ser = None
@property
def is_open(self) -> bool:
return self._ser is not None and self._ser.is_open
# ── Read / write ──────────────────────────────────────────────────────────
def write(self, data: bytes):
if self._ser is None:
raise OSError("Port is not open")
self._ser.write(data)
def read_chunk(self, size: int = 4096) -> bytes:
if self._ser is None:
raise OSError("Port is not open")
waiting = self._ser.in_waiting
if waiting:
return self._ser.read(min(size, waiting))
return b""
def read_until_quiet(self, quiet_period: float = 0.5,
timeout: Optional[float] = None) -> str:
"""
Read until no new bytes arrive for `quiet_period` seconds,
or until `timeout` (default: self.timeout) seconds have elapsed.
Pass a longer timeout for operations like curl that take more time.
"""
output = b""
deadline = time.monotonic() + (timeout if timeout is not None else self.timeout)
last_rx = time.monotonic()
while True:
now = time.monotonic()
if now >= deadline:
break
if output and (now - last_rx) >= quiet_period:
break
chunk = self.read_chunk()
if chunk:
output += chunk
last_rx = time.monotonic()
else:
time.sleep(0.05)
return output.decode("utf-8", errors="replace")
def send_line(self, cmd: str = "", delay: float = 0.3):
self.write((cmd + "\r\n").encode("utf-8"))
time.sleep(delay)
# ── Context manager ───────────────────────────────────────────────────────
def __enter__(self):
self.open()
return self
def __exit__(self, *_):
self.close()