Files
es24n-conf/serial_port.py
scott ac2f67adad Split monolithic script into focused modules
Refactored the single 1200-line es24n_conf.py into six modules plus a
slim entry point, in preparation for the upcoming network-based config
workflow. Each file has a clear, single responsibility:

  ui.py              — ANSI colours, display primitives, input prompts
  serial_port.py     — SerialPort class (termios/fcntl/select)
  models.py          — IOMConfig and ShelfConfig dataclasses
  redfish.py         — Redfish API client (shared by all workflows)
  workflow_serial.py — Serial-based IOM network configuration workflow
  workflow_firmware.py — IOM and Fabric Card firmware update workflow
  es24n_conf.py      — Entry point and main menu only

No functional changes. All imports verified.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 17:48:51 -04:00

125 lines
4.4 KiB
Python

"""
serial_port.py — Minimal 8N1 serial port using only the Python standard library.
Replaces pyserial for TrueNAS environments where pip is unavailable.
"""
import fcntl
import os
import select
import termios
import time
from typing import Optional
class SerialPort:
    """Minimal raw 8N1 serial port built on ``termios``, ``fcntl`` and ``select``.

    The port is opened explicitly with :meth:`open` (or by using the instance
    as a context manager) and released with :meth:`close`, which also restores
    the terminal attributes that were in effect before the port was opened.

    Args:
        port: Path of the serial device node to open.
        baudrate: Line speed. Values missing from ``BAUD_MAP`` fall back to
            115200 baud.
        timeout: Seconds used both as the per-read inter-byte timer (VTIME,
            capped at 25.5 s) and as the overall deadline in
            :meth:`read_until_quiet`.
    """

    # Supported line speeds mapped to their termios speed constants.
    BAUD_MAP = {
        9600: termios.B9600,
        19200: termios.B19200,
        38400: termios.B38400,
        57600: termios.B57600,
        115200: termios.B115200,
    }

    def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self._fd: Optional[int] = None   # OS file descriptor, None while closed
        self._saved_attrs = None         # termios attrs to restore on close()

    # ── Open / close ──────────────────────────────────────────────────────────
    def open(self):
        """Open the device and switch it to raw 8N1 mode.

        If the port is already open it is closed first so the previous
        descriptor is not leaked.

        Raises:
            OSError: The device node could not be opened.
            termios.error: The terminal attributes could not be read or set
                (for example, the path is not a tty). The descriptor is
                closed again before the exception propagates.
        """
        if self._fd is not None:
            self.close()

        try:
            # O_NONBLOCK keeps open() itself from blocking on the line state.
            fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as e:
            raise OSError(f"Cannot open {self.port}: {e}") from e

        try:
            # Switch back to blocking I/O for normal reads and writes.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

            saved = termios.tcgetattr(fd)

            # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc
            attrs = list(saved)
            attrs[6] = list(saved[6])   # copy so `saved` stays pristine
            baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
            attrs[0] = termios.IGNBRK                                # iflag
            attrs[1] = 0                                             # oflag
            attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL  # cflag
            attrs[3] = 0                                             # lflag (raw)
            attrs[4] = baud                                          # ispeed
            attrs[5] = baud                                          # ospeed
            attrs[6][termios.VMIN] = 0
            # VTIME is in tenths of a second and must fit in 0..255.
            attrs[6][termios.VTIME] = max(0, min(int(self.timeout * 10), 255))
            termios.tcsetattr(fd, termios.TCSANOW, attrs)
            termios.tcflush(fd, termios.TCIOFLUSH)
        except BaseException:
            # Configuration failed: do not leak the descriptor or report
            # the port as open.
            try:
                os.close(fd)
            except OSError:
                pass
            raise

        self._fd = fd
        self._saved_attrs = saved

    def close(self):
        """Restore the saved terminal attributes and close the descriptor.

        Best-effort: errors while restoring attributes or closing are
        suppressed, and the descriptor is always released. Calling this on
        an already-closed port is a no-op.
        """
        fd = self._fd
        if fd is None:
            return
        saved = self._saved_attrs
        self._fd = None
        self._saved_attrs = None
        try:
            if saved:
                termios.tcsetattr(fd, termios.TCSADRAIN, saved)
        except (OSError, termios.error):
            # termios.error is not an OSError subclass; catch both so a
            # failed restore can never skip the os.close() below.
            pass
        finally:
            try:
                os.close(fd)
            except OSError:
                pass

    @property
    def is_open(self) -> bool:
        """True while the port holds an open file descriptor."""
        return self._fd is not None

    # ── Read / write ──────────────────────────────────────────────────────────
    def write(self, data: bytes):
        """Write all of ``data`` to the port.

        ``os.write`` may accept fewer bytes than requested, so the call is
        repeated until the whole buffer has been handed to the OS.

        Raises:
            OSError: The port is not open, or the underlying write failed.
        """
        if self._fd is None:
            raise OSError("Port is not open")
        view = memoryview(data)
        sent = 0
        while sent < len(view):
            sent += os.write(self._fd, view[sent:])

    def read_chunk(self, size: int = 4096) -> bytes:
        """Read up to ``size`` bytes with a single ``os.read`` call.

        Returns:
            The bytes read; ``b""`` when nothing was read or the read failed
            with ``OSError`` (read errors are deliberately swallowed).

        Raises:
            OSError: The port is not open.
        """
        if self._fd is None:
            raise OSError("Port is not open")
        try:
            return os.read(self._fd, size)
        except OSError:
            return b""

    def read_until_quiet(self, quiet_period: float = 0.5) -> str:
        """
        Read until no new bytes arrive for `quiet_period` seconds,
        or until `self.timeout` total seconds have elapsed.

        Reading also stops early when the descriptor reports readable but
        yields no data (hang-up or read error), instead of spinning until
        the deadline.

        Returns:
            Everything received, decoded as UTF-8 with undecodable bytes
            replaced.

        Raises:
            OSError: The port is not open.
        """
        if self._fd is None:
            raise OSError("Port is not open")

        chunks = []
        deadline = time.monotonic() + self.timeout
        last_rx = time.monotonic()
        while True:
            now = time.monotonic()
            if now >= deadline:
                break
            if chunks and (now - last_rx) >= quiet_period:
                break
            wait = min(deadline - now, quiet_period)
            ready, _, _ = select.select([self._fd], [], [], wait)
            if not ready:
                continue
            chunk = self.read_chunk()
            if not chunk:
                # Readable with nothing to read: the far end is gone or the
                # read failed; select() would return immediately forever.
                break
            chunks.append(chunk)
            last_rx = time.monotonic()
        return b"".join(chunks).decode("utf-8", errors="replace")

    def send_line(self, cmd: str = "", delay: float = 0.3):
        """Send ``cmd`` followed by CR LF, then sleep for ``delay`` seconds."""
        self.write((cmd + "\r\n").encode("utf-8"))
        time.sleep(delay)

    # ── Context manager ───────────────────────────────────────────────────────
    def __enter__(self):
        """Open the port and return this instance."""
        self.open()
        return self

    def __exit__(self, *_):
        """Close the port; exceptions from the ``with`` body are not suppressed."""
        self.close()