Upload files to "es24n-conf.py"

This commit is contained in:
2026-03-03 21:18:09 -05:00
parent 21c45bc8bc
commit 2ddd5e397f

673
es24n-conf.py/es24n_conf.py Normal file
View File

@@ -0,0 +1,673 @@
#!/usr/bin/env python3
"""
ES24N IOM Network Configuration Tool
TrueNAS ES24N Expansion Shelf — Serial Configuration Utility
Based on ES24N Product Service Guide v.26011
Zero external dependencies — Python 3 standard library only.
Compatible with TrueNAS (FreeBSD) and Linux.
"""
import fcntl
import glob
import ipaddress
import json
import os
import select
import ssl
import subprocess
import sys
import termios
import time
import tty
import urllib.error
import urllib.request
from base64 import b64encode
from dataclasses import dataclass, field
from typing import Optional
# ── ANSI colour helpers ───────────────────────────────────────────────────────
class C:
RED = "\033[0;31m"
GRN = "\033[0;32m"
YEL = "\033[1;33m"
CYN = "\033[0;36m"
WHT = "\033[1;37m"
DIM = "\033[2m"
BOLD = "\033[1m"
RESET = "\033[0m"
CLEAR = "\033[2J\033[H"
def _c(colour: str, text: str) -> str:
return f"{colour}{text}{C.RESET}"
def info(msg: str): print(f" {_c(C.CYN, 'i')} {msg}")
def ok(msg: str): print(f" {_c(C.GRN, 'OK')} {msg}")
def warn(msg: str): print(f" {_c(C.YEL, '!')} {msg}")
def error(msg: str): print(f" {_c(C.RED, 'X')} {msg}")
def banner():
print(C.CLEAR, end="")
w = 60
print(_c(C.CYN, " +" + "-" * w + "+"))
print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD,
" TrueNAS ES24N IOM Configuration Tool ") + _c(C.CYN, " |"))
print(_c(C.CYN, " |") + _c(C.DIM,
" Serial Network Setup v2.0 (stdlib only) ") + _c(C.CYN, " |"))
print(_c(C.CYN, " +" + "-" * w + "+"))
print()
def rule(title: str = ""):
width = 60
if title:
pad = max(0, width - len(title) - 2)
left = pad // 2
right = pad - left
line = f"{'-' * left} {title} {'-' * right}"
else:
line = "-" * width
print(f"\n {_c(C.YEL, line)}\n")
def draw_table(headers: list, rows: list, col_widths: list):
sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+"
def fmt_row(cells):
return " | " + " | ".join(
str(c).ljust(w) for c, w in zip(cells, col_widths)
) + " |"
print(_c(C.DIM, sep))
print(_c(C.BOLD, fmt_row(headers)))
print(_c(C.DIM, sep))
for row in rows:
print(fmt_row(row))
print(_c(C.DIM, sep))
def draw_box(lines: list, colour: str = C.CYN):
width = max(len(l) for l in lines) + 4
print(f" {_c(colour, '+' + '-' * width + '+')}")
for line in lines:
pad = width - len(line) - 2
print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}")
print(f" {_c(colour, '+' + '-' * width + '+')}")
# ── Input helpers ─────────────────────────────────────────────────────────────
def prompt(label: str, default: str = "", password: bool = False) -> str:
display = f" {_c(C.CYN, label)}"
if default:
display += f" {_c(C.DIM, f'[{default}]')}"
display += ": "
if password:
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
try:
tty.setraw(fd)
sys.stdout.write(display)
sys.stdout.flush()
chars = []
while True:
ch = sys.stdin.read(1)
if ch in ("\r", "\n"):
break
elif ch in ("\x7f", "\x08"):
if chars:
chars.pop()
sys.stdout.write("\b \b")
sys.stdout.flush()
elif ch == "\x03":
raise KeyboardInterrupt
else:
chars.append(ch)
sys.stdout.write("*")
sys.stdout.flush()
print()
return "".join(chars)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
else:
sys.stdout.write(display)
sys.stdout.flush()
val = sys.stdin.readline().strip()
return val if val else default
def prompt_ip(label: str) -> str:
while True:
val = prompt(label)
try:
ipaddress.IPv4Address(val)
return val
except ValueError:
warn(f"'{val}' is not a valid IPv4 address — please try again.")
def prompt_yn(label: str, default: bool = True) -> bool:
hint = "Y/n" if default else "y/N"
val = prompt(f"{label} [{hint}]").strip().lower()
if not val:
return default
return val in ("y", "yes")
def prompt_password() -> str:
while True:
val = prompt(
"Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)",
password=True,
)
if val:
return val
warn("Password cannot be empty.")
# ── Data classes ──────────────────────────────────────────────────────────────
@dataclass
class IOMConfig:
iom: str
dhcp: bool = True
ip: str = ""
gateway: str = ""
netmask: str = ""
@dataclass
class ShelfConfig:
device: str = ""
password: str = ""
iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1"))
iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2"))
# ── Serial port (stdlib: termios / fcntl / select) ────────────────────────────
class SerialPort:
"""
Minimal 8N1 serial port using only the Python standard library.
Replaces pyserial for TrueNAS environments where pip is unavailable.
"""
BAUD_MAP = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
}
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self._fd: Optional[int] = None
self._saved_attrs = None
# ── Open / close ──────────────────────────────────────────────────────────
def open(self):
try:
self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as e:
raise OSError(f"Cannot open {self.port}: {e}") from e
# Switch back to blocking mode now that O_NOCTTY is set
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
self._saved_attrs = termios.tcgetattr(self._fd)
# Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc
attrs = list(termios.tcgetattr(self._fd))
baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
attrs[0] = termios.IGNBRK # iflag
attrs[1] = 0 # oflag
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag
attrs[3] = 0 # lflag (raw)
attrs[4] = baud # ispeed
attrs[5] = baud # ospeed
attrs[6][termios.VMIN] = 0
attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255)
termios.tcsetattr(self._fd, termios.TCSANOW, attrs)
termios.tcflush(self._fd, termios.TCIOFLUSH)
def close(self):
if self._fd is not None:
try:
if self._saved_attrs:
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs)
os.close(self._fd)
except OSError:
pass
finally:
self._fd = None
@property
def is_open(self) -> bool:
return self._fd is not None
# ── Read / write ──────────────────────────────────────────────────────────
def write(self, data: bytes):
if self._fd is None:
raise OSError("Port is not open")
os.write(self._fd, data)
def read_chunk(self, size: int = 4096) -> bytes:
if self._fd is None:
raise OSError("Port is not open")
try:
return os.read(self._fd, size)
except OSError:
return b""
def read_until_quiet(self, quiet_period: float = 0.5) -> str:
"""
Read until no new bytes arrive for `quiet_period` seconds,
or until `self.timeout` total seconds have elapsed.
"""
output = b""
deadline = time.monotonic() + self.timeout
last_rx = time.monotonic()
while True:
now = time.monotonic()
if now >= deadline:
break
if output and (now - last_rx) >= quiet_period:
break
wait = min(deadline - now, quiet_period)
ready, _, _ = select.select([self._fd], [], [], wait)
if ready:
chunk = self.read_chunk()
if chunk:
output += chunk
last_rx = time.monotonic()
return output.decode("utf-8", errors="replace")
def send_line(self, cmd: str = "", delay: float = 0.3):
self.write((cmd + "\r\n").encode("utf-8"))
time.sleep(delay)
# ── Context manager ───────────────────────────────────────────────────────
def __enter__(self):
self.open()
return self
def __exit__(self, *_):
self.close()
# ── Step 1: Detect serial device ──────────────────────────────────────────────
def detect_serial_device() -> Optional[str]:
rule("Step 1 of 6 -- Serial Cable & Device Detection")
print(" Connect the serial cable from the ES24N IOM1 port")
print(" to the active F-Series controller USB port.")
print()
prompt("Press Enter when the cable is connected")
for attempt in range(1, 4):
info(f"Scanning for USB serial devices (attempt {attempt}/3)...")
time.sleep(1)
# FreeBSD: /dev/ttyU* Linux: /dev/ttyUSB*, /dev/ttyACM*
patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"]
ports = sorted({p for pat in patterns for p in glob.glob(pat)})
if ports:
break
if attempt < 3:
warn("No device found yet — retrying in 2 seconds...")
time.sleep(2)
if not ports:
error("No USB serial device detected after 3 attempts.")
print()
print(" Troubleshooting:")
print(" - Ensure the serial cable is fully seated at both ends.")
print(" - Try a different USB port on the controller.")
print(" - Confirm the ES24N is powered on.")
return None
if len(ports) == 1:
ok(f"Device found: {_c(C.BOLD, ports[0])}")
_fix_permissions(ports[0])
return ports[0]
# Multiple devices — let the user choose
print()
draw_table(
["#", "Device"],
[[str(i), p] for i, p in enumerate(ports, 1)],
[4, 24],
)
print()
while True:
val = prompt(f"Select device number [1-{len(ports)}]")
if val.isdigit() and 1 <= int(val) <= len(ports):
selected = ports[int(val) - 1]
ok(f"Selected: {_c(C.BOLD, selected)}")
_fix_permissions(selected)
return selected
warn(f"Please enter a number between 1 and {len(ports)}.")
def _fix_permissions(device: str):
try:
result = subprocess.run(
["sudo", "chown", ":wheel", device],
capture_output=True, timeout=5,
)
if result.returncode == 0:
ok(f"Permissions updated on {device}")
return
except Exception:
pass
try:
os.chmod(device, 0o666)
ok(f"Permissions updated on {device}")
except PermissionError:
warn("Could not update device permissions automatically.")
warn("If the connection fails, re-run this script with sudo.")
# ── Step 2: Open serial connection & wake IOM console ─────────────────────────
def open_serial_connection(device: str) -> Optional[SerialPort]:
rule("Step 2 of 6 -- Opening Serial Connection")
info(f"Opening {device} at 115200 baud (8N1)...")
ser = SerialPort(device, baudrate=115200, timeout=5.0)
try:
ser.open()
except OSError as e:
error(str(e))
return None
ok(f"Port opened: {device}")
info("Sending wake signal to IOM console...")
ser.send_line("", delay=0.5)
ser.send_line("", delay=0.5)
response = ser.read_until_quiet(quiet_period=0.5)
print()
if response.strip():
print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}")
for line in response.strip().splitlines():
print(f" {_c(C.DIM, '|')} {line}")
print(f" {_c(C.DIM, '+' + '-' * 56)}")
print()
low = response.lower()
if any(kw in low for kw in ("login", "$", "#", "password")):
ok("IOM console is responsive.")
else:
warn("Unexpected response — the IOM may still be booting.")
warn("You can continue; the Redfish API operates independently.")
else:
warn("No response received from IOM console.")
warn("The Redfish API may still be reachable. Continuing...")
print()
return ser
# ── Step 3: Collect network configuration ─────────────────────────────────────
def collect_network_config(cfg: ShelfConfig):
rule("Step 3 of 6 -- IOM Network Configuration")
print(" How should the IOMs be configured?")
print(f" {_c(C.BOLD, '1')} Static IP addresses")
print(f" {_c(C.BOLD, '2')} DHCP")
print()
while True:
choice = prompt("Select option [1/2]")
if choice in ("1", "2"):
break
warn("Please enter 1 or 2.")
use_dhcp = (choice == "2")
print()
cfg.password = prompt_password()
if use_dhcp:
cfg.iom1 = IOMConfig("IOM1", dhcp=True)
cfg.iom2 = IOMConfig("IOM2", dhcp=True)
print()
ok("Both IOMs will be set to DHCP.")
return
# Static — IOM1
print()
info(f"Static network details for {_c(C.BOLD, 'IOM1')}:")
iom1_ip = prompt_ip(" IOM1 IP address ")
iom1_gw = prompt_ip(" IOM1 Gateway ")
iom1_nm = prompt_ip(" IOM1 Subnet Mask")
cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip, gateway=iom1_gw, netmask=iom1_nm)
# Static — IOM2
print()
info(f"Static network details for {_c(C.BOLD, 'IOM2')}:")
iom2_ip = prompt_ip(" IOM2 IP address ")
same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True)
if same:
iom2_gw, iom2_nm = iom1_gw, iom1_nm
else:
iom2_gw = prompt_ip(" IOM2 Gateway ")
iom2_nm = prompt_ip(" IOM2 Subnet Mask")
cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip, gateway=iom2_gw, netmask=iom2_nm)
# ── Step 4: Apply configuration via Redfish ───────────────────────────────────
def apply_configuration(cfg: ShelfConfig) -> bool:
rule("Step 4 of 6 -- Applying Configuration via Redfish API")
info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...")
print()
results = []
all_ok = True
for iom_cfg in [cfg.iom1, cfg.iom2]:
success, message = _patch_iom(cfg.password, iom_cfg)
results.append([iom_cfg.iom,
f"{_c(C.GRN, 'OK')}" if success else f"{_c(C.RED, 'FAIL')}",
message])
if not success:
all_ok = False
draw_table(["IOM", "Result", "Detail"], results, [6, 8, 44])
print()
return all_ok
def _patch_iom(password: str, iom: IOMConfig) -> tuple:
url = f"https://127.0.0.1/redfish/v1/Managers/{iom.iom}/EthernetInterfaces/1"
if iom.dhcp:
payload = {"DHCPv4": {"DHCPEnabled": True}}
else:
payload = {
"DHCPv4": {"DHCPEnabled": False},
"IPv4StaticAddresses": [{
"Address": iom.ip,
"Gateway": iom.gateway,
"SubnetMask": iom.netmask,
}],
}
data = json.dumps(payload).encode("utf-8")
credentials = b64encode(f"Admin:{password}".encode()).decode()
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(
url,
data=data,
method="PATCH",
headers={
"Content-Type": "application/json",
"Authorization": f"Basic {credentials}",
},
)
try:
with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
if resp.status in (200, 204):
mode = "DHCP" if iom.dhcp else f"static {iom.ip}"
return True, f"Configured: {mode}"
body = resp.read().decode("utf-8", errors="replace")
return False, f"HTTP {resp.status}: {body[:80]}"
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
try:
msg = json.loads(body).get("error", {}).get("message", body)
except json.JSONDecodeError:
msg = body
return False, f"HTTP {e.code}: {msg[:80]}"
except OSError as e:
return False, f"Connection error: {e}"
# ── Step 5: Print configuration summary ───────────────────────────────────────
def print_summary(cfg: ShelfConfig):
rule("Step 5 of 6 -- Configuration Summary")
def val(iom: IOMConfig, field: str) -> str:
dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)", "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"}
stat_map = {"mode": "Static", "ip": iom.ip, "gateway": iom.gateway, "netmask": iom.netmask}
return (dhcp_map if iom.dhcp else stat_map).get(field, "")
draw_table(
["Setting", "IOM1", "IOM2"],
[
["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")],
["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")],
["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")],
["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")],
["Serial Port", cfg.device, cfg.device],
],
[12, 22, 22],
)
print()
draw_box([
f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}",
"",
"Remove the serial cable ONLY after verifying each",
"expander appears in TrueNAS with matching drives.",
"",
"TrueNAS > System Settings > Enclosure >",
"NVMe-oF Expansion Shelves",
], colour=C.YEL)
print()
# ── Step 6: Close serial connection ───────────────────────────────────────────
def close_serial_connection(ser: SerialPort, device: str):
rule("Step 6 of 6 -- Close Serial Connection")
if ser and ser.is_open:
ser.close()
ok(f"Serial port {device} closed.")
print()
prompt("Disconnect the serial cable, then press Enter to continue")
ok("Serial cable disconnected. Shelf configuration complete.")
# ── Full shelf configuration cycle ────────────────────────────────────────────
def configure_shelf() -> bool:
"""Run one complete shelf cycle. Returns True if user wants another."""
banner()
cfg = ShelfConfig()
# 1 — Detect device
device = detect_serial_device()
if not device:
error("Could not detect a serial device. Returning to main menu.")
time.sleep(2)
return True
cfg.device = device
# 2 — Open serial port
ser = open_serial_connection(device)
if not ser:
error("Could not open serial port. Returning to main menu.")
time.sleep(2)
return True
# 3 — Collect settings
collect_network_config(cfg)
# 4 — Confirm & apply
print()
rule("Ready to Apply")
info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1")
info("using the active serial session as the communication path.")
print()
if prompt_yn("Apply configuration now?", default=True):
apply_configuration(cfg)
else:
warn("Configuration skipped — no changes were made.")
# 5 — Summary & reminder
print_summary(cfg)
# 6 — Close serial port
close_serial_connection(ser, device)
print()
return prompt_yn("Configure another ES24N shelf?", default=False)
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
banner()
print(f" {_c(C.DIM, 'Automates ES24N IOM network configuration over a direct serial')}")
print(f" {_c(C.DIM, 'connection using the Redfish API (loopback).')}")
print(f" {_c(C.DIM, 'No external dependencies -- Python 3 standard library only.')}")
print()
while True:
banner()
draw_box([
f" {_c(C.BOLD, '1')} Configure a new ES24N shelf",
f" {_c(C.BOLD, '2')} Exit",
])
print()
choice = prompt("Select [1/2]")
if choice == "1":
another = configure_shelf()
if not another:
break
elif choice == "2":
break
else:
warn("Please enter 1 or 2.")
time.sleep(1)
print()
ok("Exiting ES24N IOM Configuration Tool. Goodbye.")
print()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print()
warn("Interrupted. Exiting.")
sys.exit(0)