diff --git a/es24n_conf.py b/es24n_conf.py index be0090a..4d1fc5c 100755 --- a/es24n_conf.py +++ b/es24n_conf.py @@ -6,1171 +6,28 @@ Based on ES24N Product Service Guide v.26011 Zero external dependencies — Python 3 standard library only. Compatible with TrueNAS (FreeBSD) and Linux. + +Usage: + python3 es24n_conf.py + +All source files must be present in the same directory: + ui.py, serial_port.py, models.py, redfish.py, + workflow_serial.py, workflow_firmware.py """ -import fcntl -import glob -import ipaddress -import json -import os -import select -import ssl -import subprocess import sys -import termios import time -import urllib.error -import urllib.request -from base64 import b64encode -from dataclasses import dataclass, field -from typing import Optional -# ── ANSI colour helpers ─────────────────────────────────────────────────────── -class C: - RED = "\033[0;31m" - GRN = "\033[0;32m" - YEL = "\033[1;33m" - CYN = "\033[0;36m" - WHT = "\033[1;37m" - DIM = "\033[2m" - BOLD = "\033[1m" - RESET = "\033[0m" - CLEAR = "\033[2J\033[H" +from ui import _c, C, banner, draw_box, ok, warn, prompt +from workflow_firmware import firmware_update_workflow +from workflow_serial import configure_shelf -def _c(colour: str, text: str) -> str: - return f"{colour}{text}{C.RESET}" -def info(msg: str): print(f" {_c(C.CYN, 'i')} {msg}") -def ok(msg: str): print(f" {_c(C.GRN, 'OK')} {msg}") -def warn(msg: str): print(f" {_c(C.YEL, '!')} {msg}") -def error(msg: str): print(f" {_c(C.RED, 'X')} {msg}") - -def banner(): - print(C.CLEAR, end="") - w = 60 - print(_c(C.CYN, " +" + "-" * w + "+")) - print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD, - " TrueNAS ES24N IOM Configuration Tool ") + _c(C.CYN, " |")) - print(_c(C.CYN, " |") + _c(C.DIM, - " Serial Config & Firmware Updates (stdlib only) ") + _c(C.CYN, " |")) - print(_c(C.CYN, " +" + "-" * w + "+")) - print() - -def rule(title: str = ""): - width = 60 - if title: - pad = max(0, width - len(title) - 2) - left = pad // 2 - right = pad - left - line = f"{'-' * left} {title} {'-' * right}" - else: - line = "-" * width - print(f"\n {_c(C.YEL, line)}\n") - -def draw_table(headers: list, rows: list, col_widths: list): - sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+" - - def fmt_row(cells): - return " | " + " | ".join( - str(c).ljust(w) for c, w in zip(cells, col_widths) - ) + " |" - - print(_c(C.DIM, sep)) - print(_c(C.BOLD, fmt_row(headers))) - print(_c(C.DIM, sep)) - for row in rows: - print(fmt_row(row)) - print(_c(C.DIM, sep)) - -def draw_box(lines: list, colour: str = C.CYN): - width = max(len(l) for l in lines) + 4 - print(f" {_c(colour, '+' + '-' * width + '+')}") - for line in lines: - pad = width - len(line) - 2 - print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}") - print(f" {_c(colour, '+' + '-' * width + '+')}") - - -# ── Input helpers ───────────────────────────────────────────────────────────── -def prompt(label: str, default: str = "") -> str: - display = f" {_c(C.CYN, label)}" - if default: - display += f" {_c(C.DIM, f'[{default}]')}" - display += ": " - - sys.stdout.write(display) - sys.stdout.flush() - val = sys.stdin.readline().strip() - return val if val else default - - -def prompt_ip(label: str) -> str: - while True: - val = prompt(label) - try: - ipaddress.IPv4Address(val) - return val - except ValueError: - warn(f"'{val}' is not a valid IPv4 address — please try again.") - - -def prompt_yn(label: str, default: bool = True) -> bool: - hint = "Y/n" if default else "y/N" - val = prompt(f"{label} [{hint}]").strip().lower() - if not val: - return default - return val in ("y", "yes") - - -def prompt_password() -> str: - while True: - val = prompt( - "Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)", - ) - if val: - return val - warn("Password cannot be empty.") - - -# ── Data classes ────────────────────────────────────────────────────────────── -@dataclass -class IOMConfig: - iom: str - dhcp: bool = True - ip: str = "" - gateway: str = "" - netmask: str = "" - -@dataclass -class ShelfConfig: - device: str = "" - password: str = "" - iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1")) - iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2")) - - -# ── Serial port (stdlib: termios / fcntl / select) ──────────────────────────── -class SerialPort: - """ - Minimal 8N1 serial port using only the Python standard library. - Replaces pyserial for TrueNAS environments where pip is unavailable. - """ - - BAUD_MAP = { - 9600: termios.B9600, - 19200: termios.B19200, - 38400: termios.B38400, - 57600: termios.B57600, - 115200: termios.B115200, - } - - def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0): - self.port = port - self.baudrate = baudrate - self.timeout = timeout - self._fd: Optional[int] = None - self._saved_attrs = None - - # ── Open / close ────────────────────────────────────────────────────────── - def open(self): - try: - self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) - except OSError as e: - raise OSError(f"Cannot open {self.port}: {e}") from e - - # Switch back to blocking mode now that O_NOCTTY is set - flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) - fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) - - self._saved_attrs = termios.tcgetattr(self._fd) - - # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc - attrs = list(termios.tcgetattr(self._fd)) - baud = self.BAUD_MAP.get(self.baudrate, termios.B115200) - - attrs[0] = termios.IGNBRK # iflag - attrs[1] = 0 # oflag - attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag - attrs[3] = 0 # lflag (raw) - attrs[4] = baud # ispeed - attrs[5] = baud # ospeed - attrs[6][termios.VMIN] = 0 - attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255) - - termios.tcsetattr(self._fd, termios.TCSANOW, attrs) - termios.tcflush(self._fd, termios.TCIOFLUSH) - - def close(self): - if self._fd is not None: - try: - if self._saved_attrs: - termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs) - os.close(self._fd) - except OSError: - pass - finally: - self._fd = None - - @property - def is_open(self) -> bool: - return self._fd is not None - - # ── Read / write ────────────────────────────────────────────────────────── - def write(self, data: bytes): - if self._fd is None: - raise OSError("Port is not open") - os.write(self._fd, data) - - def read_chunk(self, size: int = 4096) -> bytes: - if self._fd is None: - raise OSError("Port is not open") - try: - return os.read(self._fd, size) - except OSError: - return b"" - - def read_until_quiet(self, quiet_period: float = 0.5) -> str: - """ - Read until no new bytes arrive for `quiet_period` seconds, - or until `self.timeout` total seconds have elapsed. - """ - output = b"" - deadline = time.monotonic() + self.timeout - last_rx = time.monotonic() - - while True: - now = time.monotonic() - if now >= deadline: - break - if output and (now - last_rx) >= quiet_period: - break - - wait = min(deadline - now, quiet_period) - ready, _, _ = select.select([self._fd], [], [], wait) - if ready: - chunk = self.read_chunk() - if chunk: - output += chunk - last_rx = time.monotonic() - - return output.decode("utf-8", errors="replace") - - def send_line(self, cmd: str = "", delay: float = 0.3): - self.write((cmd + "\r\n").encode("utf-8")) - time.sleep(delay) - - # ── Context manager ─────────────────────────────────────────────────────── - def __enter__(self): - self.open() - return self - - def __exit__(self, *_): - self.close() - - -# ── Step 1: Detect serial device ────────────────────────────────────────────── -def detect_serial_device() -> Optional[str]: - rule("Step 1 of 5 -- Serial Cable & Device Detection") - - print(" Connect the serial cable from the ES24N IOM1 port") - print(" to the active F-Series controller USB port.") - print() - prompt("Press Enter when the cable is connected") - - for attempt in range(1, 4): - info(f"Scanning for USB serial devices (attempt {attempt}/3)...") - time.sleep(1) - - # FreeBSD: /dev/ttyU* Linux: /dev/ttyUSB*, /dev/ttyACM* - patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"] - ports = sorted({p for pat in patterns for p in glob.glob(pat)}) - - if ports: - break - - if attempt < 3: - warn("No device found yet — retrying in 2 seconds...") - time.sleep(2) - - if not ports: - error("No USB serial device detected after 3 attempts.") - print() - print(" Troubleshooting:") - print(" - Ensure the serial cable is fully seated at both ends.") - print(" - Try a different USB port on the controller.") - print(" - Confirm the ES24N is powered on.") - return None - - if len(ports) == 1: - ok(f"Device found: {_c(C.BOLD, ports[0])}") - _fix_permissions(ports[0]) - return ports[0] - - # Multiple devices — let the user choose - print() - draw_table( - ["#", "Device"], - [[str(i), p] for i, p in enumerate(ports, 1)], - [4, 24], - ) - print() - - while True: - val = prompt(f"Select device number [1-{len(ports)}]") - if val.isdigit() and 1 <= int(val) <= len(ports): - selected = ports[int(val) - 1] - ok(f"Selected: {_c(C.BOLD, selected)}") - _fix_permissions(selected) - return selected - warn(f"Please enter a number between 1 and {len(ports)}.") - - -def _fix_permissions(device: str): - try: - result = subprocess.run( - ["sudo", "chown", ":wheel", device], - capture_output=True, timeout=5, - ) - if result.returncode == 0: - ok(f"Permissions updated on {device}") - return - except Exception: - pass - try: - os.chmod(device, 0o666) - ok(f"Permissions updated on {device}") - except PermissionError: - warn("Could not update device permissions automatically.") - warn("If the connection fails, re-run this script with sudo.") - - -# ── Step 2: Open serial connection & wake IOM console ───────────────────────── -def open_serial_connection(device: str) -> Optional[SerialPort]: - rule("Step 2 of 5 -- Opening Serial Connection") - - info(f"Opening {device} at 115200 baud (8N1)...") - ser = SerialPort(device, baudrate=115200, timeout=5.0) - - try: - ser.open() - except OSError as e: - error(str(e)) - return None - - ok(f"Port opened: {device}") - info("Sending wake signal to IOM console...") - - ser.send_line("", delay=0.5) - ser.send_line("", delay=0.5) - response = ser.read_until_quiet(quiet_period=0.5) - - print() - if response.strip(): - print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}") - for line in response.strip().splitlines(): - print(f" {_c(C.DIM, '|')} {line}") - print(f" {_c(C.DIM, '+' + '-' * 56)}") - print() - - low = response.lower() - if any(kw in low for kw in ("login", "$", "#", "password")): - ok("IOM console is responsive.") - else: - warn("Unexpected response — the IOM may still be booting.") - warn("You can continue; the Redfish API operates independently.") - else: - warn("No response received from IOM console.") - warn("The Redfish API may still be reachable. Continuing...") - - print() - return ser - - -# ── Redfish helpers (GET and PATCH) ────────────────────────────────────────── -def _redfish_request(password: str, method: str, path: str, - payload: Optional[dict] = None, - host: str = "127.0.0.1") -> tuple: - """ - Issue a Redfish HTTP request. host defaults to the serial loopback (127.0.0.1) - but can be set to an IOM's network IP for firmware update operations. - Returns (success: bool, data: dict|str). - """ - url = f"https://{host}{path}" - username = "root" if host == "127.0.0.1" else "Admin" - credentials = b64encode(f"{username}:{password}".encode()).decode() - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - - body = json.dumps(payload).encode("utf-8") if payload else None - headers = {"Authorization": f"Basic {credentials}"} - if body: - headers["Content-Type"] = "application/json" - - req = urllib.request.Request(url, data=body, method=method, headers=headers) - - try: - with urllib.request.urlopen(req, context=ctx, timeout=10) as resp: - raw = resp.read().decode("utf-8", errors="replace") - try: - return True, json.loads(raw) if raw.strip() else {} - except json.JSONDecodeError: - return True, {} - - except urllib.error.HTTPError as e: - raw = e.read().decode("utf-8", errors="replace") - try: - msg = json.loads(raw).get("error", {}).get("message", raw) - except json.JSONDecodeError: - msg = raw - return False, f"HTTP {e.code}: {msg[:120]}" - - except OSError as e: - return False, f"Connection error: {e}" - - -# ── Redfish helpers (firmware upload & update) ──────────────────────────────── -def _redfish_upload_firmware(password: str, host: str, fw_path: str) -> tuple: - """ - Upload a firmware file to /redfish/v1/UpdateService using multipart/form-data. - Equivalent to: curl -k -u Admin: https:///redfish/v1/UpdateService - -X POST -F "software=@" - """ - try: - with open(fw_path, "rb") as f: - file_data = f.read() - except OSError as e: - return False, f"Cannot read file: {e}" - - filename = os.path.basename(fw_path) - boundary = f"FormBoundary{int(time.time() * 1000)}" - - body = ( - f"--{boundary}\r\n" - f'Content-Disposition: form-data; name="software"; filename="{filename}"\r\n' - "Content-Type: application/octet-stream\r\n" - "\r\n" - ).encode() + file_data + f"\r\n--{boundary}--\r\n".encode() - - url = f"https://{host}/redfish/v1/UpdateService" - credentials = b64encode(f"Admin:{password}".encode()).decode() # always network - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - - req = urllib.request.Request( - url, data=body, method="POST", - headers={ - "Authorization": f"Basic {credentials}", - "Content-Type": f"multipart/form-data; boundary={boundary}", - }, - ) - try: - with urllib.request.urlopen(req, context=ctx, timeout=120) as resp: - raw = resp.read().decode("utf-8", errors="replace") - try: - return True, json.loads(raw) if raw.strip() else {} - except json.JSONDecodeError: - return True, {} - except urllib.error.HTTPError as e: - raw = e.read().decode("utf-8", errors="replace") - try: - msg = json.loads(raw).get("error", {}).get("message", raw) - except json.JSONDecodeError: - msg = raw - return False, f"HTTP {e.code}: {msg[:120]}" - except OSError as e: - return False, f"Connection error: {e}" - - -def _redfish_trigger_update(password: str, host: str, target: str) -> tuple: - """ - Trigger a Redfish SimpleUpdate for the given target resource path. - target: e.g. "/redfish/v1/Managers/IOM1" - or "/redfish/v1/Chassis/IOM1/NetworkAdapters/1" - """ - return _redfish_request( - password, "POST", - "/redfish/v1/UpdateService/Actions/SimpleUpdate", - payload={ - "ImageURI": "/redfish/v1/UpdateService/software", - "Targets": [target], - }, - host=host, - ) - - -def _redfish_poll_tasks(password: str, host: str, timeout: int = 600) -> tuple: - """ - Poll /redfish/v1/TaskService/Tasks/ until all tasks reach a terminal state - or timeout is exceeded. Returns (success: bool, message: str). - """ - TERMINAL = {"Completed", "Killed", "Exception"} - deadline = time.monotonic() + timeout - elapsed = 0 - - while time.monotonic() < deadline: - ok_flag, data = _redfish_request( - password, "GET", "/redfish/v1/TaskService/Tasks/", host=host, - ) - if not ok_flag: - return False, f"Task service error: {data}" - - members = data.get("Members", []) - if not members: - return True, "No pending tasks." - - running = [] - for member in members: - state = member.get("TaskState") - if state is None: - # Resolve individual task link - task_path = member.get("@odata.id", "") - if task_path: - t_ok, t_data = _redfish_request( - password, "GET", task_path, host=host, - ) - state = (t_data.get("TaskState") - if t_ok and isinstance(t_data, dict) else "Running") - else: - state = "Running" - if state not in TERMINAL: - running.append(state) - - if not running: - return True, "All tasks completed." - - info(f" Tasks running ({', '.join(running)})... [{elapsed}s elapsed]") - time.sleep(10) - elapsed += 10 - - return False, f"Timeout after {timeout}s waiting for tasks." - - -def _redfish_restart_iom(password: str, host: str, iom: str) -> tuple: - return _redfish_request( - password, "POST", - f"/redfish/v1/Managers/{iom}/Actions/Manager.Reset", - payload={"ResetType": "GracefulRestart"}, - host=host, - ) - - -def _redfish_reset_fabric(password: str, host: str, iom: str) -> tuple: - return _redfish_request( - password, "POST", - f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1/Actions/NetworkAdapter.Reset", - payload={"ResetType": "GracefulRestart"}, - host=host, - ) - - -def _get_iom_fw_version(password: str, host: str, iom: str) -> str: - ok_flag, data = _redfish_request( - password, "GET", f"/redfish/v1/Managers/{iom}", host=host, - ) - if ok_flag and isinstance(data, dict): - return data.get("FirmwareVersion", "Unknown") - return _c(C.RED, "Unreachable") - - -def _get_fabric_fw_version(password: str, host: str, iom: str) -> str: - ok_flag, data = _redfish_request( - password, "GET", - f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1", - host=host, - ) - if ok_flag and isinstance(data, dict): - version = (data.get("Oem", {}) - .get("Version", {}) - .get("ActiveFirmwareVersion")) - return version or "Unknown" - return _c(C.RED, "Unreachable") - - -def _show_fw_versions(password: str, ioms: list): - info("Querying firmware versions...") - rows = [] - for iom, ip in ioms: - iom_ver = _get_iom_fw_version(password, ip, iom) - fabric_ver = _get_fabric_fw_version(password, ip, iom) - rows.append([iom, ip, iom_ver, fabric_ver]) - print() - draw_table( - ["IOM", "IP Address", "IOM Firmware", "Fabric Firmware"], - rows, - [5, 16, 32, 20], - ) - print() - - -def _update_iom_fw(password: str, ip: str, iom: str, fw_path: str) -> bool: - """Upload and apply IOM firmware for one IOM, then restart it.""" - sz = os.path.getsize(fw_path) - info(f"Uploading IOM firmware ({sz // 1024} KB) to {iom} at {ip}...") - ok_flag, data = _redfish_upload_firmware(password, ip, fw_path) - if not ok_flag: - error(f"Upload failed: {data}") - return False - ok("Firmware file uploaded.") - - info(f"Triggering {iom} firmware update...") - ok_flag, data = _redfish_trigger_update( - password, ip, f"/redfish/v1/Managers/{iom}", - ) - if not ok_flag: - error(f"Update trigger failed: {data}") - return False - ok("Update triggered.") - - info("Monitoring update progress (this may take several minutes)...") - ok_flag, msg = _redfish_poll_tasks(password, ip) - if not ok_flag: - warn(f"Task monitoring ended: {msg}") - else: - ok(msg) - - info(f"Restarting {iom}...") - _redfish_restart_iom(password, ip, iom) # connection drop on restart is normal - ok(f"{iom} restart initiated. Waiting 30s for reboot...") - time.sleep(30) - return True - - -def _update_fabric_fw(password: str, ip: str, iom: str, fw_path: str) -> bool: - """ - Upload and apply Fabric Card firmware for one IOM. - Per the service guide, the firmware file must be re-uploaded even if it was - already uploaded during the IOM firmware step. - After the update: restart fabric card, then restart IOM. - """ - sz = os.path.getsize(fw_path) - info(f"Uploading Fabric Card firmware ({sz // 1024} KB) to {iom} at {ip}...") - ok_flag, data = _redfish_upload_firmware(password, ip, fw_path) - if not ok_flag: - error(f"Upload failed: {data}") - return False - ok("Firmware file uploaded.") - - info(f"Triggering {iom} Fabric Card firmware update...") - ok_flag, data = _redfish_trigger_update( - password, ip, f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1", - ) - if not ok_flag: - error(f"Update trigger failed: {data}") - return False - ok("Update triggered.") - - info("Monitoring update progress...") - ok_flag, msg = _redfish_poll_tasks(password, ip) - if not ok_flag: - warn(f"Task monitoring ended: {msg}") - else: - ok(msg) - - info(f"Restarting {iom} Fabric Card...") - _redfish_reset_fabric(password, ip, iom) - ok("Fabric Card restart initiated. Waiting 15s...") - time.sleep(15) - - info(f"Restarting {iom} after Fabric Card update...") - _redfish_restart_iom(password, ip, iom) - ok(f"{iom} restart initiated. Waiting 30s for reboot...") - time.sleep(30) - return True - - -# ── Firmware file selection helper ──────────────────────────────────────────── -def _prompt_fw_file(label: str) -> str: - """ - Scan the current working directory for firmware files and let the user - pick one by number, or enter a custom path as the last option. - Files are sorted most-recently-modified first. - """ - cwd = os.getcwd() - FW_EXTS = {".bin", ".img", ".fw", ".hex", ".zip", ".tar", ".tgz", ".gz"} - - try: - candidates = sorted( - [f for f in os.listdir(cwd) - if not f.startswith(".") - and os.path.isfile(os.path.join(cwd, f)) - and os.path.splitext(f)[1].lower() in FW_EXTS], - key=lambda f: os.path.getmtime(os.path.join(cwd, f)), - reverse=True, - ) - except OSError: - candidates = [] - - print() - if candidates: - info(f"Firmware files found in {cwd}:") - for i, fname in enumerate(candidates, 1): - sz = os.path.getsize(os.path.join(cwd, fname)) - print(f" {_c(C.BOLD, str(i))} {fname} {_c(C.DIM, f'({sz // 1024} KB)')}") - custom_idx = len(candidates) + 1 - print(f" {_c(C.BOLD, str(custom_idx))} Enter a custom file path") - print() - - while True: - choice = prompt(f"Select {label} [1-{custom_idx}]") - if choice.isdigit(): - idx = int(choice) - if 1 <= idx <= len(candidates): - path = os.path.join(cwd, candidates[idx - 1]) - sz = os.path.getsize(path) - ok(f"Selected: {candidates[idx - 1]} ({sz // 1024} KB)") - return path - if idx == custom_idx: - break - warn(f"Please enter a number between 1 and {custom_idx}.") - else: - info(f"No firmware files found in {cwd}.") - - # Manual path entry - while True: - path = prompt(f"Path to {label}") - if os.path.isfile(path): - sz = os.path.getsize(path) - ok(f"File: {path} ({sz // 1024} KB)") - return path - warn(f"File not found: {path}") - - -# ── Firmware Update Workflow ────────────────────────────────────────────────── -def firmware_update_workflow(): - """ - Standalone firmware update for IOM and Fabric Card firmware. - Connects to each IOM via its network IP (not serial loopback) — uploading - firmware over 115200-baud serial would be impractically slow. - """ - banner() - rule("IOM & Fabric Card Firmware Update") - info("This procedure connects to each IOM via its network IP address.") - info("Ensure this system has network access to the IOM management interface.") - print() - - password = prompt_password() - print() - - print(" Which IOM(s) would you like to update?") - print(f" {_c(C.BOLD, '1')} IOM1 only") - print(f" {_c(C.BOLD, '2')} IOM2 only") - print(f" {_c(C.BOLD, '3')} Both IOM1 and IOM2") - print() - - while True: - iom_choice = prompt("Select option [1-3]") - if iom_choice in ("1", "2", "3"): - break - warn("Please enter 1, 2, or 3.") - print() - - info("Enter the management IP address for each IOM to update.") - iom1_ip = prompt_ip(" IOM1 IP address") if iom_choice in ("1", "3") else "" - iom2_ip = prompt_ip(" IOM2 IP address") if iom_choice in ("2", "3") else "" - print() - - ioms = [] - if iom_choice in ("1", "3"): - ioms.append(("IOM1", iom1_ip)) - if iom_choice in ("2", "3"): - ioms.append(("IOM2", iom2_ip)) - - rule("Current Firmware Versions") - _show_fw_versions(password, ioms) - - print(" What would you like to update?") - print(f" {_c(C.BOLD, '1')} IOM Firmware only") - print(f" {_c(C.BOLD, '2')} Fabric Card Firmware only") - print(f" {_c(C.BOLD, '3')} Both IOM and Fabric Card Firmware") - print(f" {_c(C.BOLD, '4')} Cancel") - print() - - while True: - choice = prompt("Select option [1-4]") - if choice in ("1", "2", "3", "4"): - break - warn("Please enter 1, 2, 3, or 4.") - - if choice == "4": - info("Firmware update cancelled.") - return - - update_iom = choice in ("1", "3") - update_fabric = choice in ("2", "3") - iom_fw_path = "" - fabric_fw_path = "" - - if update_iom: - iom_fw_path = _prompt_fw_file("IOM firmware file") - - if update_fabric: - fabric_fw_path = _prompt_fw_file("Fabric Card firmware file") - - print() - warn("For HA systems: update the passive IOM first.") - if len(ioms) > 1: - warn("IOM1 will be updated first — adjust order if IOM2 is passive.") - print() - - if not prompt_yn("Proceed with firmware update?", default=True): - info("Firmware update cancelled.") - return - - for iom, ip in ioms: - rule(f"{iom} ({ip})") - if update_iom: - _update_iom_fw(password, ip, iom, iom_fw_path) - if update_fabric: - _update_fabric_fw(password, ip, iom, fabric_fw_path) - - rule("Post-Update Firmware Validation") - _show_fw_versions(password, ioms) - - print() - draw_box([ - f"{_c(C.YEL, 'IMPORTANT -- For HA (Dual-Controller) Systems:')}", - "", - "After updating this controller's IOMs:", - " 1. Log into TrueNAS and initiate a failover.", - " 2. Re-run this tool to update the other controller.", - ], colour=C.YEL) - print() - - -# ── Step 3: Fetch & display current IOM network settings ───────────────────── -def fetch_current_config(cfg: ShelfConfig) -> bool: - """ - Query Redfish for the current network config of both IOMs. - Populates cfg.iom1 / cfg.iom2 with live data. - Returns True if at least one IOM responded. - """ - rule("Step 3 of 5 -- Current IOM Network Settings") - info("Querying Redfish API for current network configuration...") - print() - - any_ok = False - rows = [] - errors = [] - - for iom in ("IOM1", "IOM2"): - path = f"/redfish/v1/Managers/{iom}/EthernetInterfaces/1" - ok_flag, data = _redfish_request(cfg.password, "GET", path) - - if ok_flag and isinstance(data, dict): - any_ok = True - - # Determine mode - dhcp_enabled = ( - data.get("DHCPv4", {}).get("DHCPEnabled", False) or - data.get("DHCPv6", {}).get("DHCPEnabled", False) - ) - - # Pull address info — prefer StaticAddresses, fall back to IPv4Addresses - addrs = data.get("IPv4StaticAddresses") or data.get("IPv4Addresses", []) - if addrs: - addr_rec = addrs[0] - ip = addr_rec.get("Address", "--") - gateway = addr_rec.get("Gateway", "--") - netmask = addr_rec.get("SubnetMask", "--") - else: - ip = gateway = netmask = "--" - - origin = data.get("IPv4Addresses", [{}])[0].get("AddressOrigin", "Unknown") \ - if data.get("IPv4Addresses") else ("DHCP" if dhcp_enabled else "Static") - - iom_cfg = IOMConfig( - iom = iom, - dhcp = dhcp_enabled, - ip = ip if ip != "--" else "", - gateway = gateway if gateway != "--" else "", - netmask = netmask if netmask != "--" else "", - ) - if iom == "IOM1": - cfg.iom1 = iom_cfg - else: - cfg.iom2 = iom_cfg - - mode_str = f"{_c(C.CYN, 'DHCP')}" if dhcp_enabled else f"{_c(C.GRN, 'Static')}" - rows.append([iom, mode_str, origin, ip, gateway, netmask]) - else: - rows.append([iom, _c(C.RED, "No response"), "--", "--", "--", "--"]) - errors.append((iom, str(data))) - - draw_table( - ["IOM", "Mode", "Origin", "IP Address", "Gateway", "Subnet Mask"], - rows, - [5, 10, 8, 16, 16, 16], - ) - print() - - if errors: - for iom, err in errors: - error(f"{iom} query failed: {err}") - print() - - if not any_ok: - error("Neither IOM responded to the Redfish query.") - error("Check that the serial cable is connected and the IOM is booted.") - - return any_ok - - -# ── Step 4: Prompt user — change config or exit ─────────────────────────────── -def collect_network_config(cfg: ShelfConfig) -> bool: - """ - Show current settings, ask user what to do. - Returns True to proceed with applying changes, False to skip. - """ - rule("Step 4 of 5 -- Change Configuration?") - - print(f" {_c(C.BOLD, '1')} Change network configuration") - print(f" {_c(C.BOLD, '2')} Leave settings as-is and disconnect") - print() - - while True: - choice = prompt("Select option [1/2]") - if choice in ("1", "2"): - break - warn("Please enter 1 or 2.") - - if choice == "2": - info("No changes requested.") - return False - - # ── User wants to change settings ───────────────────────────────────────── - print() - print(" How should the IOMs be configured?") - print(f" {_c(C.BOLD, '1')} Static IP addresses") - print(f" {_c(C.BOLD, '2')} DHCP") - print() - - while True: - mode = prompt("Select mode [1/2]") - if mode in ("1", "2"): - break - warn("Please enter 1 or 2.") - - use_dhcp = (mode == "2") - print() - - if use_dhcp: - cfg.iom1 = IOMConfig("IOM1", dhcp=True) - cfg.iom2 = IOMConfig("IOM2", dhcp=True) - ok("Both IOMs will be set to DHCP.") - return True - - # Static — IOM1 - info(f"Static network details for {_c(C.BOLD, 'IOM1')}:") - iom1_ip = prompt_ip(" IOM1 IP address ") - iom1_gw = prompt_ip(" IOM1 Gateway ") - iom1_nm = prompt_ip(" IOM1 Subnet Mask") - cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip, gateway=iom1_gw, netmask=iom1_nm) - - # Static — IOM2 - print() - info(f"Static network details for {_c(C.BOLD, 'IOM2')}:") - iom2_ip = prompt_ip(" IOM2 IP address ") - - same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True) - if same: - iom2_gw, iom2_nm = iom1_gw, iom1_nm - else: - iom2_gw = prompt_ip(" IOM2 Gateway ") - iom2_nm = prompt_ip(" IOM2 Subnet Mask") - - cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip, gateway=iom2_gw, netmask=iom2_nm) - return True - - -# ── Step 5a: Apply configuration via Redfish ────────────────────────────────── -def apply_configuration(cfg: ShelfConfig) -> bool: - rule("Step 5 of 5 -- Applying Configuration via Redfish API") - - info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...") - print() - - results = [] - all_ok = True - for iom_cfg in [cfg.iom1, cfg.iom2]: - success, detail = _apply_iom(cfg.password, iom_cfg) - status = _c(C.GRN, "OK") if success else _c(C.RED, "FAIL") - results.append([iom_cfg.iom, status, detail]) - if not success: - all_ok = False - - draw_table(["IOM", "Result", "Detail"], results, [6, 8, 50]) - print() - return all_ok - - -def _apply_iom(password: str, iom_cfg: IOMConfig) -> tuple: - """ - Apply network config to a single IOM. - - DHCP: single PATCH enabling DHCPv4. - - Static: two sequential PATCHes to work around a firmware bug in the - current ES24N release that prevents disabling DHCP and setting a static - address in the same request. - Pass 1 -- set the static IP/gateway/netmask (DHCP still on) - Pass 2 -- disable DHCP (address is already committed) - """ - path = f"/redfish/v1/Managers/{iom_cfg.iom}/EthernetInterfaces/1" - - if iom_cfg.dhcp: - # DHCP -- single call, no firmware quirk involved - ok_flag, data = _redfish_request( - password, "PATCH", path, - {"DHCPv4": {"DHCPEnabled": True}}, - ) - if ok_flag: - return True, "Configured: DHCP" - return False, str(data)[:80] - - # Static -- Pass 1: set address while DHCP is still enabled - info(f" {iom_cfg.iom} pass 1/2 -- setting static address {iom_cfg.ip}...") - ok_flag, data = _redfish_request( - password, "PATCH", path, - { - "IPv4StaticAddresses": [{ - "Address": iom_cfg.ip, - "Gateway": iom_cfg.gateway, - "SubnetMask": iom_cfg.netmask, - }] - }, - ) - if not ok_flag: - return False, f"Pass 1 failed: {str(data)[:70]}" - - # Brief pause to allow the IOM to commit the address before the next call - time.sleep(1) - - # Static -- Pass 2: disable DHCP now that the static address is committed - info(f" {iom_cfg.iom} pass 2/2 -- disabling DHCP...") - ok_flag, data = _redfish_request( - password, "PATCH", path, - {"DHCPv4": {"DHCPEnabled": False}}, - ) - if not ok_flag: - return False, f"Pass 2 failed: {str(data)[:70]}" - - return True, f"Configured: Static {iom_cfg.ip}" - - -# ── Step 5b: Print applied-settings summary ─────────────────────────────────── -def print_summary(cfg: ShelfConfig, changed: bool): - rule("Summary") - - def val(iom: IOMConfig, field: str) -> str: - dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)", "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"} - stat_map = {"mode": "Static", "ip": iom.ip, "gateway": iom.gateway, "netmask": iom.netmask} - return (dhcp_map if iom.dhcp else stat_map).get(field, "") - - draw_table( - ["Setting", "IOM1", "IOM2"], - [ - ["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")], - ["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")], - ["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")], - ["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")], - ["Serial Port", cfg.device, cfg.device], - ["Changes", "Yes" if changed else "None", ""], - ], - [12, 22, 22], - ) - - if changed: - print() - draw_box([ - f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}", - "", - "Remove the serial cable ONLY after verifying each", - "expander appears in TrueNAS with matching drives.", - "", - "TrueNAS > System Settings > Enclosure >", - "NVMe-oF Expansion Shelves", - ], colour=C.YEL) - print() - - -# ── Disconnect ──────────────────────────────────────────────────────────────── -def close_serial_connection(ser: SerialPort, device: str): - if ser and ser.is_open: - ser.close() - ok(f"Serial port {device} closed.") - - print() - prompt("Disconnect the serial cable, then press Enter to continue") - ok("Serial cable disconnected. Shelf complete.") - - -# ── Full shelf configuration cycle ──────────────────────────────────────────── -def configure_shelf() -> bool: - """Run one complete shelf cycle. Returns True if user wants another shelf.""" - banner() - cfg = ShelfConfig() - - # 1 — Detect device - device = detect_serial_device() - if not device: - error("Could not detect a serial device. Returning to main menu.") - time.sleep(2) - return True - cfg.device = device - - # 2 — Open serial port & wake IOM console - ser = open_serial_connection(device) - if not ser: - error("Could not open serial port. Returning to main menu.") - time.sleep(2) - return True - - # Password needed before any Redfish calls - print() - cfg.password = prompt_password() - - # 3 — Fetch & display current settings - fetch_current_config(cfg) - - # 4 — Ask user: change or leave alone? - apply_changes = collect_network_config(cfg) - - # 5 — Apply if requested - changed = False - if apply_changes: - print() - rule("Ready to Apply") - info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1") - print() - if prompt_yn("Apply configuration now?", default=True): - apply_configuration(cfg) - changed = True - else: - warn("Configuration skipped — no changes were made.") - - # Summary - print_summary(cfg, changed) - - # Disconnect - close_serial_connection(ser, device) - - print() - return prompt_yn("Configure another ES24N shelf?", default=False) - - -# ── Entry point ─────────────────────────────────────────────────────────────── def main(): - banner() - print(f" {_c(C.DIM, 'Automates ES24N IOM network configuration and firmware updates')}") - print(f" {_c(C.DIM, 'using the Redfish API. No external dependencies.')}") - print() - while True: banner() draw_box([ - f" {_c(C.BOLD, '1')} Configure a new ES24N shelf", + f" {_c(C.BOLD, '1')} Configure a new ES24N shelf (serial)", f" {_c(C.BOLD, '2')} Update IOM / Fabric Card Firmware", f" {_c(C.BOLD, '3')} Exit", ]) @@ -1200,4 +57,4 @@ if __name__ == "__main__": except KeyboardInterrupt: print() warn("Interrupted. Exiting.") - sys.exit(0) \ No newline at end of file + sys.exit(0) diff --git a/models.py b/models.py new file mode 100644 index 0000000..5a74355 --- /dev/null +++ b/models.py @@ -0,0 +1,23 @@ +""" +models.py — Shared data classes for ES24N IOM configuration. +Used by both the serial and network configuration workflows. +""" + +from dataclasses import dataclass, field + + +@dataclass +class IOMConfig: + iom: str + dhcp: bool = True + ip: str = "" + gateway: str = "" + netmask: str = "" + + +@dataclass +class ShelfConfig: + device: str = "" + password: str = "" + iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1")) + iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2")) diff --git a/redfish.py b/redfish.py new file mode 100644 index 0000000..7a3f3e8 --- /dev/null +++ b/redfish.py @@ -0,0 +1,230 @@ +""" +redfish.py — Redfish API client functions shared across all ES24N workflows. +Handles GET/PATCH requests, firmware upload, task polling, and version queries. +""" + +import json +import os +import ssl +import time +import urllib.error +import urllib.request +from base64 import b64encode +from typing import Optional + +from ui import _c, C, info, draw_table + + +def _redfish_request(password: str, method: str, path: str, + payload: Optional[dict] = None, + host: str = "127.0.0.1") -> tuple: + """ + Issue a Redfish HTTP request. host defaults to the serial loopback (127.0.0.1) + but can be set to an IOM's network IP for firmware update operations. + Returns (success: bool, data: dict|str). + """ + url = f"https://{host}{path}" + username = "root" if host == "127.0.0.1" else "Admin" + credentials = b64encode(f"{username}:{password}".encode()).decode() + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + body = json.dumps(payload).encode("utf-8") if payload else None + headers = {"Authorization": f"Basic {credentials}"} + if body: + headers["Content-Type"] = "application/json" + + req = urllib.request.Request(url, data=body, method=method, headers=headers) + + try: + with urllib.request.urlopen(req, context=ctx, timeout=10) as resp: + raw = resp.read().decode("utf-8", errors="replace") + try: + return True, json.loads(raw) if raw.strip() else {} + except json.JSONDecodeError: + return True, {} + + except urllib.error.HTTPError as e: + raw = e.read().decode("utf-8", errors="replace") + try: + msg = json.loads(raw).get("error", {}).get("message", raw) + except json.JSONDecodeError: + msg = raw + return False, f"HTTP {e.code}: {msg[:120]}" + + except OSError as e: + return False, f"Connection error: {e}" + + +def _redfish_upload_firmware(password: str, host: str, fw_path: str) -> tuple: + """ + Upload a firmware file to /redfish/v1/UpdateService using multipart/form-data. + Equivalent to: curl -k -u Admin: https:///redfish/v1/UpdateService + -X POST -F "software=@" + """ + try: + with open(fw_path, "rb") as f: + file_data = f.read() + except OSError as e: + return False, f"Cannot read file: {e}" + + filename = os.path.basename(fw_path) + boundary = f"FormBoundary{int(time.time() * 1000)}" + + body = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="software"; filename="{filename}"\r\n' + "Content-Type: application/octet-stream\r\n" + "\r\n" + ).encode() + file_data + f"\r\n--{boundary}--\r\n".encode() + + url = f"https://{host}/redfish/v1/UpdateService" + credentials = b64encode(f"Admin:{password}".encode()).decode() # always network + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + req = urllib.request.Request( + url, data=body, method="POST", + headers={ + "Authorization": f"Basic {credentials}", + "Content-Type": f"multipart/form-data; boundary={boundary}", + }, + ) + try: + with urllib.request.urlopen(req, context=ctx, timeout=120) as resp: + raw = resp.read().decode("utf-8", errors="replace") + try: + return True, json.loads(raw) if raw.strip() else {} + except json.JSONDecodeError: + return True, {} + except urllib.error.HTTPError as e: + raw = e.read().decode("utf-8", errors="replace") + try: + msg = json.loads(raw).get("error", {}).get("message", raw) + except json.JSONDecodeError: + msg = raw + return False, f"HTTP {e.code}: {msg[:120]}" + except OSError as e: + return False, f"Connection error: {e}" + + +def _redfish_trigger_update(password: str, host: str, target: str) -> tuple: + """ + Trigger a Redfish SimpleUpdate for the given target resource path. + target: e.g. "/redfish/v1/Managers/IOM1" + or "/redfish/v1/Chassis/IOM1/NetworkAdapters/1" + """ + return _redfish_request( + password, "POST", + "/redfish/v1/UpdateService/Actions/SimpleUpdate", + payload={ + "ImageURI": "/redfish/v1/UpdateService/software", + "Targets": [target], + }, + host=host, + ) + + +def _redfish_poll_tasks(password: str, host: str, timeout: int = 600) -> tuple: + """ + Poll /redfish/v1/TaskService/Tasks/ until all tasks reach a terminal state + or timeout is exceeded. Returns (success: bool, message: str). + """ + TERMINAL = {"Completed", "Killed", "Exception"} + deadline = time.monotonic() + timeout + elapsed = 0 + + while time.monotonic() < deadline: + ok_flag, data = _redfish_request( + password, "GET", "/redfish/v1/TaskService/Tasks/", host=host, + ) + if not ok_flag: + return False, f"Task service error: {data}" + + members = data.get("Members", []) + if not members: + return True, "No pending tasks." + + running = [] + for member in members: + state = member.get("TaskState") + if state is None: + task_path = member.get("@odata.id", "") + if task_path: + t_ok, t_data = _redfish_request( + password, "GET", task_path, host=host, + ) + state = (t_data.get("TaskState") + if t_ok and isinstance(t_data, dict) else "Running") + else: + state = "Running" + if state not in TERMINAL: + running.append(state) + + if not running: + return True, "All tasks completed." + + info(f" Tasks running ({', '.join(running)})... [{elapsed}s elapsed]") + time.sleep(10) + elapsed += 10 + + return False, f"Timeout after {timeout}s waiting for tasks." + + +def _redfish_restart_iom(password: str, host: str, iom: str) -> tuple: + return _redfish_request( + password, "POST", + f"/redfish/v1/Managers/{iom}/Actions/Manager.Reset", + payload={"ResetType": "GracefulRestart"}, + host=host, + ) + + +def _redfish_reset_fabric(password: str, host: str, iom: str) -> tuple: + return _redfish_request( + password, "POST", + f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1/Actions/NetworkAdapter.Reset", + payload={"ResetType": "GracefulRestart"}, + host=host, + ) + + +def _get_iom_fw_version(password: str, host: str, iom: str) -> str: + ok_flag, data = _redfish_request( + password, "GET", f"/redfish/v1/Managers/{iom}", host=host, + ) + if ok_flag and isinstance(data, dict): + return data.get("FirmwareVersion", "Unknown") + return _c(C.RED, "Unreachable") + + +def _get_fabric_fw_version(password: str, host: str, iom: str) -> str: + ok_flag, data = _redfish_request( + password, "GET", + f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1", + host=host, + ) + if ok_flag and isinstance(data, dict): + version = (data.get("Oem", {}) + .get("Version", {}) + .get("ActiveFirmwareVersion")) + return version or "Unknown" + return _c(C.RED, "Unreachable") + + +def _show_fw_versions(password: str, ioms: list): + info("Querying firmware versions...") + rows = [] + for iom, ip in ioms: + iom_ver = _get_iom_fw_version(password, ip, iom) + fabric_ver = _get_fabric_fw_version(password, ip, iom) + rows.append([iom, ip, iom_ver, fabric_ver]) + print() + draw_table( + ["IOM", "IP Address", "IOM Firmware", "Fabric Firmware"], + rows, + [5, 16, 32, 20], + ) + print() diff --git a/serial_port.py b/serial_port.py new file mode 100644 index 0000000..f3b071d --- /dev/null +++ b/serial_port.py @@ -0,0 +1,124 @@ +""" +serial_port.py — Minimal 8N1 serial port using only the Python standard library. +Replaces pyserial for TrueNAS environments where pip is unavailable. +""" + +import fcntl +import os +import select +import termios +import time +from typing import Optional + + +class SerialPort: + BAUD_MAP = { + 9600: termios.B9600, + 19200: termios.B19200, + 38400: termios.B38400, + 57600: termios.B57600, + 115200: termios.B115200, + } + + def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self._fd: Optional[int] = None + self._saved_attrs = None + + # ── Open / close ────────────────────────────────────────────────────────── + def open(self): + try: + self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) + except OSError as e: + raise OSError(f"Cannot open {self.port}: {e}") from e + + # Switch back to blocking mode now that O_NOCTTY is set + flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) + fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) + + self._saved_attrs = termios.tcgetattr(self._fd) + + # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc + attrs = list(termios.tcgetattr(self._fd)) + baud = self.BAUD_MAP.get(self.baudrate, termios.B115200) + + attrs[0] = termios.IGNBRK # iflag + attrs[1] = 0 # oflag + attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag + attrs[3] = 0 # lflag (raw) + attrs[4] = baud # ispeed + attrs[5] = baud # ospeed + attrs[6][termios.VMIN] = 0 + attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255) + + termios.tcsetattr(self._fd, termios.TCSANOW, attrs) + termios.tcflush(self._fd, termios.TCIOFLUSH) + + def close(self): + if self._fd is not None: + try: + if self._saved_attrs: + termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs) + os.close(self._fd) + except OSError: + pass + finally: + self._fd = None + + @property + def is_open(self) -> bool: + return self._fd is not None + + # ── Read / write ────────────────────────────────────────────────────────── + def write(self, data: bytes): + if self._fd is None: + raise OSError("Port is not open") + os.write(self._fd, data) + + def read_chunk(self, size: int = 4096) -> bytes: + if self._fd is None: + raise OSError("Port is not open") + try: + return os.read(self._fd, size) + except OSError: + return b"" + + def read_until_quiet(self, quiet_period: float = 0.5) -> str: + """ + Read until no new bytes arrive for `quiet_period` seconds, + or until `self.timeout` total seconds have elapsed. + """ + output = b"" + deadline = time.monotonic() + self.timeout + last_rx = time.monotonic() + + while True: + now = time.monotonic() + if now >= deadline: + break + if output and (now - last_rx) >= quiet_period: + break + + wait = min(deadline - now, quiet_period) + ready, _, _ = select.select([self._fd], [], [], wait) + if ready: + chunk = self.read_chunk() + if chunk: + output += chunk + last_rx = time.monotonic() + + return output.decode("utf-8", errors="replace") + + def send_line(self, cmd: str = "", delay: float = 0.3): + self.write((cmd + "\r\n").encode("utf-8")) + time.sleep(delay) + + # ── Context manager ─────────────────────────────────────────────────────── + def __enter__(self): + self.open() + return self + + def __exit__(self, *_): + self.close() diff --git a/ui.py b/ui.py new file mode 100644 index 0000000..fe6be2c --- /dev/null +++ b/ui.py @@ -0,0 +1,120 @@ +""" +ui.py — ANSI colour helpers, display primitives, and input prompts. +Shared by all ES24N workflows. +""" + +import ipaddress +import sys + + +# ── ANSI colour helpers ─────────────────────────────────────────────────────── +class C: + RED = "\033[0;31m" + GRN = "\033[0;32m" + YEL = "\033[1;33m" + CYN = "\033[0;36m" + WHT = "\033[1;37m" + DIM = "\033[2m" + BOLD = "\033[1m" + RESET = "\033[0m" + CLEAR = "\033[2J\033[H" + + +def _c(colour: str, text: str) -> str: + return f"{colour}{text}{C.RESET}" + + +def info(msg: str): print(f" {_c(C.CYN, 'i')} {msg}") +def ok(msg: str): print(f" {_c(C.GRN, 'OK')} {msg}") +def warn(msg: str): print(f" {_c(C.YEL, '!')} {msg}") +def error(msg: str): print(f" {_c(C.RED, 'X')} {msg}") + + +def banner(): + print(C.CLEAR, end="") + w = 60 + print(_c(C.CYN, " +" + "-" * w + "+")) + print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD, + " TrueNAS ES24N IOM Configuration Tool ") + _c(C.CYN, " |")) + print(_c(C.CYN, " |") + _c(C.DIM, + " Serial Config & Firmware Updates (stdlib only) ") + _c(C.CYN, " |")) + print(_c(C.CYN, " +" + "-" * w + "+")) + print() + + +def rule(title: str = ""): + width = 60 + if title: + pad = max(0, width - len(title) - 2) + left = pad // 2 + right = pad - left + line = f"{'-' * left} {title} {'-' * right}" + else: + line = "-" * width + print(f"\n {_c(C.YEL, line)}\n") + + +def draw_table(headers: list, rows: list, col_widths: list): + sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+" + + def fmt_row(cells): + return " | " + " | ".join( + str(c).ljust(w) for c, w in zip(cells, col_widths) + ) + " |" + + print(_c(C.DIM, sep)) + print(_c(C.BOLD, fmt_row(headers))) + print(_c(C.DIM, sep)) + for row in rows: + print(fmt_row(row)) + print(_c(C.DIM, sep)) + + +def draw_box(lines: list, colour: str = C.CYN): + width = max(len(l) for l in lines) + 4 + print(f" {_c(colour, '+' + '-' * width + '+')}") + for line in lines: + pad = width - len(line) - 2 + print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}") + print(f" {_c(colour, '+' + '-' * width + '+')}") + + +# ── Input helpers ───────────────────────────────────────────────────────────── +def prompt(label: str, default: str = "") -> str: + display = f" {_c(C.CYN, label)}" + if default: + display += f" {_c(C.DIM, f'[{default}]')}" + display += ": " + + sys.stdout.write(display) + sys.stdout.flush() + val = sys.stdin.readline().strip() + return val if val else default + + +def prompt_ip(label: str) -> str: + while True: + val = prompt(label) + try: + ipaddress.IPv4Address(val) + return val + except ValueError: + warn(f"'{val}' is not a valid IPv4 address — please try again.") + + +def prompt_yn(label: str, default: bool = True) -> bool: + hint = "Y/n" if default else "y/N" + val = prompt(f"{label} [{hint}]").strip().lower() + if not val: + return default + return val in ("y", "yes") + + +def prompt_password() -> str: + while True: + val = prompt( + "Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)", + ) + if val: + return val + warn("Password cannot be empty.") diff --git a/workflow_firmware.py b/workflow_firmware.py new file mode 100644 index 0000000..d4ae916 --- /dev/null +++ b/workflow_firmware.py @@ -0,0 +1,258 @@ +""" +workflow_firmware.py — IOM and Fabric Card firmware update workflow. +Connects to each IOM via its network IP address using the Redfish API. +""" + +import os +import time + +from redfish import ( + _redfish_upload_firmware, + _redfish_trigger_update, + _redfish_poll_tasks, + _redfish_restart_iom, + _redfish_reset_fabric, + _show_fw_versions, +) +from ui import ( + _c, C, + banner, rule, draw_box, + info, ok, warn, error, + prompt, prompt_ip, prompt_yn, prompt_password, +) + + +# ── Firmware file selection helper ──────────────────────────────────────────── +def _prompt_fw_file(label: str) -> str: + """ + Scan the current working directory for firmware files and let the user + pick one by number, or enter a custom path as the last option. + Files are sorted most-recently-modified first. + """ + cwd = os.getcwd() + FW_EXTS = {".bin", ".img", ".fw", ".hex", ".zip", ".tar", ".tgz", ".gz"} + + try: + candidates = sorted( + [f for f in os.listdir(cwd) + if not f.startswith(".") + and os.path.isfile(os.path.join(cwd, f)) + and os.path.splitext(f)[1].lower() in FW_EXTS], + key=lambda f: os.path.getmtime(os.path.join(cwd, f)), + reverse=True, + ) + except OSError: + candidates = [] + + print() + if candidates: + info(f"Firmware files found in {cwd}:") + for i, fname in enumerate(candidates, 1): + sz = os.path.getsize(os.path.join(cwd, fname)) + print(f" {_c(C.BOLD, str(i))} {fname} {_c(C.DIM, f'({sz // 1024} KB)')}") + custom_idx = len(candidates) + 1 + print(f" {_c(C.BOLD, str(custom_idx))} Enter a custom file path") + print() + + while True: + choice = prompt(f"Select {label} [1-{custom_idx}]") + if choice.isdigit(): + idx = int(choice) + if 1 <= idx <= len(candidates): + path = os.path.join(cwd, candidates[idx - 1]) + sz = os.path.getsize(path) + ok(f"Selected: {candidates[idx - 1]} ({sz // 1024} KB)") + return path + if idx == custom_idx: + break + warn(f"Please enter a number between 1 and {custom_idx}.") + else: + info(f"No firmware files found in {cwd}.") + + # Manual path entry + while True: + path = prompt(f"Path to {label}") + if os.path.isfile(path): + sz = os.path.getsize(path) + ok(f"File: {path} ({sz // 1024} KB)") + return path + warn(f"File not found: {path}") + + +# ── Per-IOM update helpers ──────────────────────────────────────────────────── +def _update_iom_fw(password: str, ip: str, iom: str, fw_path: str) -> bool: + """Upload and apply IOM firmware for one IOM, then restart it.""" + sz = os.path.getsize(fw_path) + info(f"Uploading IOM firmware ({sz // 1024} KB) to {iom} at {ip}...") + ok_flag, data = _redfish_upload_firmware(password, ip, fw_path) + if not ok_flag: + error(f"Upload failed: {data}") + return False + ok("Firmware file uploaded.") + + info(f"Triggering {iom} firmware update...") + ok_flag, data = _redfish_trigger_update( + password, ip, f"/redfish/v1/Managers/{iom}", + ) + if not ok_flag: + error(f"Update trigger failed: {data}") + return False + ok("Update triggered.") + + info("Monitoring update progress (this may take several minutes)...") + ok_flag, msg = _redfish_poll_tasks(password, ip) + if not ok_flag: + warn(f"Task monitoring ended: {msg}") + else: + ok(msg) + + info(f"Restarting {iom}...") + _redfish_restart_iom(password, ip, iom) # connection drop on restart is normal + ok(f"{iom} restart initiated. Waiting 30s for reboot...") + time.sleep(30) + return True + + +def _update_fabric_fw(password: str, ip: str, iom: str, fw_path: str) -> bool: + """ + Upload and apply Fabric Card firmware for one IOM. + Per the service guide, the firmware file must be re-uploaded even if it was + already uploaded during the IOM firmware step. + After the update: restart fabric card, then restart IOM. + """ + sz = os.path.getsize(fw_path) + info(f"Uploading Fabric Card firmware ({sz // 1024} KB) to {iom} at {ip}...") + ok_flag, data = _redfish_upload_firmware(password, ip, fw_path) + if not ok_flag: + error(f"Upload failed: {data}") + return False + ok("Firmware file uploaded.") + + info(f"Triggering {iom} Fabric Card firmware update...") + ok_flag, data = _redfish_trigger_update( + password, ip, f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1", + ) + if not ok_flag: + error(f"Update trigger failed: {data}") + return False + ok("Update triggered.") + + info("Monitoring update progress...") + ok_flag, msg = _redfish_poll_tasks(password, ip) + if not ok_flag: + warn(f"Task monitoring ended: {msg}") + else: + ok(msg) + + info(f"Restarting {iom} Fabric Card...") + _redfish_reset_fabric(password, ip, iom) + ok("Fabric Card restart initiated. Waiting 15s...") + time.sleep(15) + + info(f"Restarting {iom} after Fabric Card update...") + _redfish_restart_iom(password, ip, iom) + ok(f"{iom} restart initiated. Waiting 30s for reboot...") + time.sleep(30) + return True + + +# ── Firmware Update Workflow ────────────────────────────────────────────────── +def firmware_update_workflow(): + """ + Standalone firmware update for IOM and Fabric Card firmware. + Connects to each IOM via its network IP (not serial loopback) — uploading + firmware over 115200-baud serial would be impractically slow. + """ + banner() + rule("IOM & Fabric Card Firmware Update") + info("This procedure connects to each IOM via its network IP address.") + info("Ensure this system has network access to the IOM management interface.") + print() + + password = prompt_password() + print() + + print(" Which IOM(s) would you like to update?") + print(f" {_c(C.BOLD, '1')} IOM1 only") + print(f" {_c(C.BOLD, '2')} IOM2 only") + print(f" {_c(C.BOLD, '3')} Both IOM1 and IOM2") + print() + + while True: + iom_choice = prompt("Select option [1-3]") + if iom_choice in ("1", "2", "3"): + break + warn("Please enter 1, 2, or 3.") + print() + + info("Enter the management IP address for each IOM to update.") + iom1_ip = prompt_ip(" IOM1 IP address") if iom_choice in ("1", "3") else "" + iom2_ip = prompt_ip(" IOM2 IP address") if iom_choice in ("2", "3") else "" + print() + + ioms = [] + if iom_choice in ("1", "3"): + ioms.append(("IOM1", iom1_ip)) + if iom_choice in ("2", "3"): + ioms.append(("IOM2", iom2_ip)) + + rule("Current Firmware Versions") + _show_fw_versions(password, ioms) + + print(" What would you like to update?") + print(f" {_c(C.BOLD, '1')} IOM Firmware only") + print(f" {_c(C.BOLD, '2')} Fabric Card Firmware only") + print(f" {_c(C.BOLD, '3')} Both IOM and Fabric Card Firmware") + print(f" {_c(C.BOLD, '4')} Cancel") + print() + + while True: + choice = prompt("Select option [1-4]") + if choice in ("1", "2", "3", "4"): + break + warn("Please enter 1, 2, 3, or 4.") + + if choice == "4": + info("Firmware update cancelled.") + return + + update_iom = choice in ("1", "3") + update_fabric = choice in ("2", "3") + iom_fw_path = "" + fabric_fw_path = "" + + if update_iom: + iom_fw_path = _prompt_fw_file("IOM firmware file") + + if update_fabric: + fabric_fw_path = _prompt_fw_file("Fabric Card firmware file") + + print() + warn("For HA systems: update the passive IOM first.") + if len(ioms) > 1: + warn("IOM1 will be updated first — adjust order if IOM2 is passive.") + print() + + if not prompt_yn("Proceed with firmware update?", default=True): + info("Firmware update cancelled.") + return + + for iom, ip in ioms: + rule(f"{iom} ({ip})") + if update_iom: + _update_iom_fw(password, ip, iom, iom_fw_path) + if update_fabric: + _update_fabric_fw(password, ip, iom, fabric_fw_path) + + rule("Post-Update Firmware Validation") + _show_fw_versions(password, ioms) + + print() + draw_box([ + f"{_c(C.YEL, 'IMPORTANT -- For HA (Dual-Controller) Systems:')}", + "", + "After updating this controller's IOMs:", + " 1. Log into TrueNAS and initiate a failover.", + " 2. Re-run this tool to update the other controller.", + ], colour=C.YEL) + print() diff --git a/workflow_serial.py b/workflow_serial.py new file mode 100644 index 0000000..45af02f --- /dev/null +++ b/workflow_serial.py @@ -0,0 +1,459 @@ +""" +workflow_serial.py — Serial-based ES24N IOM network configuration workflow. +Connects via USB serial cable, wakes the IOM console, and configures +network settings through the Redfish API over the serial loopback (127.0.0.1). +""" + +import glob +import os +import subprocess +import time +from typing import Optional + +from models import IOMConfig, ShelfConfig +from redfish import _redfish_request +from serial_port import SerialPort +from ui import ( + _c, C, + banner, rule, draw_table, draw_box, + info, ok, warn, error, + prompt, prompt_ip, prompt_yn, prompt_password, +) + + +# ── Step 1: Detect serial device ────────────────────────────────────────────── +def detect_serial_device() -> Optional[str]: + rule("Step 1 of 5 -- Serial Cable & Device Detection") + + print(" Connect the serial cable from the ES24N IOM1 port") + print(" to the active F-Series controller USB port.") + print() + prompt("Press Enter when the cable is connected") + + for attempt in range(1, 4): + info(f"Scanning for USB serial devices (attempt {attempt}/3)...") + time.sleep(1) + + # FreeBSD: /dev/ttyU* Linux: /dev/ttyUSB*, /dev/ttyACM* + patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"] + ports = sorted({p for pat in patterns for p in glob.glob(pat)}) + + if ports: + break + + if attempt < 3: + warn("No device found yet — retrying in 2 seconds...") + time.sleep(2) + + if not ports: + error("No USB serial device detected after 3 attempts.") + print() + print(" Troubleshooting:") + print(" - Ensure the serial cable is fully seated at both ends.") + print(" - Try a different USB port on the controller.") + print(" - Confirm the ES24N is powered on.") + return None + + if len(ports) == 1: + ok(f"Device found: {_c(C.BOLD, ports[0])}") + _fix_permissions(ports[0]) + return ports[0] + + # Multiple devices — let the user choose + print() + draw_table( + ["#", "Device"], + [[str(i), p] for i, p in enumerate(ports, 1)], + [4, 24], + ) + print() + + while True: + val = prompt(f"Select device number [1-{len(ports)}]") + if val.isdigit() and 1 <= int(val) <= len(ports): + selected = ports[int(val) - 1] + ok(f"Selected: {_c(C.BOLD, selected)}") + _fix_permissions(selected) + return selected + warn(f"Please enter a number between 1 and {len(ports)}.") + + +def _fix_permissions(device: str): + try: + result = subprocess.run( + ["sudo", "chown", ":wheel", device], + capture_output=True, timeout=5, + ) + if result.returncode == 0: + ok(f"Permissions updated on {device}") + return + except Exception: + pass + try: + os.chmod(device, 0o666) + ok(f"Permissions updated on {device}") + except PermissionError: + warn("Could not update device permissions automatically.") + warn("If the connection fails, re-run this script with sudo.") + + +# ── Step 2: Open serial connection & wake IOM console ───────────────────────── +def open_serial_connection(device: str) -> Optional[SerialPort]: + rule("Step 2 of 5 -- Opening Serial Connection") + + info(f"Opening {device} at 115200 baud (8N1)...") + ser = SerialPort(device, baudrate=115200, timeout=5.0) + + try: + ser.open() + except OSError as e: + error(str(e)) + return None + + ok(f"Port opened: {device}") + info("Sending wake signal to IOM console...") + + ser.send_line("", delay=0.5) + ser.send_line("", delay=0.5) + response = ser.read_until_quiet(quiet_period=0.5) + + print() + if response.strip(): + print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}") + for line in response.strip().splitlines(): + print(f" {_c(C.DIM, '|')} {line}") + print(f" {_c(C.DIM, '+' + '-' * 56)}") + print() + + low = response.lower() + if any(kw in low for kw in ("login", "$", "#", "password")): + ok("IOM console is responsive.") + else: + warn("Unexpected response — the IOM may still be booting.") + warn("You can continue; the Redfish API operates independently.") + else: + warn("No response received from IOM console.") + warn("The Redfish API may still be reachable. Continuing...") + + print() + return ser + + +# ── Step 3: Fetch & display current IOM network settings ───────────────────── +def fetch_current_config(cfg: ShelfConfig) -> bool: + """ + Query Redfish for the current network config of both IOMs. + Populates cfg.iom1 / cfg.iom2 with live data. + Returns True if at least one IOM responded. + """ + rule("Step 3 of 5 -- Current IOM Network Settings") + info("Querying Redfish API for current network configuration...") + print() + + any_ok = False + rows = [] + errors = [] + + for iom in ("IOM1", "IOM2"): + path = f"/redfish/v1/Managers/{iom}/EthernetInterfaces/1" + ok_flag, data = _redfish_request(cfg.password, "GET", path) + + if ok_flag and isinstance(data, dict): + any_ok = True + + # Determine mode + dhcp_enabled = ( + data.get("DHCPv4", {}).get("DHCPEnabled", False) or + data.get("DHCPv6", {}).get("DHCPEnabled", False) + ) + + # Pull address info — prefer StaticAddresses, fall back to IPv4Addresses + addrs = data.get("IPv4StaticAddresses") or data.get("IPv4Addresses", []) + if addrs: + addr_rec = addrs[0] + ip = addr_rec.get("Address", "--") + gateway = addr_rec.get("Gateway", "--") + netmask = addr_rec.get("SubnetMask", "--") + else: + ip = gateway = netmask = "--" + + origin = data.get("IPv4Addresses", [{}])[0].get("AddressOrigin", "Unknown") \ + if data.get("IPv4Addresses") else ("DHCP" if dhcp_enabled else "Static") + + iom_cfg = IOMConfig( + iom = iom, + dhcp = dhcp_enabled, + ip = ip if ip != "--" else "", + gateway = gateway if gateway != "--" else "", + netmask = netmask if netmask != "--" else "", + ) + if iom == "IOM1": + cfg.iom1 = iom_cfg + else: + cfg.iom2 = iom_cfg + + mode_str = f"{_c(C.CYN, 'DHCP')}" if dhcp_enabled else f"{_c(C.GRN, 'Static')}" + rows.append([iom, mode_str, origin, ip, gateway, netmask]) + else: + rows.append([iom, _c(C.RED, "No response"), "--", "--", "--", "--"]) + errors.append((iom, str(data))) + + draw_table( + ["IOM", "Mode", "Origin", "IP Address", "Gateway", "Subnet Mask"], + rows, + [5, 10, 8, 16, 16, 16], + ) + print() + + if errors: + for iom, err in errors: + error(f"{iom} query failed: {err}") + print() + + if not any_ok: + error("Neither IOM responded to the Redfish query.") + error("Check that the serial cable is connected and the IOM is booted.") + + return any_ok + + +# ── Step 4: Prompt user — change config or exit ─────────────────────────────── +def collect_network_config(cfg: ShelfConfig) -> bool: + """ + Show current settings, ask user what to do. + Returns True to proceed with applying changes, False to skip. + """ + rule("Step 4 of 5 -- Change Configuration?") + + print(f" {_c(C.BOLD, '1')} Change network configuration") + print(f" {_c(C.BOLD, '2')} Leave settings as-is and disconnect") + print() + + while True: + choice = prompt("Select option [1/2]") + if choice in ("1", "2"): + break + warn("Please enter 1 or 2.") + + if choice == "2": + info("No changes requested.") + return False + + # ── User wants to change settings ───────────────────────────────────────── + print() + print(" How should the IOMs be configured?") + print(f" {_c(C.BOLD, '1')} Static IP addresses") + print(f" {_c(C.BOLD, '2')} DHCP") + print() + + while True: + mode = prompt("Select mode [1/2]") + if mode in ("1", "2"): + break + warn("Please enter 1 or 2.") + + use_dhcp = (mode == "2") + print() + + if use_dhcp: + cfg.iom1 = IOMConfig("IOM1", dhcp=True) + cfg.iom2 = IOMConfig("IOM2", dhcp=True) + ok("Both IOMs will be set to DHCP.") + return True + + # Static — IOM1 + info(f"Static network details for {_c(C.BOLD, 'IOM1')}:") + iom1_ip = prompt_ip(" IOM1 IP address ") + iom1_gw = prompt_ip(" IOM1 Gateway ") + iom1_nm = prompt_ip(" IOM1 Subnet Mask") + cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip, gateway=iom1_gw, netmask=iom1_nm) + + # Static — IOM2 + print() + info(f"Static network details for {_c(C.BOLD, 'IOM2')}:") + iom2_ip = prompt_ip(" IOM2 IP address ") + + same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True) + if same: + iom2_gw, iom2_nm = iom1_gw, iom1_nm + else: + iom2_gw = prompt_ip(" IOM2 Gateway ") + iom2_nm = prompt_ip(" IOM2 Subnet Mask") + + cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip, gateway=iom2_gw, netmask=iom2_nm) + return True + + +# ── Step 5a: Apply configuration via Redfish ────────────────────────────────── +def apply_configuration(cfg: ShelfConfig) -> bool: + rule("Step 5 of 5 -- Applying Configuration via Redfish API") + + info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...") + print() + + results = [] + all_ok = True + for iom_cfg in [cfg.iom1, cfg.iom2]: + success, detail = _apply_iom(cfg.password, iom_cfg) + status = _c(C.GRN, "OK") if success else _c(C.RED, "FAIL") + results.append([iom_cfg.iom, status, detail]) + if not success: + all_ok = False + + draw_table(["IOM", "Result", "Detail"], results, [6, 8, 50]) + print() + return all_ok + + +def _apply_iom(password: str, iom_cfg: IOMConfig) -> tuple: + """ + Apply network config to a single IOM. + + DHCP: single PATCH enabling DHCPv4. + + Static: two sequential PATCHes to work around a firmware bug in the + current ES24N release that prevents disabling DHCP and setting a static + address in the same request. + Pass 1 -- set the static IP/gateway/netmask (DHCP still on) + Pass 2 -- disable DHCP (address is already committed) + """ + path = f"/redfish/v1/Managers/{iom_cfg.iom}/EthernetInterfaces/1" + + if iom_cfg.dhcp: + ok_flag, data = _redfish_request( + password, "PATCH", path, + {"DHCPv4": {"DHCPEnabled": True}}, + ) + if ok_flag: + return True, "Configured: DHCP" + return False, str(data)[:80] + + # Static -- Pass 1: set address while DHCP is still enabled + info(f" {iom_cfg.iom} pass 1/2 -- setting static address {iom_cfg.ip}...") + ok_flag, data = _redfish_request( + password, "PATCH", path, + { + "IPv4StaticAddresses": [{ + "Address": iom_cfg.ip, + "Gateway": iom_cfg.gateway, + "SubnetMask": iom_cfg.netmask, + }] + }, + ) + if not ok_flag: + return False, f"Pass 1 failed: {str(data)[:70]}" + + # Brief pause to allow the IOM to commit the address before the next call + time.sleep(1) + + # Static -- Pass 2: disable DHCP now that the static address is committed + info(f" {iom_cfg.iom} pass 2/2 -- disabling DHCP...") + ok_flag, data = _redfish_request( + password, "PATCH", path, + {"DHCPv4": {"DHCPEnabled": False}}, + ) + if not ok_flag: + return False, f"Pass 2 failed: {str(data)[:70]}" + + return True, f"Configured: Static {iom_cfg.ip}" + + +# ── Step 5b: Print applied-settings summary ─────────────────────────────────── +def print_summary(cfg: ShelfConfig, changed: bool): + rule("Summary") + + def val(iom: IOMConfig, field: str) -> str: + dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)", "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"} + stat_map = {"mode": "Static", "ip": iom.ip, "gateway": iom.gateway, "netmask": iom.netmask} + return (dhcp_map if iom.dhcp else stat_map).get(field, "") + + draw_table( + ["Setting", "IOM1", "IOM2"], + [ + ["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")], + ["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")], + ["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")], + ["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")], + ["Serial Port", cfg.device, cfg.device], + ["Changes", "Yes" if changed else "None", ""], + ], + [12, 22, 22], + ) + + if changed: + print() + draw_box([ + f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}", + "", + "Remove the serial cable ONLY after verifying each", + "expander appears in TrueNAS with matching drives.", + "", + "TrueNAS > System Settings > Enclosure >", + "NVMe-oF Expansion Shelves", + ], colour=C.YEL) + print() + + +# ── Disconnect ──────────────────────────────────────────────────────────────── +def close_serial_connection(ser: SerialPort, device: str): + if ser and ser.is_open: + ser.close() + ok(f"Serial port {device} closed.") + + print() + prompt("Disconnect the serial cable, then press Enter to continue") + ok("Serial cable disconnected. Shelf complete.") + + +# ── Full shelf configuration cycle ──────────────────────────────────────────── +def configure_shelf() -> bool: + """Run one complete shelf cycle. Returns True if user wants another shelf.""" + banner() + cfg = ShelfConfig() + + # 1 — Detect device + device = detect_serial_device() + if not device: + error("Could not detect a serial device. Returning to main menu.") + time.sleep(2) + return True + cfg.device = device + + # 2 — Open serial port & wake IOM console + ser = open_serial_connection(device) + if not ser: + error("Could not open serial port. Returning to main menu.") + time.sleep(2) + return True + + # Password needed before any Redfish calls + print() + cfg.password = prompt_password() + + # 3 — Fetch & display current settings + fetch_current_config(cfg) + + # 4 — Ask user: change or leave alone? + apply_changes = collect_network_config(cfg) + + # 5 — Apply if requested + changed = False + if apply_changes: + print() + rule("Ready to Apply") + info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1") + print() + if prompt_yn("Apply configuration now?", default=True): + apply_configuration(cfg) + changed = True + else: + warn("Configuration skipped — no changes were made.") + + # Summary + print_summary(cfg, changed) + + # Disconnect + close_serial_connection(ser, device) + + print() + return prompt_yn("Configure another ES24N shelf?", default=False)