#!/usr/bin/env python3
"""
ES24N IOM Network Configuration Tool
TrueNAS ES24N Expansion Shelf — Serial Configuration Utility
Based on ES24N Product Service Guide v.26011

Zero external dependencies — Python 3 standard library only.
Compatible with TrueNAS (FreeBSD) and Linux.
"""

import fcntl
import glob
import http.client
import ipaddress
import json
import os
import re
import select
import ssl
import subprocess
import sys
import termios
import time
import urllib.error
import urllib.request
from base64 import b64encode
from dataclasses import dataclass, field
from typing import Optional


# ── ANSI colour helpers ───────────────────────────────────────────────────────

class C:
    """ANSI escape sequences used for terminal colouring."""

    RED = "\033[0;31m"
    GRN = "\033[0;32m"
    YEL = "\033[1;33m"
    CYN = "\033[0;36m"
    WHT = "\033[1;37m"
    DIM = "\033[2m"
    BOLD = "\033[1m"
    RESET = "\033[0m"
    CLEAR = "\033[2J\033[H"


# Matches the escape sequences defined in C so layout code can ignore them.
_ANSI_RE = re.compile(r"\033\[[0-9;]*[A-Za-z]")


def _c(colour: str, text: str) -> str:
    """Wrap *text* in *colour* and reset the terminal attributes afterwards."""
    return f"{colour}{text}{C.RESET}"


def _visible_len(text: str) -> int:
    """Return the number of characters of *text* that occupy a terminal cell."""
    return len(_ANSI_RE.sub("", text))


def _pad_visible(text: str, width: int) -> str:
    """Left-justify *text* to *width* visible columns, ignoring ANSI codes."""
    return text + " " * max(0, width - _visible_len(text))


def info(msg: str):
    """Print an informational message."""
    print(f" {_c(C.CYN, 'i')} {msg}")


def ok(msg: str):
    """Print a success message."""
    print(f" {_c(C.GRN, 'OK')} {msg}")


def warn(msg: str):
    """Print a warning message."""
    print(f" {_c(C.YEL, '!')} {msg}")


def error(msg: str):
    """Print an error message."""
    print(f" {_c(C.RED, 'X')} {msg}")


def banner():
    """Clear the screen and print the tool's title box."""
    print(C.CLEAR, end="")
    w = 60
    border = _c(C.CYN, " +" + "-" * w + "+")
    # Each row is "<text> |"; pad the text so the closing bar lines up with
    # the border regardless of the title length.
    title = " TrueNAS ES24N IOM Configuration Tool".ljust(w - 1)
    subtitle = " Serial Config & Firmware Updates (stdlib only)".ljust(w - 1)
    print(border)
    print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD, title) + _c(C.CYN, " |"))
    print(_c(C.CYN, " |") + _c(C.DIM, subtitle) + _c(C.CYN, " |"))
    print(border)
    print()


def rule(title: str = ""):
    """Print a 60-column horizontal rule, optionally with a centred title."""
    width = 60
    if title:
        pad = max(0, width - len(title) - 2)
        left = pad // 2
        right = pad - left
        line = f"{'-' * left} {title} {'-' * right}"
    else:
        line = "-" * width
    print(f"\n {_c(C.YEL, line)}\n")


def draw_table(headers: list, rows: list, col_widths: list):
    """
    Print a simple ASCII table.

    headers:    column titles.
    rows:       list of rows; each cell is converted with str().
    col_widths: visible width of each column. Cells may contain ANSI colour
                codes; they are ignored when padding so columns stay aligned.
    """
    sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+"

    def fmt_row(cells):
        return " | " + " | ".join(
            _pad_visible(str(c), w) for c, w in zip(cells, col_widths)
        ) + " |"

    print(_c(C.DIM, sep))
    print(_c(C.BOLD, fmt_row(headers)))
    print(_c(C.DIM, sep))
    for row in rows:
        print(fmt_row(row))
    print(_c(C.DIM, sep))


def draw_box(lines: list, colour: str = C.CYN):
    """
    Print *lines* inside a box drawn in *colour*.

    Width is derived from the longest visible line, so lines that carry ANSI
    colour codes do not distort the frame. An empty list draws an empty box.
    """
    width = max((_visible_len(l) for l in lines), default=0) + 4
    print(f" {_c(colour, '+' + '-' * width + '+')}")
    for line in lines:
        pad = width - _visible_len(line) - 2
        print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}")
    print(f" {_c(colour, '+' + '-' * width + '+')}")


# ── Input helpers ─────────────────────────────────────────────────────────────

def prompt(label: str, default: str = "") -> str:
    """
    Show *label* (and *default*, if any) and read one line from stdin.

    Returns the stripped input, or *default* when the user just pressed Enter.
    Raises EOFError when stdin is closed; returning the default in that case
    would make every validation loop that calls prompt() spin forever.
    """
    display = f" {_c(C.CYN, label)}"
    if default:
        display += f" {_c(C.DIM, f'[{default}]')}"
    display += ": "
    sys.stdout.write(display)
    sys.stdout.flush()
    raw = sys.stdin.readline()
    if raw == "":  # readline() returns "" only at end-of-file
        raise EOFError("stdin closed while waiting for input")
    val = raw.strip()
    return val if val else default


def prompt_ip(label: str) -> str:
    """Prompt until the user enters a syntactically valid IPv4 address."""
    while True:
        val = prompt(label)
        try:
            ipaddress.IPv4Address(val)
            return val
        except ValueError:
            warn(f"'{val}' is not a valid IPv4 address — please try again.")


def prompt_yn(label: str, default: bool = True) -> bool:
    """Ask a yes/no question; empty input selects *default*."""
    hint = "Y/n" if default else "y/N"
    val = prompt(f"{label} [{hint}]").strip().lower()
    if not val:
        return default
    return val in ("y", "yes")


def prompt_password() -> str:
    """Prompt until a non-empty admin password is entered."""
    while True:
        val = prompt(
            "Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)",
        )
        if val:
            return val
        warn("Password cannot be empty.")


# ── Data classes ──────────────────────────────────────────────────────────────

@dataclass
class IOMConfig:
    """Network settings for one IOM."""

    iom: str
    dhcp: bool = True
    ip: str = ""
    gateway: str = ""
    netmask: str = ""


@dataclass
class ShelfConfig:
    """Serial device, password and per-IOM settings for one shelf."""

    device: str = ""
    password: str = ""
    iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1"))
    iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2"))


# ── Serial port (stdlib: termios / fcntl / select) ────────────────────────────

class SerialPort:
    """
    Minimal 8N1 serial port using only the Python standard library.
    Replaces pyserial for TrueNAS environments where pip is unavailable.
    """

    BAUD_MAP = {
        9600: termios.B9600,
        19200: termios.B19200,
        38400: termios.B38400,
        57600: termios.B57600,
        115200: termios.B115200,
    }

    def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self._fd: Optional[int] = None
        self._saved_attrs = None

    # ── Open / close ──────────────────────────────────────────────────────────

    def open(self):
        """
        Open the device and put it into raw 8N1 mode.

        Raises OSError if the device cannot be opened or configured. The file
        descriptor is closed again on any configuration failure, and
        termios.error (which is not an OSError subclass) is converted so
        callers only need to catch OSError.
        """
        try:
            fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as e:
            raise OSError(f"Cannot open {self.port}: {e}") from e

        try:
            # The port was opened non-blocking; switch back to blocking mode
            # for normal reads and writes.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

            saved = termios.tcgetattr(fd)

            # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc
            attrs = list(saved)
            attrs[6] = list(saved[6])  # copy cc so `saved` stays pristine
            baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
            attrs[0] = termios.IGNBRK                                  # iflag
            attrs[1] = 0                                               # oflag
            attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL    # cflag
            attrs[3] = 0                                               # lflag (raw)
            attrs[4] = baud                                            # ispeed
            attrs[5] = baud                                            # ospeed
            attrs[6][termios.VMIN] = 0
            attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255)
            termios.tcsetattr(fd, termios.TCSANOW, attrs)
            termios.tcflush(fd, termios.TCIOFLUSH)
        except (OSError, termios.error) as e:
            try:
                os.close(fd)
            except OSError:
                pass
            raise OSError(f"Cannot configure {self.port}: {e}") from e

        self._fd = fd
        self._saved_attrs = saved

    def close(self):
        """Restore the original tty attributes (best effort) and close the fd."""
        if self._fd is None:
            return
        fd, self._fd = self._fd, None
        try:
            if self._saved_attrs:
                termios.tcsetattr(fd, termios.TCSADRAIN, self._saved_attrs)
        except (OSError, termios.error):
            # Device may already be unplugged; still close the descriptor.
            pass
        finally:
            try:
                os.close(fd)
            except OSError:
                pass

    @property
    def is_open(self) -> bool:
        return self._fd is not None

    # ── Read / write ──────────────────────────────────────────────────────────

    def write(self, data: bytes):
        """Write all of *data*, retrying after partial writes."""
        if self._fd is None:
            raise OSError("Port is not open")
        sent = 0
        while sent < len(data):
            sent += os.write(self._fd, data[sent:])

    def read_chunk(self, size: int = 4096) -> bytes:
        """Read up to *size* bytes; returns b"" on a read error."""
        if self._fd is None:
            raise OSError("Port is not open")
        try:
            return os.read(self._fd, size)
        except OSError:
            return b""

    def read_until_quiet(self, quiet_period: float = 0.5) -> str:
        """
        Read until no new bytes arrive for `quiet_period` seconds,
        or until `self.timeout` total seconds have elapsed.
        """
        if self._fd is None:
            raise OSError("Port is not open")
        output = b""
        deadline = time.monotonic() + self.timeout
        last_rx = time.monotonic()
        while True:
            now = time.monotonic()
            if now >= deadline:
                break
            if output and (now - last_rx) >= quiet_period:
                break
            wait = min(deadline - now, quiet_period)
            ready, _, _ = select.select([self._fd], [], [], wait)
            if ready:
                chunk = self.read_chunk()
                if chunk:
                    output += chunk
                    last_rx = time.monotonic()
        return output.decode("utf-8", errors="replace")

    def send_line(self, cmd: str = "", delay: float = 0.3):
        """Send *cmd* followed by CRLF, then sleep *delay* seconds."""
        self.write((cmd + "\r\n").encode("utf-8"))
        time.sleep(delay)

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *_):
        self.close()


# ── Step 1: Detect serial device ──────────────────────────────────────────────

def _list_serial_ports() -> list:
    """Return the sorted USB serial device nodes currently present."""
    # FreeBSD: /dev/ttyU*   Linux: /dev/ttyUSB*, /dev/ttyACM*
    patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"]
    found = {p for pat in patterns for p in glob.glob(pat)}
    # FreeBSD also exposes ttyU0.init / ttyU0.lock state nodes; they are not
    # usable as data ports, so keep them out of the selection list.
    return sorted(p for p in found if not p.endswith((".init", ".lock")))


def detect_serial_device() -> Optional[str]:
    """
    Guide the user through cabling and locate the USB serial device.

    Scans up to three times. Returns the chosen device path (after trying to
    fix its permissions), or None when no device is found.
    """
    rule("Step 1 of 5 -- Serial Cable & Device Detection")
    print(" Connect the serial cable from the ES24N IOM1 port")
    print(" to the active F-Series controller USB port.")
    print()
    prompt("Press Enter when the cable is connected")

    ports = []
    for attempt in range(1, 4):
        info(f"Scanning for USB serial devices (attempt {attempt}/3)...")
        time.sleep(1)
        ports = _list_serial_ports()
        if ports:
            break
        if attempt < 3:
            warn("No device found yet — retrying in 2 seconds...")
            time.sleep(2)

    if not ports:
        error("No USB serial device detected after 3 attempts.")
        print()
        print(" Troubleshooting:")
        print(" - Ensure the serial cable is fully seated at both ends.")
        print(" - Try a different USB port on the controller.")
        print(" - Confirm the ES24N is powered on.")
        return None

    if len(ports) == 1:
        ok(f"Device found: {_c(C.BOLD, ports[0])}")
        _fix_permissions(ports[0])
        return ports[0]

    # Multiple devices — let the user choose
    print()
    draw_table(
        ["#", "Device"],
        [[str(i), p] for i, p in enumerate(ports, 1)],
        [4, 24],
    )
    print()
    while True:
        val = prompt(f"Select device number [1-{len(ports)}]")
        try:
            index = int(val)
        except ValueError:
            index = 0
        if 1 <= index <= len(ports):
            selected = ports[index - 1]
            ok(f"Selected: {_c(C.BOLD, selected)}")
            _fix_permissions(selected)
            return selected
        warn(f"Please enter a number between 1 and {len(ports)}.")


def _fix_permissions(device: str):
    """
    Best-effort attempt to make *device* accessible to the current user.

    Tries `sudo chown :wheel` first, then falls back to chmod 0666. Failure
    is reported as a warning only.
    """
    try:
        result = subprocess.run(
            ["sudo", "chown", ":wheel", device],
            capture_output=True,
            timeout=5,
        )
        if result.returncode == 0:
            ok(f"Permissions updated on {device}")
            return
    except (OSError, subprocess.SubprocessError):
        # sudo missing, not permitted, or timed out — fall through to chmod.
        pass

    try:
        os.chmod(device, 0o666)
        ok(f"Permissions updated on {device}")
    except OSError:
        warn("Could not update device permissions automatically.")
        warn("If the connection fails, re-run this script with sudo.")


# ── Step 2: Open serial connection & wake IOM console ─────────────────────────

def open_serial_connection(device: str) -> Optional[SerialPort]:
    """
    Open *device* at 115200 8N1 and poke the IOM console.

    Returns the open SerialPort, or None if the port could not be opened or
    written to (in which case it is closed again before returning).
    """
    rule("Step 2 of 5 -- Opening Serial Connection")
    info(f"Opening {device} at 115200 baud (8N1)...")

    ser = SerialPort(device, baudrate=115200, timeout=5.0)
    try:
        ser.open()
    except OSError as e:
        error(str(e))
        return None

    ok(f"Port opened: {device}")
    info("Sending wake signal to IOM console...")

    try:
        ser.send_line("", delay=0.5)
        ser.send_line("", delay=0.5)
        response = ser.read_until_quiet(quiet_period=0.5)
    except OSError as e:
        # Do not leave a half-configured port open behind a failed wake-up.
        ser.close()
        error(f"Serial I/O failed on {device}: {e}")
        return None

    print()
    if response.strip():
        print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}")
        for line in response.strip().splitlines():
            print(f" {_c(C.DIM, '|')} {line}")
        print(f" {_c(C.DIM, '+' + '-' * 56)}")
        print()
        low = response.lower()
        if any(kw in low for kw in ("login", "$", "#", "password")):
            ok("IOM console is responsive.")
        else:
            warn("Unexpected response — the IOM may still be booting.")
            warn("You can continue; the Redfish API operates independently.")
    else:
        warn("No response received from IOM console.")
        warn("The Redfish API may still be reachable. Continuing...")

    print()
    return ser


# ── Redfish helpers (GET and PATCH) ──────────────────────────────────────────

def _insecure_tls_context() -> ssl.SSLContext:
    """Return a TLS context with certificate and hostname checks disabled."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def _redfish_error_message(raw: str) -> str:
    """Extract error.message from a Redfish error body, else return *raw*."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return raw
    if isinstance(parsed, dict):
        err = parsed.get("error")
        if isinstance(err, dict):
            return str(err.get("message", raw))
    return raw


def _redfish_send(req: urllib.request.Request, timeout: float) -> tuple:
    """
    Send *req* and normalise the outcome.

    Returns (True, parsed_json_or_{}) on a 2xx response and
    (False, "HTTP <code>: <message>") or (False, "Connection error: ...")
    otherwise. Never raises for network or HTTP failures.
    """
    try:
        with urllib.request.urlopen(req, context=_insecure_tls_context(),
                                    timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        raw = e.read().decode("utf-8", errors="replace")
        return False, f"HTTP {e.code}: {_redfish_error_message(raw)[:120]}"
    except (OSError, http.client.HTTPException) as e:
        # HTTPException covers truncated/garbled replies, e.g. when the IOM
        # drops the connection while restarting.
        return False, f"Connection error: {e}"

    if not raw.strip():
        return True, {}
    try:
        return True, json.loads(raw)
    except json.JSONDecodeError:
        return True, {}


def _redfish_request(password: str, method: str, path: str,
                     payload: Optional[dict] = None,
                     host: str = "127.0.0.1") -> tuple:
    """
    Issue a Redfish HTTP request.

    host defaults to the serial loopback (127.0.0.1) but can be set to
    an IOM's network IP for firmware update operations. The user name is
    "root" for the loopback host and "Admin" for any other host.

    Returns (success: bool, data: dict|str).
    """
    url = f"https://{host}{path}"
    username = "root" if host == "127.0.0.1" else "Admin"
    credentials = b64encode(f"{username}:{password}".encode()).decode()

    # `is not None` so an explicit empty object ({}) is still sent as a body.
    body = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Authorization": f"Basic {credentials}"}
    if body is not None:
        headers["Content-Type"] = "application/json"

    req = urllib.request.Request(url, data=body, method=method, headers=headers)
    return _redfish_send(req, timeout=10)


# ── Redfish helpers (firmware upload & update) ────────────────────────────────

def _redfish_upload_firmware(password: str, host: str, fw_path: str) -> tuple:
    """
    Upload a firmware file to /redfish/v1/UpdateService using multipart/form-data.

    Equivalent to:
        curl -k -u Admin:<password> https://<host>/redfish/v1/UpdateService \
             -X POST -F "software=@<fw_path>"

    Returns (success: bool, data: dict|str).
    """
    try:
        with open(fw_path, "rb") as f:
            file_data = f.read()
    except OSError as e:
        return False, f"Cannot read file: {e}"

    # Keep the header well-formed even for unusual file names.
    filename = (os.path.basename(fw_path)
                .replace('"', "%22").replace("\r", "").replace("\n", ""))
    boundary = f"FormBoundary{int(time.time() * 1000)}"

    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="software"; filename="{filename}"\r\n'
        "Content-Type: application/octet-stream\r\n"
        "\r\n"
    ).encode() + file_data + f"\r\n--{boundary}--\r\n".encode()

    url = f"https://{host}/redfish/v1/UpdateService"
    credentials = b64encode(f"Admin:{password}".encode()).decode()  # always network

    req = urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {credentials}",
            "Content-Type": f"multipart/form-data; boundary={boundary}",
        },
    )
    return _redfish_send(req, timeout=120)


def _redfish_trigger_update(password: str, host: str, target: str) -> tuple:
    """
    Trigger a Redfish SimpleUpdate for the given target resource path.

    target: e.g. "/redfish/v1/Managers/IOM1"
            or   "/redfish/v1/Chassis/IOM1/NetworkAdapters/1"
    """
    return _redfish_request(
        password,
        "POST",
        "/redfish/v1/UpdateService/Actions/SimpleUpdate",
        payload={
            "ImageURI": "/redfish/v1/UpdateService/software",
            "Targets": [target],
        },
        host=host,
    )


def _task_state(password: str, host: str, member) -> str:
    """Return a task's TaskState, following its @odata.id link if needed."""
    if not isinstance(member, dict):
        return "Running"
    state = member.get("TaskState")
    if state is None:
        # Collection entries may only carry a link; resolve the task itself.
        task_path = member.get("@odata.id", "")
        if task_path:
            t_ok, t_data = _redfish_request(password, "GET", task_path, host=host)
            if t_ok and isinstance(t_data, dict):
                state = t_data.get("TaskState")
            else:
                state = "Running"
        else:
            state = "Running"
    return str(state) if state is not None else "Unknown"


def _redfish_poll_tasks(password: str, host: str, timeout: int = 600) -> tuple:
    """
    Poll /redfish/v1/TaskService/Tasks/ until all tasks reach a terminal
    state or timeout is exceeded.

    Returns (success: bool, message: str). success is True only when every
    listed task is "Completed" (or no tasks are listed); tasks that ended as
    Exception, Killed or Cancelled yield (False, ...). Note that this counts
    every task the service lists, including ones left over from earlier runs.
    """
    FAILED = {"Killed", "Exception", "Cancelled"}
    TERMINAL = FAILED | {"Completed"}

    start = time.monotonic()
    deadline = start + timeout

    while time.monotonic() < deadline:
        ok_flag, data = _redfish_request(
            password, "GET", "/redfish/v1/TaskService/Tasks/", host=host,
        )
        if not ok_flag:
            return False, f"Task service error: {data}"

        members = data.get("Members", []) if isinstance(data, dict) else []
        if not members:
            return True, "No pending tasks."

        states = [_task_state(password, host, m) for m in members]
        running = [s for s in states if s not in TERMINAL]
        failed = [s for s in states if s in FAILED]

        if not running:
            if failed:
                return False, f"Tasks ended without completing ({', '.join(failed)})."
            return True, "All tasks completed."

        elapsed = int(time.monotonic() - start)
        info(f" Tasks running ({', '.join(running)})... [{elapsed}s elapsed]")
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(10.0, deadline - time.monotonic())))

    return False, f"Timeout after {timeout}s waiting for tasks."
def _redfish_restart_iom(password: str, host: str, iom: str) -> tuple:
    """POST a GracefulRestart Manager.Reset for *iom*; returns (ok, data)."""
    return _redfish_request(
        password,
        "POST",
        f"/redfish/v1/Managers/{iom}/Actions/Manager.Reset",
        payload={"ResetType": "GracefulRestart"},
        host=host,
    )


def _redfish_reset_fabric(password: str, host: str, iom: str) -> tuple:
    """POST a GracefulRestart NetworkAdapter.Reset for *iom*'s fabric card."""
    return _redfish_request(
        password,
        "POST",
        f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1/Actions/NetworkAdapter.Reset",
        payload={"ResetType": "GracefulRestart"},
        host=host,
    )


def _get_iom_fw_version(password: str, host: str, iom: str) -> str:
    """Return the IOM's FirmwareVersion, "Unknown", or a red "Unreachable"."""
    ok_flag, data = _redfish_request(
        password, "GET", f"/redfish/v1/Managers/{iom}", host=host,
    )
    if ok_flag and isinstance(data, dict):
        return data.get("FirmwareVersion") or "Unknown"
    return _c(C.RED, "Unreachable")


def _get_fabric_fw_version(password: str, host: str, iom: str) -> str:
    """Return Oem.Version.ActiveFirmwareVersion of the fabric card."""
    ok_flag, data = _redfish_request(
        password, "GET",
        f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1",
        host=host,
    )
    if ok_flag and isinstance(data, dict):
        # Tolerate null / non-object values at any level of the Oem block.
        node = data
        for key in ("Oem", "Version"):
            node = node.get(key) if isinstance(node, dict) else None
        version = node.get("ActiveFirmwareVersion") if isinstance(node, dict) else None
        return version or "Unknown"
    return _c(C.RED, "Unreachable")


def _show_fw_versions(password: str, ioms: list):
    """Print a table of IOM and fabric firmware versions for (iom, ip) pairs."""
    info("Querying firmware versions...")
    rows = [
        [iom, ip,
         _get_iom_fw_version(password, ip, iom),
         _get_fabric_fw_version(password, ip, iom)]
        for iom, ip in ioms
    ]
    print()
    draw_table(
        ["IOM", "IP Address", "IOM Firmware", "Fabric Firmware"],
        rows,
        [5, 16, 32, 20],
    )
    print()


def _stage_firmware(password: str, ip: str, iom: str, fw_path: str,
                    upload_msg: str, trigger_msg: str, target: str,
                    monitor_msg: str) -> bool:
    """
    Upload *fw_path*, trigger SimpleUpdate on *target* and wait for tasks.

    Returns False if the file cannot be read, the upload fails or the trigger
    fails. A task-monitoring failure is only reported as a warning.
    """
    try:
        sz = os.path.getsize(fw_path)
    except OSError as e:
        error(f"Upload failed: {e}")
        return False

    info(upload_msg.format(kb=sz // 1024))
    ok_flag, data = _redfish_upload_firmware(password, ip, fw_path)
    if not ok_flag:
        error(f"Upload failed: {data}")
        return False
    ok("Firmware file uploaded.")

    info(trigger_msg)
    ok_flag, data = _redfish_trigger_update(password, ip, target)
    if not ok_flag:
        error(f"Update trigger failed: {data}")
        return False
    ok("Update triggered.")

    info(monitor_msg)
    ok_flag, msg = _redfish_poll_tasks(password, ip)
    if not ok_flag:
        warn(f"Task monitoring ended: {msg}")
    else:
        ok(msg)
    return True


def _update_iom_fw(password: str, ip: str, iom: str, fw_path: str) -> bool:
    """Upload and apply IOM firmware for one IOM, then restart it."""
    staged = _stage_firmware(
        password, ip, iom, fw_path,
        upload_msg=f"Uploading IOM firmware ({{kb}} KB) to {iom} at {ip}...",
        trigger_msg=f"Triggering {iom} firmware update...",
        target=f"/redfish/v1/Managers/{iom}",
        monitor_msg="Monitoring update progress (this may take several minutes)...",
    )
    if not staged:
        return False

    info(f"Restarting {iom}...")
    _redfish_restart_iom(password, ip, iom)  # connection drop on restart is normal
    ok(f"{iom} restart initiated. Waiting 30s for reboot...")
    time.sleep(30)
    return True


def _update_fabric_fw(password: str, ip: str, iom: str, fw_path: str) -> bool:
    """
    Upload and apply Fabric Card firmware for one IOM.

    Per the service guide, the firmware file must be re-uploaded even if it
    was already uploaded during the IOM firmware step.
    After the update: restart fabric card, then restart IOM.
    """
    staged = _stage_firmware(
        password, ip, iom, fw_path,
        upload_msg=f"Uploading Fabric Card firmware ({{kb}} KB) to {iom} at {ip}...",
        trigger_msg=f"Triggering {iom} Fabric Card firmware update...",
        target=f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1",
        monitor_msg="Monitoring update progress...",
    )
    if not staged:
        return False

    info(f"Restarting {iom} Fabric Card...")
    _redfish_reset_fabric(password, ip, iom)
    ok("Fabric Card restart initiated. Waiting 15s...")
    time.sleep(15)

    info(f"Restarting {iom} after Fabric Card update...")
    _redfish_restart_iom(password, ip, iom)
    ok(f"{iom} restart initiated. Waiting 30s for reboot...")
    time.sleep(30)
    return True


# ── Firmware Update Workflow ──────────────────────────────────────────────────

def _prompt_choice(label: str, valid: tuple, complaint: str) -> str:
    """Prompt until the answer is one of *valid*; warn with *complaint*."""
    while True:
        choice = prompt(label)
        if choice in valid:
            return choice
        warn(complaint)


def _prompt_firmware_file(label: str) -> str:
    """Prompt until the user names an existing regular file; echo its size."""
    while True:
        path = prompt(label)
        if os.path.isfile(path):
            sz = os.path.getsize(path)
            ok(f"File: {path} ({sz // 1024} KB)")
            return path
        warn(f"File not found: {path}")


def firmware_update_workflow():
    """
    Standalone firmware update for IOM and Fabric Card firmware.

    Connects to each IOM via its network IP (not serial loopback) —
    uploading firmware over 115200-baud serial would be impractically slow.
    Failed updates are summarised at the end, and the function waits for
    Enter before returning so the results are not cleared by the main menu.
    """
    banner()
    rule("IOM & Fabric Card Firmware Update")
    info("This procedure connects to each IOM via its network IP address.")
    info("Ensure this system has network access to the IOM management interface.")
    print()

    password = prompt_password()
    print()

    print(" Which IOM(s) would you like to update?")
    print(f" {_c(C.BOLD, '1')} IOM1 only")
    print(f" {_c(C.BOLD, '2')} IOM2 only")
    print(f" {_c(C.BOLD, '3')} Both IOM1 and IOM2")
    print()
    iom_choice = _prompt_choice(
        "Select option [1-3]", ("1", "2", "3"), "Please enter 1, 2, or 3.",
    )

    print()
    info("Enter the management IP address for each IOM to update.")
    ioms = []
    if iom_choice in ("1", "3"):
        ioms.append(("IOM1", prompt_ip(" IOM1 IP address")))
    if iom_choice in ("2", "3"):
        ioms.append(("IOM2", prompt_ip(" IOM2 IP address")))
    print()

    rule("Current Firmware Versions")
    _show_fw_versions(password, ioms)

    print(" What would you like to update?")
    print(f" {_c(C.BOLD, '1')} IOM Firmware only")
    print(f" {_c(C.BOLD, '2')} Fabric Card Firmware only")
    print(f" {_c(C.BOLD, '3')} Both IOM and Fabric Card Firmware")
    print(f" {_c(C.BOLD, '4')} Cancel")
    print()
    choice = _prompt_choice(
        "Select option [1-4]", ("1", "2", "3", "4"), "Please enter 1, 2, 3, or 4.",
    )
    if choice == "4":
        info("Firmware update cancelled.")
        return

    update_iom = choice in ("1", "3")
    update_fabric = choice in ("2", "3")
    iom_fw_path = ""
    fabric_fw_path = ""

    if update_iom:
        print()
        iom_fw_path = _prompt_firmware_file("Path to IOM firmware file")
    if update_fabric:
        print()
        fabric_fw_path = _prompt_firmware_file("Path to Fabric Card firmware file")

    print()
    warn("For HA systems: update the passive IOM first.")
    if len(ioms) > 1:
        warn("IOM1 will be updated first — adjust order if IOM2 is passive.")
    print()

    if not prompt_yn("Proceed with firmware update?", default=True):
        info("Firmware update cancelled.")
        return

    failures = []
    for iom, ip in ioms:
        rule(f"{iom} ({ip})")
        if update_iom and not _update_iom_fw(password, ip, iom, iom_fw_path):
            failures.append(f"{iom} IOM firmware")
        if update_fabric and not _update_fabric_fw(password, ip, iom, fabric_fw_path):
            failures.append(f"{iom} Fabric Card firmware")

    rule("Post-Update Firmware Validation")
    _show_fw_versions(password, ioms)

    if failures:
        error(f"The following updates did not complete: {', '.join(failures)}")

    print()
    draw_box([
        f"{_c(C.YEL, 'IMPORTANT -- For HA (Dual-Controller) Systems:')}",
        "",
        "After updating this controller's IOMs:",
        " 1. Log into TrueNAS and initiate a failover.",
        " 2. Re-run this tool to update the other controller.",
    ], colour=C.YEL)
    print()
    # The main menu clears the screen; give the user time to read the results.
    prompt("Press Enter to return to the main menu")


# ── Step 3: Fetch & display current IOM network settings ─────────────────────

def _section(data: dict, key: str) -> dict:
    """Return data[key] if it is a dict, else {} (tolerates null/missing)."""
    value = data.get(key)
    return value if isinstance(value, dict) else {}


def _first_record(data: dict, key: str) -> Optional[dict]:
    """Return the first dict entry of the list data[key], or None."""
    value = data.get(key)
    if isinstance(value, list) and value and isinstance(value[0], dict):
        return value[0]
    return None


def fetch_current_config(cfg: ShelfConfig) -> bool:
    """
    Query Redfish for the current network config of both IOMs.

    Populates cfg.iom1 / cfg.iom2 with live data.
    Returns True if at least one IOM responded.
    """
    rule("Step 3 of 5 -- Current IOM Network Settings")
    info("Querying Redfish API for current network configuration...")
    print()

    any_ok = False
    rows = []
    errors = []

    for iom in ("IOM1", "IOM2"):
        path = f"/redfish/v1/Managers/{iom}/EthernetInterfaces/1"
        ok_flag, data = _redfish_request(cfg.password, "GET", path)

        if not (ok_flag and isinstance(data, dict)):
            rows.append([iom, _c(C.RED, "No response"), "--", "--", "--", "--"])
            errors.append((iom, str(data)))
            continue

        any_ok = True

        # Determine mode
        dhcp_enabled = bool(
            _section(data, "DHCPv4").get("DHCPEnabled", False)
            or _section(data, "DHCPv6").get("DHCPEnabled", False)
        )

        # Pull address info — prefer StaticAddresses, fall back to IPv4Addresses
        live_rec = _first_record(data, "IPv4Addresses")
        addr_rec = _first_record(data, "IPv4StaticAddresses") or live_rec or {}
        ip = addr_rec.get("Address") or "--"
        gateway = addr_rec.get("Gateway") or "--"
        netmask = addr_rec.get("SubnetMask") or "--"

        if live_rec is not None:
            origin = live_rec.get("AddressOrigin") or "Unknown"
        else:
            origin = "DHCP" if dhcp_enabled else "Static"

        iom_cfg = IOMConfig(
            iom=iom,
            dhcp=dhcp_enabled,
            ip=ip if ip != "--" else "",
            gateway=gateway if gateway != "--" else "",
            netmask=netmask if netmask != "--" else "",
        )
        if iom == "IOM1":
            cfg.iom1 = iom_cfg
        else:
            cfg.iom2 = iom_cfg

        mode_str = _c(C.CYN, "DHCP") if dhcp_enabled else _c(C.GRN, "Static")
        rows.append([iom, mode_str, origin, ip, gateway, netmask])

    draw_table(
        ["IOM", "Mode", "Origin", "IP Address", "Gateway", "Subnet Mask"],
        rows,
        # Mode column is wide enough for the "No response" marker.
        [5, 12, 8, 16, 16, 16],
    )
    print()

    if errors:
        for iom, err in errors:
            error(f"{iom} query failed: {err}")
        print()

    if not any_ok:
        error("Neither IOM responded to the Redfish query.")
        error("Check that the serial cable is connected and the IOM is booted.")

    return any_ok


# ── Step 4: Prompt user — change config or exit ───────────────────────────────

def _prompt_netmask(label: str) -> str:
    """
    Prompt until the user enters a dotted-quad subnet mask.

    A plain IPv4 syntax check is not enough: values such as 1.2.3.4 or the
    host-mask form 0.0.0.255 are valid addresses but not valid netmasks.
    """
    while True:
        val = prompt(label)
        try:
            net = ipaddress.IPv4Network(f"0.0.0.0/{val}")
        except ValueError:
            net = None
        # Comparing against the canonical netmask rejects prefix-length and
        # host-mask spellings that IPv4Network would otherwise accept.
        if net is not None and net.prefixlen > 0 and str(net.netmask) == val:
            return val
        warn(f"'{val}' is not a valid subnet mask (e.g. 255.255.255.0) — please try again.")


def collect_network_config(cfg: ShelfConfig) -> bool:
    """
    Ask the user whether and how to change the IOM network settings.

    On "change", cfg.iom1 / cfg.iom2 are replaced with the requested DHCP or
    static settings. Returns True to proceed with applying changes, False to
    skip.
    """
    rule("Step 4 of 5 -- Change Configuration?")

    print(f" {_c(C.BOLD, '1')} Change network configuration")
    print(f" {_c(C.BOLD, '2')} Leave settings as-is and disconnect")
    print()
    choice = _prompt_choice("Select option [1/2]", ("1", "2"), "Please enter 1 or 2.")
    if choice == "2":
        info("No changes requested.")
        return False

    # ── User wants to change settings ─────────────────────────────────────────
    print()
    print(" How should the IOMs be configured?")
    print(f" {_c(C.BOLD, '1')} Static IP addresses")
    print(f" {_c(C.BOLD, '2')} DHCP")
    print()
    mode = _prompt_choice("Select mode [1/2]", ("1", "2"), "Please enter 1 or 2.")
    use_dhcp = (mode == "2")
    print()

    if use_dhcp:
        cfg.iom1 = IOMConfig("IOM1", dhcp=True)
        cfg.iom2 = IOMConfig("IOM2", dhcp=True)
        ok("Both IOMs will be set to DHCP.")
        return True

    # Static — IOM1
    info(f"Static network details for {_c(C.BOLD, 'IOM1')}:")
    iom1_ip = prompt_ip(" IOM1 IP address ")
    iom1_gw = prompt_ip(" IOM1 Gateway ")
    iom1_nm = _prompt_netmask(" IOM1 Subnet Mask")
    cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip,
                         gateway=iom1_gw, netmask=iom1_nm)

    # Static — IOM2
    print()
    info(f"Static network details for {_c(C.BOLD, 'IOM2')}:")
    while True:
        iom2_ip = prompt_ip(" IOM2 IP address ")
        if ipaddress.IPv4Address(iom2_ip) != ipaddress.IPv4Address(iom1_ip):
            break
        warn("IOM2 cannot use the same IP address as IOM1.")

    same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True)
    if same:
        iom2_gw, iom2_nm = iom1_gw, iom1_nm
    else:
        iom2_gw = prompt_ip(" IOM2 Gateway ")
        iom2_nm = _prompt_netmask(" IOM2 Subnet Mask")

    cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip,
                         gateway=iom2_gw, netmask=iom2_nm)
    return True


# ── Step 5a: Apply configuration via Redfish ──────────────────────────────────

def apply_configuration(cfg: ShelfConfig) -> bool:
    """Apply cfg.iom1 and cfg.iom2; print a result table. True if both succeed."""
    rule("Step 5 of 5 -- Applying Configuration via Redfish API")
    info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...")
    print()

    results = []
    all_ok = True

    for iom_cfg in (cfg.iom1, cfg.iom2):
        success, detail = _apply_iom(cfg.password, iom_cfg)
        status = _c(C.GRN, "OK") if success else _c(C.RED, "FAIL")
        results.append([iom_cfg.iom, status, detail])
        all_ok = all_ok and success

    draw_table(["IOM", "Result", "Detail"], results, [6, 8, 50])
    print()
    return all_ok


def _apply_iom(password: str, iom_cfg: IOMConfig) -> tuple:
    """
    Apply network config to a single IOM.

    DHCP:   single PATCH enabling DHCPv4.

    Static: two sequential PATCHes to work around a firmware bug in the
            current ES24N release that prevents disabling DHCP and setting
            a static address in the same request.
              Pass 1 -- set the static IP/gateway/netmask (DHCP still on)
              Pass 2 -- disable DHCP (address is already committed)

    Returns (success: bool, detail: str).
    """
    path = f"/redfish/v1/Managers/{iom_cfg.iom}/EthernetInterfaces/1"

    if iom_cfg.dhcp:
        # DHCP -- single call, no firmware quirk involved
        ok_flag, data = _redfish_request(
            password, "PATCH", path,
            {"DHCPv4": {"DHCPEnabled": True}},
        )
        if ok_flag:
            return True, "Configured: DHCP"
        return False, str(data)[:80]

    # Static -- Pass 1: set address while DHCP is still enabled
    info(f" {iom_cfg.iom} pass 1/2 -- setting static address {iom_cfg.ip}...")
    ok_flag, data = _redfish_request(
        password, "PATCH", path,
        {
            "IPv4StaticAddresses": [{
                "Address": iom_cfg.ip,
                "Gateway": iom_cfg.gateway,
                "SubnetMask": iom_cfg.netmask,
            }]
        },
    )
    if not ok_flag:
        return False, f"Pass 1 failed: {str(data)[:70]}"

    # Brief pause to allow the IOM to commit the address before the next call
    time.sleep(1)

    # Static -- Pass 2: disable DHCP now that the static address is committed
    info(f" {iom_cfg.iom} pass 2/2 -- disabling DHCP...")
    ok_flag, data = _redfish_request(
        password, "PATCH", path,
        {"DHCPv4": {"DHCPEnabled": False}},
    )
    if not ok_flag:
        return False, f"Pass 2 failed: {str(data)[:70]}"

    return True, f"Configured: Static {iom_cfg.ip}"


# ── Step 5b: Print applied-settings summary ───────────────────────────────────

def print_summary(cfg: ShelfConfig, changed: bool):
    """Print the per-IOM settings table and, if changed, the cable reminder."""
    rule("Summary")

    def val(iom: IOMConfig, key: str) -> str:
        # `key` (not `field`) so the dataclasses.field import is not shadowed.
        dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)",
                    "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"}
        stat_map = {"mode": "Static", "ip": iom.ip,
                    "gateway": iom.gateway, "netmask": iom.netmask}
        return (dhcp_map if iom.dhcp else stat_map).get(key, "")

    draw_table(
        ["Setting", "IOM1", "IOM2"],
        [
            ["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")],
            ["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")],
            ["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")],
            ["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")],
            ["Serial Port", cfg.device, cfg.device],
            ["Changes", "Yes" if changed else "None", ""],
        ],
        [12, 22, 22],
    )

    if changed:
        print()
        draw_box([
            f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}",
            "",
            "Remove the serial cable ONLY after verifying each",
            "expander appears in TrueNAS with matching drives.",
            "",
            "TrueNAS > System Settings > Enclosure >",
            "NVMe-oF Expansion Shelves",
        ], colour=C.YEL)
    print()


# ── Disconnect ────────────────────────────────────────────────────────────────

def close_serial_connection(ser: SerialPort, device: str):
    """Close the port (if open) and wait for the user to unplug the cable."""
    if ser and ser.is_open:
        ser.close()
        ok(f"Serial port {device} closed.")
    print()
    prompt("Disconnect the serial cable, then press Enter to continue")
    ok("Serial cable disconnected. Shelf complete.")


# ── Full shelf configuration cycle ────────────────────────────────────────────

def configure_shelf() -> bool:
    """Run one complete shelf cycle. Returns True if user wants another shelf."""
    banner()
    cfg = ShelfConfig()

    # 1 — Detect device
    device = detect_serial_device()
    if not device:
        error("Could not detect a serial device. Returning to main menu.")
        time.sleep(2)
        return True
    cfg.device = device

    # 2 — Open serial port & wake IOM console
    ser = open_serial_connection(device)
    if not ser:
        error("Could not open serial port. Returning to main menu.")
        time.sleep(2)
        return True

    try:
        # Password needed before any Redfish calls
        print()
        cfg.password = prompt_password()

        # 3 — Fetch & display current settings
        fetch_current_config(cfg)

        # 4 — Ask user: change or leave alone?
        apply_changes = collect_network_config(cfg)

        # 5 — Apply if requested
        changed = False
        if apply_changes:
            print()
            rule("Ready to Apply")
            info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1")
            print()
            if prompt_yn("Apply configuration now?", default=True):
                if not apply_configuration(cfg):
                    warn("One or more IOMs reported a failure — review the results above.")
                # PATCHes were sent (possibly partially applied), so the
                # summary still flags that changes were attempted.
                changed = True
            else:
                warn("Configuration skipped — no changes were made.")

        # Summary
        print_summary(cfg, changed)

        # Disconnect
        close_serial_connection(ser, device)
    finally:
        # On Ctrl-C or any error, still restore the tty and release the port.
        if ser.is_open:
            ser.close()

    print()
    return prompt_yn("Configure another ES24N shelf?", default=False)


# ── Entry point ───────────────────────────────────────────────────────────────

def main():
    """Show the main menu until the user exits."""
    while True:
        banner()
        # Printed after banner() because banner() clears the screen; printing
        # it once before the loop made it vanish before it could be read.
        print(f" {_c(C.DIM, 'Automates ES24N IOM network configuration and firmware updates')}")
        print(f" {_c(C.DIM, 'using the Redfish API. No external dependencies.')}")
        print()
        draw_box([
            f" {_c(C.BOLD, '1')} Configure a new ES24N shelf",
            f" {_c(C.BOLD, '2')} Update IOM / Fabric Card Firmware",
            f" {_c(C.BOLD, '3')} Exit",
        ])
        print()
        choice = prompt("Select [1/2/3]")

        if choice == "1":
            another = configure_shelf()
            if not another:
                break
        elif choice == "2":
            firmware_update_workflow()
        elif choice == "3":
            break
        else:
            warn("Please enter 1, 2, or 3.")
            time.sleep(1)

    print()
    ok("Exiting ES24N IOM Configuration Tool. Goodbye.")
    print()


if __name__ == "__main__":
    try:
        main()
    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin was closed while a prompt was waiting for input.
        print()
        warn("Interrupted. Exiting.")
        sys.exit(0)