From 7d59c3676cdbfe04244031c1258a956c557ba428 Mon Sep 17 00:00:00 2001 From: scott Date: Tue, 3 Mar 2026 21:19:47 -0500 Subject: [PATCH] Delete es24n-conf.py/es24n_conf.py --- es24n-conf.py/es24n_conf.py | 673 ------------------------------------ 1 file changed, 673 deletions(-) delete mode 100644 es24n-conf.py/es24n_conf.py diff --git a/es24n-conf.py/es24n_conf.py b/es24n-conf.py/es24n_conf.py deleted file mode 100644 index eba9eba..0000000 --- a/es24n-conf.py/es24n_conf.py +++ /dev/null @@ -1,673 +0,0 @@ -#!/usr/bin/env python3 -""" -ES24N IOM Network Configuration Tool -TrueNAS ES24N Expansion Shelf — Serial Configuration Utility -Based on ES24N Product Service Guide v.26011 - -Zero external dependencies — Python 3 standard library only. -Compatible with TrueNAS (FreeBSD) and Linux. -""" - -import fcntl -import glob -import ipaddress -import json -import os -import select -import ssl -import subprocess -import sys -import termios -import time -import tty -import urllib.error -import urllib.request -from base64 import b64encode -from dataclasses import dataclass, field -from typing import Optional - -# ── ANSI colour helpers ─────────────────────────────────────────────────────── -class C: - RED = "\033[0;31m" - GRN = "\033[0;32m" - YEL = "\033[1;33m" - CYN = "\033[0;36m" - WHT = "\033[1;37m" - DIM = "\033[2m" - BOLD = "\033[1m" - RESET = "\033[0m" - CLEAR = "\033[2J\033[H" - -def _c(colour: str, text: str) -> str: - return f"{colour}{text}{C.RESET}" - -def info(msg: str): print(f" {_c(C.CYN, 'i')} {msg}") -def ok(msg: str): print(f" {_c(C.GRN, 'OK')} {msg}") -def warn(msg: str): print(f" {_c(C.YEL, '!')} {msg}") -def error(msg: str): print(f" {_c(C.RED, 'X')} {msg}") - -def banner(): - print(C.CLEAR, end="") - w = 60 - print(_c(C.CYN, " +" + "-" * w + "+")) - print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD, - " TrueNAS ES24N IOM Configuration Tool ") + _c(C.CYN, " |")) - print(_c(C.CYN, " |") + _c(C.DIM, - " Serial Network Setup v2.0 (stdlib only) ") + _c(C.CYN, " |")) - print(_c(C.CYN, " +" + "-" * w + "+")) - print() - -def rule(title: str = ""): - width = 60 - if title: - pad = max(0, width - len(title) - 2) - left = pad // 2 - right = pad - left - line = f"{'-' * left} {title} {'-' * right}" - else: - line = "-" * width - print(f"\n {_c(C.YEL, line)}\n") - -def draw_table(headers: list, rows: list, col_widths: list): - sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+" - - def fmt_row(cells): - return " | " + " | ".join( - str(c).ljust(w) for c, w in zip(cells, col_widths) - ) + " |" - - print(_c(C.DIM, sep)) - print(_c(C.BOLD, fmt_row(headers))) - print(_c(C.DIM, sep)) - for row in rows: - print(fmt_row(row)) - print(_c(C.DIM, sep)) - -def draw_box(lines: list, colour: str = C.CYN): - width = max(len(l) for l in lines) + 4 - print(f" {_c(colour, '+' + '-' * width + '+')}") - for line in lines: - pad = width - len(line) - 2 - print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}") - print(f" {_c(colour, '+' + '-' * width + '+')}") - - -# ── Input helpers ───────────────────────────────────────────────────────────── -def prompt(label: str, default: str = "", password: bool = False) -> str: - display = f" {_c(C.CYN, label)}" - if default: - display += f" {_c(C.DIM, f'[{default}]')}" - display += ": " - - if password: - fd = sys.stdin.fileno() - old = termios.tcgetattr(fd) - try: - tty.setraw(fd) - sys.stdout.write(display) - sys.stdout.flush() - chars = [] - while True: - ch = sys.stdin.read(1) - if ch in ("\r", "\n"): - break - elif ch in ("\x7f", "\x08"): - if chars: - chars.pop() - sys.stdout.write("\b \b") - sys.stdout.flush() - elif ch == "\x03": - raise KeyboardInterrupt - else: - chars.append(ch) - sys.stdout.write("*") - sys.stdout.flush() - print() - return "".join(chars) - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old) - else: - sys.stdout.write(display) - sys.stdout.flush() - val = sys.stdin.readline().strip() - return val if val else default - - -def prompt_ip(label: str) -> str: - while True: - val = prompt(label) - try: - ipaddress.IPv4Address(val) - return val - except ValueError: - warn(f"'{val}' is not a valid IPv4 address — please try again.") - - -def prompt_yn(label: str, default: bool = True) -> bool: - hint = "Y/n" if default else "y/N" - val = prompt(f"{label} [{hint}]").strip().lower() - if not val: - return default - return val in ("y", "yes") - - -def prompt_password() -> str: - while True: - val = prompt( - "Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)", - password=True, - ) - if val: - return val - warn("Password cannot be empty.") - - -# ── Data classes ────────────────────────────────────────────────────────────── -@dataclass -class IOMConfig: - iom: str - dhcp: bool = True - ip: str = "" - gateway: str = "" - netmask: str = "" - -@dataclass -class ShelfConfig: - device: str = "" - password: str = "" - iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1")) - iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2")) - - -# ── Serial port (stdlib: termios / fcntl / select) ──────────────────────────── -class SerialPort: - """ - Minimal 8N1 serial port using only the Python standard library. - Replaces pyserial for TrueNAS environments where pip is unavailable. - """ - - BAUD_MAP = { - 9600: termios.B9600, - 19200: termios.B19200, - 38400: termios.B38400, - 57600: termios.B57600, - 115200: termios.B115200, - } - - def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0): - self.port = port - self.baudrate = baudrate - self.timeout = timeout - self._fd: Optional[int] = None - self._saved_attrs = None - - # ── Open / close ────────────────────────────────────────────────────────── - def open(self): - try: - self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) - except OSError as e: - raise OSError(f"Cannot open {self.port}: {e}") from e - - # Switch back to blocking mode now that O_NOCTTY is set - flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) - fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) - - self._saved_attrs = termios.tcgetattr(self._fd) - - # Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc - attrs = list(termios.tcgetattr(self._fd)) - baud = self.BAUD_MAP.get(self.baudrate, termios.B115200) - - attrs[0] = termios.IGNBRK # iflag - attrs[1] = 0 # oflag - attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag - attrs[3] = 0 # lflag (raw) - attrs[4] = baud # ispeed - attrs[5] = baud # ospeed - attrs[6][termios.VMIN] = 0 - attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255) - - termios.tcsetattr(self._fd, termios.TCSANOW, attrs) - termios.tcflush(self._fd, termios.TCIOFLUSH) - - def close(self): - if self._fd is not None: - try: - if self._saved_attrs: - termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs) - os.close(self._fd) - except OSError: - pass - finally: - self._fd = None - - @property - def is_open(self) -> bool: - return self._fd is not None - - # ── Read / write ────────────────────────────────────────────────────────── - def write(self, data: bytes): - if self._fd is None: - raise OSError("Port is not open") - os.write(self._fd, data) - - def read_chunk(self, size: int = 4096) -> bytes: - if self._fd is None: - raise OSError("Port is not open") - try: - return os.read(self._fd, size) - except OSError: - return b"" - - def read_until_quiet(self, quiet_period: float = 0.5) -> str: - """ - Read until no new bytes arrive for `quiet_period` seconds, - or until `self.timeout` total seconds have elapsed. - """ - output = b"" - deadline = time.monotonic() + self.timeout - last_rx = time.monotonic() - - while True: - now = time.monotonic() - if now >= deadline: - break - if output and (now - last_rx) >= quiet_period: - break - - wait = min(deadline - now, quiet_period) - ready, _, _ = select.select([self._fd], [], [], wait) - if ready: - chunk = self.read_chunk() - if chunk: - output += chunk - last_rx = time.monotonic() - - return output.decode("utf-8", errors="replace") - - def send_line(self, cmd: str = "", delay: float = 0.3): - self.write((cmd + "\r\n").encode("utf-8")) - time.sleep(delay) - - # ── Context manager ─────────────────────────────────────────────────────── - def __enter__(self): - self.open() - return self - - def __exit__(self, *_): - self.close() - - -# ── Step 1: Detect serial device ────────────────────────────────────────────── -def detect_serial_device() -> Optional[str]: - rule("Step 1 of 6 -- Serial Cable & Device Detection") - - print(" Connect the serial cable from the ES24N IOM1 port") - print(" to the active F-Series controller USB port.") - print() - prompt("Press Enter when the cable is connected") - - for attempt in range(1, 4): - info(f"Scanning for USB serial devices (attempt {attempt}/3)...") - time.sleep(1) - - # FreeBSD: /dev/ttyU* Linux: /dev/ttyUSB*, /dev/ttyACM* - patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"] - ports = sorted({p for pat in patterns for p in glob.glob(pat)}) - - if ports: - break - - if attempt < 3: - warn("No device found yet — retrying in 2 seconds...") - time.sleep(2) - - if not ports: - error("No USB serial device detected after 3 attempts.") - print() - print(" Troubleshooting:") - print(" - Ensure the serial cable is fully seated at both ends.") - print(" - Try a different USB port on the controller.") - print(" - Confirm the ES24N is powered on.") - return None - - if len(ports) == 1: - ok(f"Device found: {_c(C.BOLD, ports[0])}") - _fix_permissions(ports[0]) - return ports[0] - - # Multiple devices — let the user choose - print() - draw_table( - ["#", "Device"], - [[str(i), p] for i, p in enumerate(ports, 1)], - [4, 24], - ) - print() - - while True: - val = prompt(f"Select device number [1-{len(ports)}]") - if val.isdigit() and 1 <= int(val) <= len(ports): - selected = ports[int(val) - 1] - ok(f"Selected: {_c(C.BOLD, selected)}") - _fix_permissions(selected) - return selected - warn(f"Please enter a number between 1 and {len(ports)}.") - - -def _fix_permissions(device: str): - try: - result = subprocess.run( - ["sudo", "chown", ":wheel", device], - capture_output=True, timeout=5, - ) - if result.returncode == 0: - ok(f"Permissions updated on {device}") - return - except Exception: - pass - try: - os.chmod(device, 0o666) - ok(f"Permissions updated on {device}") - except PermissionError: - warn("Could not update device permissions automatically.") - warn("If the connection fails, re-run this script with sudo.") - - -# ── Step 2: Open serial connection & wake IOM console ───────────────────────── -def open_serial_connection(device: str) -> Optional[SerialPort]: - rule("Step 2 of 6 -- Opening Serial Connection") - - info(f"Opening {device} at 115200 baud (8N1)...") - ser = SerialPort(device, baudrate=115200, timeout=5.0) - - try: - ser.open() - except OSError as e: - error(str(e)) - return None - - ok(f"Port opened: {device}") - info("Sending wake signal to IOM console...") - - ser.send_line("", delay=0.5) - ser.send_line("", delay=0.5) - response = ser.read_until_quiet(quiet_period=0.5) - - print() - if response.strip(): - print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}") - for line in response.strip().splitlines(): - print(f" {_c(C.DIM, '|')} {line}") - print(f" {_c(C.DIM, '+' + '-' * 56)}") - print() - - low = response.lower() - if any(kw in low for kw in ("login", "$", "#", "password")): - ok("IOM console is responsive.") - else: - warn("Unexpected response — the IOM may still be booting.") - warn("You can continue; the Redfish API operates independently.") - else: - warn("No response received from IOM console.") - warn("The Redfish API may still be reachable. Continuing...") - - print() - return ser - - -# ── Step 3: Collect network configuration ───────────────────────────────────── -def collect_network_config(cfg: ShelfConfig): - rule("Step 3 of 6 -- IOM Network Configuration") - - print(" How should the IOMs be configured?") - print(f" {_c(C.BOLD, '1')} Static IP addresses") - print(f" {_c(C.BOLD, '2')} DHCP") - print() - - while True: - choice = prompt("Select option [1/2]") - if choice in ("1", "2"): - break - warn("Please enter 1 or 2.") - - use_dhcp = (choice == "2") - print() - - cfg.password = prompt_password() - - if use_dhcp: - cfg.iom1 = IOMConfig("IOM1", dhcp=True) - cfg.iom2 = IOMConfig("IOM2", dhcp=True) - print() - ok("Both IOMs will be set to DHCP.") - return - - # Static — IOM1 - print() - info(f"Static network details for {_c(C.BOLD, 'IOM1')}:") - iom1_ip = prompt_ip(" IOM1 IP address ") - iom1_gw = prompt_ip(" IOM1 Gateway ") - iom1_nm = prompt_ip(" IOM1 Subnet Mask") - cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip, gateway=iom1_gw, netmask=iom1_nm) - - # Static — IOM2 - print() - info(f"Static network details for {_c(C.BOLD, 'IOM2')}:") - iom2_ip = prompt_ip(" IOM2 IP address ") - - same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True) - if same: - iom2_gw, iom2_nm = iom1_gw, iom1_nm - else: - iom2_gw = prompt_ip(" IOM2 Gateway ") - iom2_nm = prompt_ip(" IOM2 Subnet Mask") - - cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip, gateway=iom2_gw, netmask=iom2_nm) - - -# ── Step 4: Apply configuration via Redfish ─────────────────────────────────── -def apply_configuration(cfg: ShelfConfig) -> bool: - rule("Step 4 of 6 -- Applying Configuration via Redfish API") - - info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...") - print() - - results = [] - all_ok = True - for iom_cfg in [cfg.iom1, cfg.iom2]: - success, message = _patch_iom(cfg.password, iom_cfg) - results.append([iom_cfg.iom, - f"{_c(C.GRN, 'OK')}" if success else f"{_c(C.RED, 'FAIL')}", - message]) - if not success: - all_ok = False - - draw_table(["IOM", "Result", "Detail"], results, [6, 8, 44]) - print() - return all_ok - - -def _patch_iom(password: str, iom: IOMConfig) -> tuple: - url = f"https://127.0.0.1/redfish/v1/Managers/{iom.iom}/EthernetInterfaces/1" - - if iom.dhcp: - payload = {"DHCPv4": {"DHCPEnabled": True}} - else: - payload = { - "DHCPv4": {"DHCPEnabled": False}, - "IPv4StaticAddresses": [{ - "Address": iom.ip, - "Gateway": iom.gateway, - "SubnetMask": iom.netmask, - }], - } - - data = json.dumps(payload).encode("utf-8") - credentials = b64encode(f"Admin:{password}".encode()).decode() - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - - req = urllib.request.Request( - url, - data=data, - method="PATCH", - headers={ - "Content-Type": "application/json", - "Authorization": f"Basic {credentials}", - }, - ) - - try: - with urllib.request.urlopen(req, context=ctx, timeout=10) as resp: - if resp.status in (200, 204): - mode = "DHCP" if iom.dhcp else f"static {iom.ip}" - return True, f"Configured: {mode}" - body = resp.read().decode("utf-8", errors="replace") - return False, f"HTTP {resp.status}: {body[:80]}" - - except urllib.error.HTTPError as e: - body = e.read().decode("utf-8", errors="replace") - try: - msg = json.loads(body).get("error", {}).get("message", body) - except json.JSONDecodeError: - msg = body - return False, f"HTTP {e.code}: {msg[:80]}" - - except OSError as e: - return False, f"Connection error: {e}" - - -# ── Step 5: Print configuration summary ─────────────────────────────────────── -def print_summary(cfg: ShelfConfig): - rule("Step 5 of 6 -- Configuration Summary") - - def val(iom: IOMConfig, field: str) -> str: - dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)", "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"} - stat_map = {"mode": "Static", "ip": iom.ip, "gateway": iom.gateway, "netmask": iom.netmask} - return (dhcp_map if iom.dhcp else stat_map).get(field, "") - - draw_table( - ["Setting", "IOM1", "IOM2"], - [ - ["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")], - ["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")], - ["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")], - ["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")], - ["Serial Port", cfg.device, cfg.device], - ], - [12, 22, 22], - ) - - print() - draw_box([ - f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}", - "", - "Remove the serial cable ONLY after verifying each", - "expander appears in TrueNAS with matching drives.", - "", - "TrueNAS > System Settings > Enclosure >", - "NVMe-oF Expansion Shelves", - ], colour=C.YEL) - print() - - -# ── Step 6: Close serial connection ─────────────────────────────────────────── -def close_serial_connection(ser: SerialPort, device: str): - rule("Step 6 of 6 -- Close Serial Connection") - - if ser and ser.is_open: - ser.close() - ok(f"Serial port {device} closed.") - - print() - prompt("Disconnect the serial cable, then press Enter to continue") - ok("Serial cable disconnected. Shelf configuration complete.") - - -# ── Full shelf configuration cycle ──────────────────────────────────────────── -def configure_shelf() -> bool: - """Run one complete shelf cycle. Returns True if user wants another.""" - banner() - cfg = ShelfConfig() - - # 1 — Detect device - device = detect_serial_device() - if not device: - error("Could not detect a serial device. Returning to main menu.") - time.sleep(2) - return True - cfg.device = device - - # 2 — Open serial port - ser = open_serial_connection(device) - if not ser: - error("Could not open serial port. Returning to main menu.") - time.sleep(2) - return True - - # 3 — Collect settings - collect_network_config(cfg) - - # 4 — Confirm & apply - print() - rule("Ready to Apply") - info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1") - info("using the active serial session as the communication path.") - print() - - if prompt_yn("Apply configuration now?", default=True): - apply_configuration(cfg) - else: - warn("Configuration skipped — no changes were made.") - - # 5 — Summary & reminder - print_summary(cfg) - - # 6 — Close serial port - close_serial_connection(ser, device) - - print() - return prompt_yn("Configure another ES24N shelf?", default=False) - - -# ── Entry point ─────────────────────────────────────────────────────────────── -def main(): - banner() - print(f" {_c(C.DIM, 'Automates ES24N IOM network configuration over a direct serial')}") - print(f" {_c(C.DIM, 'connection using the Redfish API (loopback).')}") - print(f" {_c(C.DIM, 'No external dependencies -- Python 3 standard library only.')}") - print() - - while True: - banner() - draw_box([ - f" {_c(C.BOLD, '1')} Configure a new ES24N shelf", - f" {_c(C.BOLD, '2')} Exit", - ]) - print() - - choice = prompt("Select [1/2]") - if choice == "1": - another = configure_shelf() - if not another: - break - elif choice == "2": - break - else: - warn("Please enter 1 or 2.") - time.sleep(1) - - print() - ok("Exiting ES24N IOM Configuration Tool. Goodbye.") - print() - - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print() - warn("Interrupted. Exiting.") - sys.exit(0)