The Redfish API at 127.0.0.1 is only accessible from within the IOM's
own shell, not directly from the host over the serial cable. The previous
approach of making urllib HTTPS requests from the host to 127.0.0.1 was
fundamentally incorrect.
Changes:
- serial_port.py: add optional per-call timeout override to
read_until_quiet() so curl responses have enough time to arrive
- workflow_serial.py:
- add _login_serial_console() — sends username/password over serial
and waits for a shell prompt before proceeding
- add _serial_redfish_request() — builds and sends a curl command
over the serial session, parses HTTP status and JSON from the output
- fetch_current_config(), apply_configuration(), _apply_iom() now
accept and use a SerialPort instance via _serial_redfish_request()
- configure_shelf() calls _login_serial_console() after collecting
the password, before making any Redfish calls
- remove unused _redfish_request import (HTTP transport no longer
used in the serial workflow)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
127 lines
4.6 KiB
Python
127 lines
4.6 KiB
Python
"""
|
|
serial_port.py — Minimal 8N1 serial port using only the Python standard library.
|
|
Replaces pyserial for TrueNAS environments where pip is unavailable.
|
|
"""
|
|
|
|
import fcntl
|
|
import os
|
|
import select
|
|
import termios
|
|
import time
|
|
from typing import Optional
|
|
|
|
|
|
class SerialPort:
|
|
BAUD_MAP = {
|
|
9600: termios.B9600,
|
|
19200: termios.B19200,
|
|
38400: termios.B38400,
|
|
57600: termios.B57600,
|
|
115200: termios.B115200,
|
|
}
|
|
|
|
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.timeout = timeout
|
|
self._fd: Optional[int] = None
|
|
self._saved_attrs = None
|
|
|
|
# ── Open / close ──────────────────────────────────────────────────────────
|
|
def open(self):
|
|
try:
|
|
self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
|
|
except OSError as e:
|
|
raise OSError(f"Cannot open {self.port}: {e}") from e
|
|
|
|
# Switch back to blocking mode now that O_NOCTTY is set
|
|
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
|
|
fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
|
|
|
|
self._saved_attrs = termios.tcgetattr(self._fd)
|
|
|
|
# Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc
|
|
attrs = list(termios.tcgetattr(self._fd))
|
|
baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
|
|
|
|
attrs[0] = termios.IGNBRK # iflag
|
|
attrs[1] = 0 # oflag
|
|
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag
|
|
attrs[3] = 0 # lflag (raw)
|
|
attrs[4] = baud # ispeed
|
|
attrs[5] = baud # ospeed
|
|
attrs[6][termios.VMIN] = 0
|
|
attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255)
|
|
|
|
termios.tcsetattr(self._fd, termios.TCSANOW, attrs)
|
|
termios.tcflush(self._fd, termios.TCIOFLUSH)
|
|
|
|
def close(self):
|
|
if self._fd is not None:
|
|
try:
|
|
if self._saved_attrs:
|
|
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs)
|
|
os.close(self._fd)
|
|
except OSError:
|
|
pass
|
|
finally:
|
|
self._fd = None
|
|
|
|
@property
|
|
def is_open(self) -> bool:
|
|
return self._fd is not None
|
|
|
|
# ── Read / write ──────────────────────────────────────────────────────────
|
|
def write(self, data: bytes):
|
|
if self._fd is None:
|
|
raise OSError("Port is not open")
|
|
os.write(self._fd, data)
|
|
|
|
def read_chunk(self, size: int = 4096) -> bytes:
|
|
if self._fd is None:
|
|
raise OSError("Port is not open")
|
|
try:
|
|
return os.read(self._fd, size)
|
|
except OSError:
|
|
return b""
|
|
|
|
def read_until_quiet(self, quiet_period: float = 0.5,
|
|
timeout: Optional[float] = None) -> str:
|
|
"""
|
|
Read until no new bytes arrive for `quiet_period` seconds,
|
|
or until `timeout` (default: self.timeout) seconds have elapsed.
|
|
Pass a longer timeout for operations like curl that take more time.
|
|
"""
|
|
output = b""
|
|
deadline = time.monotonic() + (timeout if timeout is not None else self.timeout)
|
|
last_rx = time.monotonic()
|
|
|
|
while True:
|
|
now = time.monotonic()
|
|
if now >= deadline:
|
|
break
|
|
if output and (now - last_rx) >= quiet_period:
|
|
break
|
|
|
|
wait = min(deadline - now, quiet_period)
|
|
ready, _, _ = select.select([self._fd], [], [], wait)
|
|
if ready:
|
|
chunk = self.read_chunk()
|
|
if chunk:
|
|
output += chunk
|
|
last_rx = time.monotonic()
|
|
|
|
return output.decode("utf-8", errors="replace")
|
|
|
|
def send_line(self, cmd: str = "", delay: float = 0.3):
|
|
self.write((cmd + "\r\n").encode("utf-8"))
|
|
time.sleep(delay)
|
|
|
|
# ── Context manager ───────────────────────────────────────────────────────
|
|
def __enter__(self):
|
|
self.open()
|
|
return self
|
|
|
|
def __exit__(self, *_):
|
|
self.close()
|