Files
es24n-conf/modules/serial_port.py
scott d80dac2222 Fix serial workflow: login to IOM console and run curl for Redfish
The Redfish API at 127.0.0.1 is only accessible from within the IOM's
own shell, not directly from the host over the serial cable. The previous
approach of making urllib HTTPS requests from the host to 127.0.0.1 was
fundamentally incorrect.

Changes:
- serial_port.py: add optional per-call timeout override to
  read_until_quiet() so curl responses have enough time to arrive
- workflow_serial.py:
  - add _login_serial_console() — sends username/password over serial
    and waits for a shell prompt before proceeding
  - add _serial_redfish_request() — builds and sends a curl command
    over the serial session, parses HTTP status and JSON from the output
  - fetch_current_config(), apply_configuration(), _apply_iom() now
    accept and use a SerialPort instance via _serial_redfish_request()
  - configure_shelf() calls _login_serial_console() after collecting
    the password, before making any Redfish calls
  - remove unused _redfish_request import (HTTP transport no longer
    used in the serial workflow)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 21:55:01 -04:00

127 lines
4.6 KiB
Python

"""
serial_port.py — Minimal 8N1 serial port using only the Python standard library.
Replaces pyserial for TrueNAS environments where pip is unavailable.
"""
import fcntl
import os
import select
import termios
import time
from typing import Optional
class SerialPort:
BAUD_MAP = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
}
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self._fd: Optional[int] = None
self._saved_attrs = None
# ── Open / close ──────────────────────────────────────────────────────────
def open(self):
try:
self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as e:
raise OSError(f"Cannot open {self.port}: {e}") from e
# Switch back to blocking mode now that O_NOCTTY is set
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
self._saved_attrs = termios.tcgetattr(self._fd)
# Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc
attrs = list(termios.tcgetattr(self._fd))
baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
attrs[0] = termios.IGNBRK # iflag
attrs[1] = 0 # oflag
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag
attrs[3] = 0 # lflag (raw)
attrs[4] = baud # ispeed
attrs[5] = baud # ospeed
attrs[6][termios.VMIN] = 0
attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255)
termios.tcsetattr(self._fd, termios.TCSANOW, attrs)
termios.tcflush(self._fd, termios.TCIOFLUSH)
def close(self):
if self._fd is not None:
try:
if self._saved_attrs:
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs)
os.close(self._fd)
except OSError:
pass
finally:
self._fd = None
@property
def is_open(self) -> bool:
return self._fd is not None
# ── Read / write ──────────────────────────────────────────────────────────
def write(self, data: bytes):
if self._fd is None:
raise OSError("Port is not open")
os.write(self._fd, data)
def read_chunk(self, size: int = 4096) -> bytes:
if self._fd is None:
raise OSError("Port is not open")
try:
return os.read(self._fd, size)
except OSError:
return b""
def read_until_quiet(self, quiet_period: float = 0.5,
timeout: Optional[float] = None) -> str:
"""
Read until no new bytes arrive for `quiet_period` seconds,
or until `timeout` (default: self.timeout) seconds have elapsed.
Pass a longer timeout for operations like curl that take more time.
"""
output = b""
deadline = time.monotonic() + (timeout if timeout is not None else self.timeout)
last_rx = time.monotonic()
while True:
now = time.monotonic()
if now >= deadline:
break
if output and (now - last_rx) >= quiet_period:
break
wait = min(deadline - now, quiet_period)
ready, _, _ = select.select([self._fd], [], [], wait)
if ready:
chunk = self.read_chunk()
if chunk:
output += chunk
last_rx = time.monotonic()
return output.decode("utf-8", errors="replace")
def send_line(self, cmd: str = "", delay: float = 0.3):
self.write((cmd + "\r\n").encode("utf-8"))
time.sleep(delay)
# ── Context manager ───────────────────────────────────────────────────────
def __enter__(self):
self.open()
return self
def __exit__(self, *_):
self.close()