diff --git a/es24n-conf.py/es24n_conf.py b/es24n-conf.py/es24n_conf.py new file mode 100644 index 0000000..eba9eba --- /dev/null +++ b/es24n-conf.py/es24n_conf.py @@ -0,0 +1,673 @@ +#!/usr/bin/env python3 +""" +ES24N IOM Network Configuration Tool +TrueNAS ES24N Expansion Shelf — Serial Configuration Utility +Based on ES24N Product Service Guide v.26011 + +Zero external dependencies — Python 3 standard library only. +Compatible with TrueNAS (FreeBSD) and Linux. +""" + +import fcntl +import glob +import ipaddress +import json +import os +import select +import ssl +import subprocess +import sys +import termios +import time +import tty +import urllib.error +import urllib.request +from base64 import b64encode +from dataclasses import dataclass, field +from typing import Optional + +# ── ANSI colour helpers ─────────────────────────────────────────────────────── +class C: + RED = "\033[0;31m" + GRN = "\033[0;32m" + YEL = "\033[1;33m" + CYN = "\033[0;36m" + WHT = "\033[1;37m" + DIM = "\033[2m" + BOLD = "\033[1m" + RESET = "\033[0m" + CLEAR = "\033[2J\033[H" + +def _c(colour: str, text: str) -> str: + return f"{colour}{text}{C.RESET}" + +def info(msg: str): print(f" {_c(C.CYN, 'i')} {msg}") +def ok(msg: str): print(f" {_c(C.GRN, 'OK')} {msg}") +def warn(msg: str): print(f" {_c(C.YEL, '!')} {msg}") +def error(msg: str): print(f" {_c(C.RED, 'X')} {msg}") + +def banner(): + print(C.CLEAR, end="") + w = 60 + print(_c(C.CYN, " +" + "-" * w + "+")) + print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD, + " TrueNAS ES24N IOM Configuration Tool ") + _c(C.CYN, " |")) + print(_c(C.CYN, " |") + _c(C.DIM, + " Serial Network Setup v2.0 (stdlib only) ") + _c(C.CYN, " |")) + print(_c(C.CYN, " +" + "-" * w + "+")) + print() + +def rule(title: str = ""): + width = 60 + if title: + pad = max(0, width - len(title) - 2) + left = pad // 2 + right = pad - left + line = f"{'-' * left} {title} {'-' * right}" + else: + line = "-" * width + print(f"\n {_c(C.YEL, line)}\n") + +def draw_table(headers: list, rows: list, col_widths: list): + sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+" + + def fmt_row(cells): + return " | " + " | ".join( + str(c).ljust(w) for c, w in zip(cells, col_widths) + ) + " |" + + print(_c(C.DIM, sep)) + print(_c(C.BOLD, fmt_row(headers))) + print(_c(C.DIM, sep)) + for row in rows: + print(fmt_row(row)) + print(_c(C.DIM, sep)) + +def draw_box(lines: list, colour: str = C.CYN): + width = max(len(l) for l in lines) + 4 + print(f" {_c(colour, '+' + '-' * width + '+')}") + for line in lines: + pad = width - len(line) - 2 + print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}") + print(f" {_c(colour, '+' + '-' * width + '+')}") + + +# ── Input helpers ───────────────────────────────────────────────────────────── +def prompt(label: str, default: str = "", password: bool = False) -> str: + display = f" {_c(C.CYN, label)}" + if default: + display += f" {_c(C.DIM, f'[{default}]')}" + display += ": " + + if password: + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + try: + tty.setraw(fd) + sys.stdout.write(display) + sys.stdout.flush() + chars = [] + while True: + ch = sys.stdin.read(1) + if ch in ("\r", "\n"): + break + elif ch in ("\x7f", "\x08"): + if chars: + chars.pop() + sys.stdout.write("\b \b") + sys.stdout.flush() + elif ch == "\x03": + raise KeyboardInterrupt + else: + chars.append(ch) + sys.stdout.write("*") + sys.stdout.flush() + print() + return "".join(chars) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + else: + sys.stdout.write(display) + sys.stdout.flush() + val = sys.stdin.readline().strip() + return val if val else default + + +def prompt_ip(label: str) -> str: + while True: + val = prompt(label) + try: + ipaddress.IPv4Address(val) + return val + except ValueError: + warn(f"'{val}' is not a valid IPv4 address — please try again.") + + +def prompt_yn(label: str, default: bool = True) -> bool: + hint = "Y/n" if default else "y/N" + val = prompt(f"{label} [{hint}]").strip().lower() + if not val: + return default + return val in ("y", "yes") + + +def prompt_password() -> str: + while True: + val = prompt( + "Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)", + password=True, + ) + if val: + return val + warn("Password cannot be empty.") + + +# ── Data classes ────────────────────────────────────────────────────────────── +@dataclass +class IOMConfig: + iom: str + dhcp: bool = True + ip: str = "" + gateway: str = "" + netmask: str = "" + +@dataclass +class ShelfConfig: + device: str = "" + password: str = "" + iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1")) + iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2")) + + +# ── Serial port (stdlib: termios / fcntl / select) ──────────────────────────── +class SerialPort: + """ + Minimal 8N1 serial port using only the Python standard library. + Replaces pyserial for TrueNAS environments where pip is unavailable. + """ + + BAUD_MAP = { + 9600: termios.B9600, + 19200: termios.B19200, + 38400: termios.B38400, + 57600: termios.B57600, + 115200: termios.B115200, + } + + def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self._fd: Optional[int] = None + self._saved_attrs = None + + # ── Open / close ────────────────────────────────────────────────────────── + def open(self): + try: + self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) + except OSError as e: + raise OSError(f"Cannot open {self.port}: {e}") from e + + # Switch back to blocking mode now that O_NOCTTY is set + flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) + fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) + + self._saved_attrs = termios.tcgetattr(self._fd) + + # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc + attrs = list(termios.tcgetattr(self._fd)) + baud = self.BAUD_MAP.get(self.baudrate, termios.B115200) + + attrs[0] = termios.IGNBRK # iflag + attrs[1] = 0 # oflag + attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag + attrs[3] = 0 # lflag (raw) + attrs[4] = baud # ispeed + attrs[5] = baud # ospeed + attrs[6][termios.VMIN] = 0 + attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255) + + termios.tcsetattr(self._fd, termios.TCSANOW, attrs) + termios.tcflush(self._fd, termios.TCIOFLUSH) + + def close(self): + if self._fd is not None: + try: + if self._saved_attrs: + termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs) + os.close(self._fd) + except OSError: + pass + finally: + self._fd = None + + @property + def is_open(self) -> bool: + return self._fd is not None + + # ── Read / write ────────────────────────────────────────────────────────── + def write(self, data: bytes): + if self._fd is None: + raise OSError("Port is not open") + os.write(self._fd, data) + + def read_chunk(self, size: int = 4096) -> bytes: + if self._fd is None: + raise OSError("Port is not open") + try: + return os.read(self._fd, size) + except OSError: + return b"" + + def read_until_quiet(self, quiet_period: float = 0.5) -> str: + """ + Read until no new bytes arrive for `quiet_period` seconds, + or until `self.timeout` total seconds have elapsed. + """ + output = b"" + deadline = time.monotonic() + self.timeout + last_rx = time.monotonic() + + while True: + now = time.monotonic() + if now >= deadline: + break + if output and (now - last_rx) >= quiet_period: + break + + wait = min(deadline - now, quiet_period) + ready, _, _ = select.select([self._fd], [], [], wait) + if ready: + chunk = self.read_chunk() + if chunk: + output += chunk + last_rx = time.monotonic() + + return output.decode("utf-8", errors="replace") + + def send_line(self, cmd: str = "", delay: float = 0.3): + self.write((cmd + "\r\n").encode("utf-8")) + time.sleep(delay) + + # ── Context manager ─────────────────────────────────────────────────────── + def __enter__(self): + self.open() + return self + + def __exit__(self, *_): + self.close() + + +# ── Step 1: Detect serial device ────────────────────────────────────────────── +def detect_serial_device() -> Optional[str]: + rule("Step 1 of 6 -- Serial Cable & Device Detection") + + print(" Connect the serial cable from the ES24N IOM1 port") + print(" to the active F-Series controller USB port.") + print() + prompt("Press Enter when the cable is connected") + + for attempt in range(1, 4): + info(f"Scanning for USB serial devices (attempt {attempt}/3)...") + time.sleep(1) + + # FreeBSD: /dev/ttyU* Linux: /dev/ttyUSB*, /dev/ttyACM* + patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"] + ports = sorted({p for pat in patterns for p in glob.glob(pat)}) + + if ports: + break + + if attempt < 3: + warn("No device found yet — retrying in 2 seconds...") + time.sleep(2) + + if not ports: + error("No USB serial device detected after 3 attempts.") + print() + print(" Troubleshooting:") + print(" - Ensure the serial cable is fully seated at both ends.") + print(" - Try a different USB port on the controller.") + print(" - Confirm the ES24N is powered on.") + return None + + if len(ports) == 1: + ok(f"Device found: {_c(C.BOLD, ports[0])}") + _fix_permissions(ports[0]) + return ports[0] + + # Multiple devices — let the user choose + print() + draw_table( + ["#", "Device"], + [[str(i), p] for i, p in enumerate(ports, 1)], + [4, 24], + ) + print() + + while True: + val = prompt(f"Select device number [1-{len(ports)}]") + if val.isdigit() and 1 <= int(val) <= len(ports): + selected = ports[int(val) - 1] + ok(f"Selected: {_c(C.BOLD, selected)}") + _fix_permissions(selected) + return selected + warn(f"Please enter a number between 1 and {len(ports)}.") + + +def _fix_permissions(device: str): + try: + result = subprocess.run( + ["sudo", "chown", ":wheel", device], + capture_output=True, timeout=5, + ) + if result.returncode == 0: + ok(f"Permissions updated on {device}") + return + except Exception: + pass + try: + os.chmod(device, 0o666) + ok(f"Permissions updated on {device}") + except PermissionError: + warn("Could not update device permissions automatically.") + warn("If the connection fails, re-run this script with sudo.") + + +# ── Step 2: Open serial connection & wake IOM console ───────────────────────── +def open_serial_connection(device: str) -> Optional[SerialPort]: + rule("Step 2 of 6 -- Opening Serial Connection") + + info(f"Opening {device} at 115200 baud (8N1)...") + ser = SerialPort(device, baudrate=115200, timeout=5.0) + + try: + ser.open() + except OSError as e: + error(str(e)) + return None + + ok(f"Port opened: {device}") + info("Sending wake signal to IOM console...") + + ser.send_line("", delay=0.5) + ser.send_line("", delay=0.5) + response = ser.read_until_quiet(quiet_period=0.5) + + print() + if response.strip(): + print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}") + for line in response.strip().splitlines(): + print(f" {_c(C.DIM, '|')} {line}") + print(f" {_c(C.DIM, '+' + '-' * 56)}") + print() + + low = response.lower() + if any(kw in low for kw in ("login", "$", "#", "password")): + ok("IOM console is responsive.") + else: + warn("Unexpected response — the IOM may still be booting.") + warn("You can continue; the Redfish API operates independently.") + else: + warn("No response received from IOM console.") + warn("The Redfish API may still be reachable. Continuing...") + + print() + return ser + + +# ── Step 3: Collect network configuration ───────────────────────────────────── +def collect_network_config(cfg: ShelfConfig): + rule("Step 3 of 6 -- IOM Network Configuration") + + print(" How should the IOMs be configured?") + print(f" {_c(C.BOLD, '1')} Static IP addresses") + print(f" {_c(C.BOLD, '2')} DHCP") + print() + + while True: + choice = prompt("Select option [1/2]") + if choice in ("1", "2"): + break + warn("Please enter 1 or 2.") + + use_dhcp = (choice == "2") + print() + + cfg.password = prompt_password() + + if use_dhcp: + cfg.iom1 = IOMConfig("IOM1", dhcp=True) + cfg.iom2 = IOMConfig("IOM2", dhcp=True) + print() + ok("Both IOMs will be set to DHCP.") + return + + # Static — IOM1 + print() + info(f"Static network details for {_c(C.BOLD, 'IOM1')}:") + iom1_ip = prompt_ip(" IOM1 IP address ") + iom1_gw = prompt_ip(" IOM1 Gateway ") + iom1_nm = prompt_ip(" IOM1 Subnet Mask") + cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip, gateway=iom1_gw, netmask=iom1_nm) + + # Static — IOM2 + print() + info(f"Static network details for {_c(C.BOLD, 'IOM2')}:") + iom2_ip = prompt_ip(" IOM2 IP address ") + + same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True) + if same: + iom2_gw, iom2_nm = iom1_gw, iom1_nm + else: + iom2_gw = prompt_ip(" IOM2 Gateway ") + iom2_nm = prompt_ip(" IOM2 Subnet Mask") + + cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip, gateway=iom2_gw, netmask=iom2_nm) + + +# ── Step 4: Apply configuration via Redfish ─────────────────────────────────── +def apply_configuration(cfg: ShelfConfig) -> bool: + rule("Step 4 of 6 -- Applying Configuration via Redfish API") + + info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...") + print() + + results = [] + all_ok = True + for iom_cfg in [cfg.iom1, cfg.iom2]: + success, message = _patch_iom(cfg.password, iom_cfg) + results.append([iom_cfg.iom, + f"{_c(C.GRN, 'OK')}" if success else f"{_c(C.RED, 'FAIL')}", + message]) + if not success: + all_ok = False + + draw_table(["IOM", "Result", "Detail"], results, [6, 8, 44]) + print() + return all_ok + + +def _patch_iom(password: str, iom: IOMConfig) -> tuple: + url = f"https://127.0.0.1/redfish/v1/Managers/{iom.iom}/EthernetInterfaces/1" + + if iom.dhcp: + payload = {"DHCPv4": {"DHCPEnabled": True}} + else: + payload = { + "DHCPv4": {"DHCPEnabled": False}, + "IPv4StaticAddresses": [{ + "Address": iom.ip, + "Gateway": iom.gateway, + "SubnetMask": iom.netmask, + }], + } + + data = json.dumps(payload).encode("utf-8") + credentials = b64encode(f"Admin:{password}".encode()).decode() + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + req = urllib.request.Request( + url, + data=data, + method="PATCH", + headers={ + "Content-Type": "application/json", + "Authorization": f"Basic {credentials}", + }, + ) + + try: + with urllib.request.urlopen(req, context=ctx, timeout=10) as resp: + if resp.status in (200, 204): + mode = "DHCP" if iom.dhcp else f"static {iom.ip}" + return True, f"Configured: {mode}" + body = resp.read().decode("utf-8", errors="replace") + return False, f"HTTP {resp.status}: {body[:80]}" + + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") + try: + msg = json.loads(body).get("error", {}).get("message", body) + except json.JSONDecodeError: + msg = body + return False, f"HTTP {e.code}: {msg[:80]}" + + except OSError as e: + return False, f"Connection error: {e}" + + +# ── Step 5: Print configuration summary ─────────────────────────────────────── +def print_summary(cfg: ShelfConfig): + rule("Step 5 of 6 -- Configuration Summary") + + def val(iom: IOMConfig, field: str) -> str: + dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)", "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"} + stat_map = {"mode": "Static", "ip": iom.ip, "gateway": iom.gateway, "netmask": iom.netmask} + return (dhcp_map if iom.dhcp else stat_map).get(field, "") + + draw_table( + ["Setting", "IOM1", "IOM2"], + [ + ["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")], + ["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")], + ["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")], + ["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")], + ["Serial Port", cfg.device, cfg.device], + ], + [12, 22, 22], + ) + + print() + draw_box([ + f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}", + "", + "Remove the serial cable ONLY after verifying each", + "expander appears in TrueNAS with matching drives.", + "", + "TrueNAS > System Settings > Enclosure >", + "NVMe-oF Expansion Shelves", + ], colour=C.YEL) + print() + + +# ── Step 6: Close serial connection ─────────────────────────────────────────── +def close_serial_connection(ser: SerialPort, device: str): + rule("Step 6 of 6 -- Close Serial Connection") + + if ser and ser.is_open: + ser.close() + ok(f"Serial port {device} closed.") + + print() + prompt("Disconnect the serial cable, then press Enter to continue") + ok("Serial cable disconnected. Shelf configuration complete.") + + +# ── Full shelf configuration cycle ──────────────────────────────────────────── +def configure_shelf() -> bool: + """Run one complete shelf cycle. Returns True if user wants another.""" + banner() + cfg = ShelfConfig() + + # 1 — Detect device + device = detect_serial_device() + if not device: + error("Could not detect a serial device. Returning to main menu.") + time.sleep(2) + return True + cfg.device = device + + # 2 — Open serial port + ser = open_serial_connection(device) + if not ser: + error("Could not open serial port. Returning to main menu.") + time.sleep(2) + return True + + # 3 — Collect settings + collect_network_config(cfg) + + # 4 — Confirm & apply + print() + rule("Ready to Apply") + info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1") + info("using the active serial session as the communication path.") + print() + + if prompt_yn("Apply configuration now?", default=True): + apply_configuration(cfg) + else: + warn("Configuration skipped — no changes were made.") + + # 5 — Summary & reminder + print_summary(cfg) + + # 6 — Close serial port + close_serial_connection(ser, device) + + print() + return prompt_yn("Configure another ES24N shelf?", default=False) + + +# ── Entry point ─────────────────────────────────────────────────────────────── +def main(): + banner() + print(f" {_c(C.DIM, 'Automates ES24N IOM network configuration over a direct serial')}") + print(f" {_c(C.DIM, 'connection using the Redfish API (loopback).')}") + print(f" {_c(C.DIM, 'No external dependencies -- Python 3 standard library only.')}") + print() + + while True: + banner() + draw_box([ + f" {_c(C.BOLD, '1')} Configure a new ES24N shelf", + f" {_c(C.BOLD, '2')} Exit", + ]) + print() + + choice = prompt("Select [1/2]") + if choice == "1": + another = configure_shelf() + if not another: + break + elif choice == "2": + break + else: + warn("Please enter 1 or 2.") + time.sleep(1) + + print() + ok("Exiting ES24N IOM Configuration Tool. Goodbye.") + print() + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print() + warn("Interrupted. Exiting.") + sys.exit(0)