Files
es24n-conf/es24n_conf.py

773 lines
26 KiB
Python

#!/usr/bin/env python3
"""
ES24N IOM Network Configuration Tool
TrueNAS ES24N Expansion Shelf — Serial Configuration Utility
Based on ES24N Product Service Guide v.26011
Zero external dependencies — Python 3 standard library only.
Compatible with TrueNAS (FreeBSD) and Linux.
"""
import fcntl
import glob
import ipaddress
import json
import os
import select
import ssl
import subprocess
import sys
import termios
import time
import tty
import urllib.error
import urllib.request
from base64 import b64encode
from dataclasses import dataclass, field
from typing import Optional
# ── ANSI colour helpers ───────────────────────────────────────────────────────
class C:
RED = "\033[0;31m"
GRN = "\033[0;32m"
YEL = "\033[1;33m"
CYN = "\033[0;36m"
WHT = "\033[1;37m"
DIM = "\033[2m"
BOLD = "\033[1m"
RESET = "\033[0m"
CLEAR = "\033[2J\033[H"
def _c(colour: str, text: str) -> str:
return f"{colour}{text}{C.RESET}"
def info(msg: str): print(f" {_c(C.CYN, 'i')} {msg}")
def ok(msg: str): print(f" {_c(C.GRN, 'OK')} {msg}")
def warn(msg: str): print(f" {_c(C.YEL, '!')} {msg}")
def error(msg: str): print(f" {_c(C.RED, 'X')} {msg}")
def banner():
print(C.CLEAR, end="")
w = 60
print(_c(C.CYN, " +" + "-" * w + "+"))
print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD,
" TrueNAS ES24N IOM Configuration Tool ") + _c(C.CYN, " |"))
print(_c(C.CYN, " |") + _c(C.DIM,
" Serial Network Setup v2.0 (stdlib only) ") + _c(C.CYN, " |"))
print(_c(C.CYN, " +" + "-" * w + "+"))
print()
def rule(title: str = ""):
width = 60
if title:
pad = max(0, width - len(title) - 2)
left = pad // 2
right = pad - left
line = f"{'-' * left} {title} {'-' * right}"
else:
line = "-" * width
print(f"\n {_c(C.YEL, line)}\n")
def draw_table(headers: list, rows: list, col_widths: list):
sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+"
def fmt_row(cells):
return " | " + " | ".join(
str(c).ljust(w) for c, w in zip(cells, col_widths)
) + " |"
print(_c(C.DIM, sep))
print(_c(C.BOLD, fmt_row(headers)))
print(_c(C.DIM, sep))
for row in rows:
print(fmt_row(row))
print(_c(C.DIM, sep))
def draw_box(lines: list, colour: str = C.CYN):
width = max(len(l) for l in lines) + 4
print(f" {_c(colour, '+' + '-' * width + '+')}")
for line in lines:
pad = width - len(line) - 2
print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}")
print(f" {_c(colour, '+' + '-' * width + '+')}")
# ── Input helpers ─────────────────────────────────────────────────────────────
def prompt(label: str, default: str = "", password: bool = False) -> str:
display = f" {_c(C.CYN, label)}"
if default:
display += f" {_c(C.DIM, f'[{default}]')}"
display += ": "
if password:
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
try:
tty.setraw(fd)
sys.stdout.write(display)
sys.stdout.flush()
chars = []
while True:
ch = sys.stdin.read(1)
if ch in ("\r", "\n"):
break
elif ch in ("\x7f", "\x08"):
if chars:
chars.pop()
sys.stdout.write("\b \b")
sys.stdout.flush()
elif ch == "\x03":
raise KeyboardInterrupt
else:
chars.append(ch)
sys.stdout.write("*")
sys.stdout.flush()
print()
return "".join(chars)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
else:
sys.stdout.write(display)
sys.stdout.flush()
val = sys.stdin.readline().strip()
return val if val else default
def prompt_ip(label: str) -> str:
while True:
val = prompt(label)
try:
ipaddress.IPv4Address(val)
return val
except ValueError:
warn(f"'{val}' is not a valid IPv4 address — please try again.")
def prompt_yn(label: str, default: bool = True) -> bool:
hint = "Y/n" if default else "y/N"
val = prompt(f"{label} [{hint}]").strip().lower()
if not val:
return default
return val in ("y", "yes")
def prompt_password() -> str:
while True:
val = prompt(
"Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)",
password=True,
)
if val:
return val
warn("Password cannot be empty.")
# ── Data classes ──────────────────────────────────────────────────────────────
@dataclass
class IOMConfig:
iom: str
dhcp: bool = True
ip: str = ""
gateway: str = ""
netmask: str = ""
@dataclass
class ShelfConfig:
device: str = ""
password: str = ""
iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1"))
iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2"))
# ── Serial port (stdlib: termios / fcntl / select) ────────────────────────────
class SerialPort:
"""
Minimal 8N1 serial port using only the Python standard library.
Replaces pyserial for TrueNAS environments where pip is unavailable.
"""
BAUD_MAP = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
}
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self._fd: Optional[int] = None
self._saved_attrs = None
# ── Open / close ──────────────────────────────────────────────────────────
def open(self):
try:
self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as e:
raise OSError(f"Cannot open {self.port}: {e}") from e
# Switch back to blocking mode now that O_NOCTTY is set
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
self._saved_attrs = termios.tcgetattr(self._fd)
# Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc
attrs = list(termios.tcgetattr(self._fd))
baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
attrs[0] = termios.IGNBRK # iflag
attrs[1] = 0 # oflag
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag
attrs[3] = 0 # lflag (raw)
attrs[4] = baud # ispeed
attrs[5] = baud # ospeed
attrs[6][termios.VMIN] = 0
attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255)
termios.tcsetattr(self._fd, termios.TCSANOW, attrs)
termios.tcflush(self._fd, termios.TCIOFLUSH)
def close(self):
if self._fd is not None:
try:
if self._saved_attrs:
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs)
os.close(self._fd)
except OSError:
pass
finally:
self._fd = None
@property
def is_open(self) -> bool:
return self._fd is not None
# ── Read / write ──────────────────────────────────────────────────────────
def write(self, data: bytes):
if self._fd is None:
raise OSError("Port is not open")
os.write(self._fd, data)
def read_chunk(self, size: int = 4096) -> bytes:
if self._fd is None:
raise OSError("Port is not open")
try:
return os.read(self._fd, size)
except OSError:
return b""
def read_until_quiet(self, quiet_period: float = 0.5) -> str:
"""
Read until no new bytes arrive for `quiet_period` seconds,
or until `self.timeout` total seconds have elapsed.
"""
output = b""
deadline = time.monotonic() + self.timeout
last_rx = time.monotonic()
while True:
now = time.monotonic()
if now >= deadline:
break
if output and (now - last_rx) >= quiet_period:
break
wait = min(deadline - now, quiet_period)
ready, _, _ = select.select([self._fd], [], [], wait)
if ready:
chunk = self.read_chunk()
if chunk:
output += chunk
last_rx = time.monotonic()
return output.decode("utf-8", errors="replace")
def send_line(self, cmd: str = "", delay: float = 0.3):
self.write((cmd + "\r\n").encode("utf-8"))
time.sleep(delay)
# ── Context manager ───────────────────────────────────────────────────────
def __enter__(self):
self.open()
return self
def __exit__(self, *_):
self.close()
# ── Step 1: Detect serial device ──────────────────────────────────────────────
def detect_serial_device() -> Optional[str]:
rule("Step 1 of 5 -- Serial Cable & Device Detection")
print(" Connect the serial cable from the ES24N IOM1 port")
print(" to the active F-Series controller USB port.")
print()
prompt("Press Enter when the cable is connected")
for attempt in range(1, 4):
info(f"Scanning for USB serial devices (attempt {attempt}/3)...")
time.sleep(1)
# FreeBSD: /dev/ttyU* Linux: /dev/ttyUSB*, /dev/ttyACM*
patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"]
ports = sorted({p for pat in patterns for p in glob.glob(pat)})
if ports:
break
if attempt < 3:
warn("No device found yet — retrying in 2 seconds...")
time.sleep(2)
if not ports:
error("No USB serial device detected after 3 attempts.")
print()
print(" Troubleshooting:")
print(" - Ensure the serial cable is fully seated at both ends.")
print(" - Try a different USB port on the controller.")
print(" - Confirm the ES24N is powered on.")
return None
if len(ports) == 1:
ok(f"Device found: {_c(C.BOLD, ports[0])}")
_fix_permissions(ports[0])
return ports[0]
# Multiple devices — let the user choose
print()
draw_table(
["#", "Device"],
[[str(i), p] for i, p in enumerate(ports, 1)],
[4, 24],
)
print()
while True:
val = prompt(f"Select device number [1-{len(ports)}]")
if val.isdigit() and 1 <= int(val) <= len(ports):
selected = ports[int(val) - 1]
ok(f"Selected: {_c(C.BOLD, selected)}")
_fix_permissions(selected)
return selected
warn(f"Please enter a number between 1 and {len(ports)}.")
def _fix_permissions(device: str):
try:
result = subprocess.run(
["sudo", "chown", ":wheel", device],
capture_output=True, timeout=5,
)
if result.returncode == 0:
ok(f"Permissions updated on {device}")
return
except Exception:
pass
try:
os.chmod(device, 0o666)
ok(f"Permissions updated on {device}")
except PermissionError:
warn("Could not update device permissions automatically.")
warn("If the connection fails, re-run this script with sudo.")
# ── Step 2: Open serial connection & wake IOM console ─────────────────────────
def open_serial_connection(device: str) -> Optional[SerialPort]:
rule("Step 2 of 5 -- Opening Serial Connection")
info(f"Opening {device} at 115200 baud (8N1)...")
ser = SerialPort(device, baudrate=115200, timeout=5.0)
try:
ser.open()
except OSError as e:
error(str(e))
return None
ok(f"Port opened: {device}")
info("Sending wake signal to IOM console...")
ser.send_line("", delay=0.5)
ser.send_line("", delay=0.5)
response = ser.read_until_quiet(quiet_period=0.5)
print()
if response.strip():
print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}")
for line in response.strip().splitlines():
print(f" {_c(C.DIM, '|')} {line}")
print(f" {_c(C.DIM, '+' + '-' * 56)}")
print()
low = response.lower()
if any(kw in low for kw in ("login", "$", "#", "password")):
ok("IOM console is responsive.")
else:
warn("Unexpected response — the IOM may still be booting.")
warn("You can continue; the Redfish API operates independently.")
else:
warn("No response received from IOM console.")
warn("The Redfish API may still be reachable. Continuing...")
print()
return ser
# ── Redfish helpers (GET and PATCH) ──────────────────────────────────────────
def _redfish_request(password: str, method: str, path: str,
payload: Optional[dict] = None) -> tuple:
"""
Issue a Redfish HTTP request over the loopback (127.0.0.1).
Returns (success: bool, data: dict|str).
"""
url = f"https://127.0.0.1{path}"
credentials = b64encode(f"Admin:{password}".encode()).decode()
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
body = json.dumps(payload).encode("utf-8") if payload else None
headers = {"Authorization": f"Basic {credentials}"}
if body:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, method=method, headers=headers)
try:
with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
raw = resp.read().decode("utf-8", errors="replace")
try:
return True, json.loads(raw) if raw.strip() else {}
except json.JSONDecodeError:
return True, {}
except urllib.error.HTTPError as e:
raw = e.read().decode("utf-8", errors="replace")
try:
msg = json.loads(raw).get("error", {}).get("message", raw)
except json.JSONDecodeError:
msg = raw
return False, f"HTTP {e.code}: {msg[:120]}"
except OSError as e:
return False, f"Connection error: {e}"
# ── Step 3: Fetch & display current IOM network settings ─────────────────────
def fetch_current_config(cfg: ShelfConfig) -> bool:
"""
Query Redfish for the current network config of both IOMs.
Populates cfg.iom1 / cfg.iom2 with live data.
Returns True if at least one IOM responded.
"""
rule("Step 3 of 5 -- Current IOM Network Settings")
info("Querying Redfish API for current network configuration...")
print()
any_ok = False
rows = []
for iom in ("IOM1", "IOM2"):
path = f"/redfish/v1/Managers/{iom}/EthernetInterfaces/1"
ok_flag, data = _redfish_request(cfg.password, "GET", path)
if ok_flag and isinstance(data, dict):
any_ok = True
# Determine mode
dhcp_enabled = (
data.get("DHCPv4", {}).get("DHCPEnabled", False) or
data.get("DHCPv6", {}).get("DHCPEnabled", False)
)
# Pull address info — prefer StaticAddresses, fall back to IPv4Addresses
addrs = data.get("IPv4StaticAddresses") or data.get("IPv4Addresses", [])
if addrs:
addr_rec = addrs[0]
ip = addr_rec.get("Address", "--")
gateway = addr_rec.get("Gateway", "--")
netmask = addr_rec.get("SubnetMask", "--")
else:
ip = gateway = netmask = "--"
origin = data.get("IPv4Addresses", [{}])[0].get("AddressOrigin", "Unknown") \
if data.get("IPv4Addresses") else ("DHCP" if dhcp_enabled else "Static")
iom_cfg = IOMConfig(
iom = iom,
dhcp = dhcp_enabled,
ip = ip if ip != "--" else "",
gateway = gateway if gateway != "--" else "",
netmask = netmask if netmask != "--" else "",
)
if iom == "IOM1":
cfg.iom1 = iom_cfg
else:
cfg.iom2 = iom_cfg
mode_str = f"{_c(C.CYN, 'DHCP')}" if dhcp_enabled else f"{_c(C.GRN, 'Static')}"
rows.append([iom, mode_str, origin, ip, gateway, netmask])
else:
rows.append([iom, _c(C.RED, "No response"), "--", "--", "--", "--"])
draw_table(
["IOM", "Mode", "Origin", "IP Address", "Gateway", "Subnet Mask"],
rows,
[5, 10, 8, 16, 16, 16],
)
print()
if not any_ok:
error("Neither IOM responded to the Redfish query.")
error("Check that the serial cable is connected and the IOM is booted.")
return any_ok
# ── Step 4: Prompt user — change config or exit ───────────────────────────────
def collect_network_config(cfg: ShelfConfig) -> bool:
"""
Show current settings, ask user what to do.
Returns True to proceed with applying changes, False to skip.
"""
rule("Step 4 of 5 -- Change Configuration?")
print(f" {_c(C.BOLD, '1')} Change network configuration")
print(f" {_c(C.BOLD, '2')} Leave settings as-is and disconnect")
print()
while True:
choice = prompt("Select option [1/2]")
if choice in ("1", "2"):
break
warn("Please enter 1 or 2.")
if choice == "2":
info("No changes requested.")
return False
# ── User wants to change settings ─────────────────────────────────────────
print()
print(" How should the IOMs be configured?")
print(f" {_c(C.BOLD, '1')} Static IP addresses")
print(f" {_c(C.BOLD, '2')} DHCP")
print()
while True:
mode = prompt("Select mode [1/2]")
if mode in ("1", "2"):
break
warn("Please enter 1 or 2.")
use_dhcp = (mode == "2")
print()
if use_dhcp:
cfg.iom1 = IOMConfig("IOM1", dhcp=True)
cfg.iom2 = IOMConfig("IOM2", dhcp=True)
ok("Both IOMs will be set to DHCP.")
return True
# Static — IOM1
info(f"Static network details for {_c(C.BOLD, 'IOM1')}:")
iom1_ip = prompt_ip(" IOM1 IP address ")
iom1_gw = prompt_ip(" IOM1 Gateway ")
iom1_nm = prompt_ip(" IOM1 Subnet Mask")
cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip, gateway=iom1_gw, netmask=iom1_nm)
# Static — IOM2
print()
info(f"Static network details for {_c(C.BOLD, 'IOM2')}:")
iom2_ip = prompt_ip(" IOM2 IP address ")
same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True)
if same:
iom2_gw, iom2_nm = iom1_gw, iom1_nm
else:
iom2_gw = prompt_ip(" IOM2 Gateway ")
iom2_nm = prompt_ip(" IOM2 Subnet Mask")
cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip, gateway=iom2_gw, netmask=iom2_nm)
return True
# ── Step 5a: Apply configuration via Redfish ──────────────────────────────────
def apply_configuration(cfg: ShelfConfig) -> bool:
rule("Step 5 of 5 -- Applying Configuration via Redfish API")
info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...")
print()
results = []
all_ok = True
for iom_cfg in [cfg.iom1, cfg.iom2]:
if iom_cfg.dhcp:
payload = {"DHCPv4": {"DHCPEnabled": True}}
else:
payload = {
"DHCPv4": {"DHCPEnabled": False},
"IPv4StaticAddresses": [{
"Address": iom_cfg.ip,
"Gateway": iom_cfg.gateway,
"SubnetMask": iom_cfg.netmask,
}],
}
path = f"/redfish/v1/Managers/{iom_cfg.iom}/EthernetInterfaces/1"
success, data = _redfish_request(cfg.password, "PATCH", path, payload)
if success:
mode = "DHCP" if iom_cfg.dhcp else f"Static {iom_cfg.ip}"
results.append([iom_cfg.iom, _c(C.GRN, "OK"), f"Configured: {mode}"])
else:
results.append([iom_cfg.iom, _c(C.RED, "FAIL"), str(data)[:60]])
all_ok = False
draw_table(["IOM", "Result", "Detail"], results, [6, 8, 50])
print()
return all_ok
# ── Step 5b: Print applied-settings summary ───────────────────────────────────
def print_summary(cfg: ShelfConfig, changed: bool):
rule("Summary")
def val(iom: IOMConfig, field: str) -> str:
dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)", "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"}
stat_map = {"mode": "Static", "ip": iom.ip, "gateway": iom.gateway, "netmask": iom.netmask}
return (dhcp_map if iom.dhcp else stat_map).get(field, "")
draw_table(
["Setting", "IOM1", "IOM2"],
[
["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")],
["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")],
["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")],
["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")],
["Serial Port", cfg.device, cfg.device],
["Changes", "Yes" if changed else "None", ""],
],
[12, 22, 22],
)
if changed:
print()
draw_box([
f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}",
"",
"Remove the serial cable ONLY after verifying each",
"expander appears in TrueNAS with matching drives.",
"",
"TrueNAS > System Settings > Enclosure >",
"NVMe-oF Expansion Shelves",
], colour=C.YEL)
print()
# ── Disconnect ────────────────────────────────────────────────────────────────
def close_serial_connection(ser: SerialPort, device: str):
if ser and ser.is_open:
ser.close()
ok(f"Serial port {device} closed.")
print()
prompt("Disconnect the serial cable, then press Enter to continue")
ok("Serial cable disconnected. Shelf complete.")
# ── Full shelf configuration cycle ────────────────────────────────────────────
def configure_shelf() -> bool:
"""Run one complete shelf cycle. Returns True if user wants another shelf."""
banner()
cfg = ShelfConfig()
# 1 — Detect device
device = detect_serial_device()
if not device:
error("Could not detect a serial device. Returning to main menu.")
time.sleep(2)
return True
cfg.device = device
# 2 — Open serial port & wake IOM console
ser = open_serial_connection(device)
if not ser:
error("Could not open serial port. Returning to main menu.")
time.sleep(2)
return True
# Password needed before any Redfish calls
print()
cfg.password = prompt_password()
# 3 — Fetch & display current settings
fetch_current_config(cfg)
# 4 — Ask user: change or leave alone?
apply_changes = collect_network_config(cfg)
# 5 — Apply if requested
changed = False
if apply_changes:
print()
rule("Ready to Apply")
info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1")
print()
if prompt_yn("Apply configuration now?", default=True):
apply_configuration(cfg)
changed = True
else:
warn("Configuration skipped — no changes were made.")
# Summary
print_summary(cfg, changed)
# Disconnect
close_serial_connection(ser, device)
print()
return prompt_yn("Configure another ES24N shelf?", default=False)
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
banner()
print(f" {_c(C.DIM, 'Automates ES24N IOM network configuration over a direct serial')}")
print(f" {_c(C.DIM, 'connection using the Redfish API (loopback).')}")
print(f" {_c(C.DIM, 'No external dependencies -- Python 3 standard library only.')}")
print()
while True:
banner()
draw_box([
f" {_c(C.BOLD, '1')} Configure a new ES24N shelf",
f" {_c(C.BOLD, '2')} Exit",
])
print()
choice = prompt("Select [1/2]")
if choice == "1":
another = configure_shelf()
if not another:
break
elif choice == "2":
break
else:
warn("Please enter 1 or 2.")
time.sleep(1)
print()
ok("Exiting ES24N IOM Configuration Tool. Goodbye.")
print()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print()
warn("Interrupted. Exiting.")
sys.exit(0)