Upload files to "/"
This commit is contained in:
673
es24n_conf.py
Normal file
673
es24n_conf.py
Normal file
@@ -0,0 +1,673 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ES24N IOM Network Configuration Tool
|
||||
TrueNAS ES24N Expansion Shelf — Serial Configuration Utility
|
||||
Based on ES24N Product Service Guide v.26011
|
||||
|
||||
Zero external dependencies — Python 3 standard library only.
|
||||
Compatible with TrueNAS (FreeBSD) and Linux.
|
||||
"""
|
||||
|
||||
import fcntl
|
||||
import glob
|
||||
import ipaddress
|
||||
import json
|
||||
import os
|
||||
import select
|
||||
import ssl
|
||||
import subprocess
|
||||
import sys
|
||||
import termios
|
||||
import time
|
||||
import tty
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from base64 import b64encode
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
# ── ANSI colour helpers ───────────────────────────────────────────────────────
|
||||
class C:
|
||||
RED = "\033[0;31m"
|
||||
GRN = "\033[0;32m"
|
||||
YEL = "\033[1;33m"
|
||||
CYN = "\033[0;36m"
|
||||
WHT = "\033[1;37m"
|
||||
DIM = "\033[2m"
|
||||
BOLD = "\033[1m"
|
||||
RESET = "\033[0m"
|
||||
CLEAR = "\033[2J\033[H"
|
||||
|
||||
def _c(colour: str, text: str) -> str:
|
||||
return f"{colour}{text}{C.RESET}"
|
||||
|
||||
def info(msg: str): print(f" {_c(C.CYN, 'i')} {msg}")
|
||||
def ok(msg: str): print(f" {_c(C.GRN, 'OK')} {msg}")
|
||||
def warn(msg: str): print(f" {_c(C.YEL, '!')} {msg}")
|
||||
def error(msg: str): print(f" {_c(C.RED, 'X')} {msg}")
|
||||
|
||||
def banner():
|
||||
print(C.CLEAR, end="")
|
||||
w = 60
|
||||
print(_c(C.CYN, " +" + "-" * w + "+"))
|
||||
print(_c(C.CYN, " |") + _c(C.WHT + C.BOLD,
|
||||
" TrueNAS ES24N IOM Configuration Tool ") + _c(C.CYN, " |"))
|
||||
print(_c(C.CYN, " |") + _c(C.DIM,
|
||||
" Serial Network Setup v2.0 (stdlib only) ") + _c(C.CYN, " |"))
|
||||
print(_c(C.CYN, " +" + "-" * w + "+"))
|
||||
print()
|
||||
|
||||
def rule(title: str = ""):
|
||||
width = 60
|
||||
if title:
|
||||
pad = max(0, width - len(title) - 2)
|
||||
left = pad // 2
|
||||
right = pad - left
|
||||
line = f"{'-' * left} {title} {'-' * right}"
|
||||
else:
|
||||
line = "-" * width
|
||||
print(f"\n {_c(C.YEL, line)}\n")
|
||||
|
||||
def draw_table(headers: list, rows: list, col_widths: list):
|
||||
sep = " +-" + "-+-".join("-" * w for w in col_widths) + "-+"
|
||||
|
||||
def fmt_row(cells):
|
||||
return " | " + " | ".join(
|
||||
str(c).ljust(w) for c, w in zip(cells, col_widths)
|
||||
) + " |"
|
||||
|
||||
print(_c(C.DIM, sep))
|
||||
print(_c(C.BOLD, fmt_row(headers)))
|
||||
print(_c(C.DIM, sep))
|
||||
for row in rows:
|
||||
print(fmt_row(row))
|
||||
print(_c(C.DIM, sep))
|
||||
|
||||
def draw_box(lines: list, colour: str = C.CYN):
|
||||
width = max(len(l) for l in lines) + 4
|
||||
print(f" {_c(colour, '+' + '-' * width + '+')}")
|
||||
for line in lines:
|
||||
pad = width - len(line) - 2
|
||||
print(f" {_c(colour, '|')} {line}{' ' * pad} {_c(colour, '|')}")
|
||||
print(f" {_c(colour, '+' + '-' * width + '+')}")
|
||||
|
||||
|
||||
# ── Input helpers ─────────────────────────────────────────────────────────────
|
||||
def prompt(label: str, default: str = "", password: bool = False) -> str:
|
||||
display = f" {_c(C.CYN, label)}"
|
||||
if default:
|
||||
display += f" {_c(C.DIM, f'[{default}]')}"
|
||||
display += ": "
|
||||
|
||||
if password:
|
||||
fd = sys.stdin.fileno()
|
||||
old = termios.tcgetattr(fd)
|
||||
try:
|
||||
tty.setraw(fd)
|
||||
sys.stdout.write(display)
|
||||
sys.stdout.flush()
|
||||
chars = []
|
||||
while True:
|
||||
ch = sys.stdin.read(1)
|
||||
if ch in ("\r", "\n"):
|
||||
break
|
||||
elif ch in ("\x7f", "\x08"):
|
||||
if chars:
|
||||
chars.pop()
|
||||
sys.stdout.write("\b \b")
|
||||
sys.stdout.flush()
|
||||
elif ch == "\x03":
|
||||
raise KeyboardInterrupt
|
||||
else:
|
||||
chars.append(ch)
|
||||
sys.stdout.write("*")
|
||||
sys.stdout.flush()
|
||||
print()
|
||||
return "".join(chars)
|
||||
finally:
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
||||
else:
|
||||
sys.stdout.write(display)
|
||||
sys.stdout.flush()
|
||||
val = sys.stdin.readline().strip()
|
||||
return val if val else default
|
||||
|
||||
|
||||
def prompt_ip(label: str) -> str:
|
||||
while True:
|
||||
val = prompt(label)
|
||||
try:
|
||||
ipaddress.IPv4Address(val)
|
||||
return val
|
||||
except ValueError:
|
||||
warn(f"'{val}' is not a valid IPv4 address — please try again.")
|
||||
|
||||
|
||||
def prompt_yn(label: str, default: bool = True) -> bool:
|
||||
hint = "Y/n" if default else "y/N"
|
||||
val = prompt(f"{label} [{hint}]").strip().lower()
|
||||
if not val:
|
||||
return default
|
||||
return val in ("y", "yes")
|
||||
|
||||
|
||||
def prompt_password() -> str:
|
||||
while True:
|
||||
val = prompt(
|
||||
"Admin password (BMC/chassis serial, e.g. MXE3000043CHA007)",
|
||||
password=True,
|
||||
)
|
||||
if val:
|
||||
return val
|
||||
warn("Password cannot be empty.")
|
||||
|
||||
|
||||
# ── Data classes ──────────────────────────────────────────────────────────────
|
||||
@dataclass
|
||||
class IOMConfig:
|
||||
iom: str
|
||||
dhcp: bool = True
|
||||
ip: str = ""
|
||||
gateway: str = ""
|
||||
netmask: str = ""
|
||||
|
||||
@dataclass
|
||||
class ShelfConfig:
|
||||
device: str = ""
|
||||
password: str = ""
|
||||
iom1: IOMConfig = field(default_factory=lambda: IOMConfig("IOM1"))
|
||||
iom2: IOMConfig = field(default_factory=lambda: IOMConfig("IOM2"))
|
||||
|
||||
|
||||
# ── Serial port (stdlib: termios / fcntl / select) ────────────────────────────
|
||||
class SerialPort:
|
||||
"""
|
||||
Minimal 8N1 serial port using only the Python standard library.
|
||||
Replaces pyserial for TrueNAS environments where pip is unavailable.
|
||||
"""
|
||||
|
||||
BAUD_MAP = {
|
||||
9600: termios.B9600,
|
||||
19200: termios.B19200,
|
||||
38400: termios.B38400,
|
||||
57600: termios.B57600,
|
||||
115200: termios.B115200,
|
||||
}
|
||||
|
||||
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 5.0):
|
||||
self.port = port
|
||||
self.baudrate = baudrate
|
||||
self.timeout = timeout
|
||||
self._fd: Optional[int] = None
|
||||
self._saved_attrs = None
|
||||
|
||||
# ── Open / close ──────────────────────────────────────────────────────────
|
||||
def open(self):
|
||||
try:
|
||||
self._fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
|
||||
except OSError as e:
|
||||
raise OSError(f"Cannot open {self.port}: {e}") from e
|
||||
|
||||
# Switch back to blocking mode now that O_NOCTTY is set
|
||||
flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
|
||||
fcntl.fcntl(self._fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
|
||||
|
||||
self._saved_attrs = termios.tcgetattr(self._fd)
|
||||
|
||||
# Raw 8N1: iflag, oflag, cflag, lflag, ispeed, ospeed, cc
|
||||
attrs = list(termios.tcgetattr(self._fd))
|
||||
baud = self.BAUD_MAP.get(self.baudrate, termios.B115200)
|
||||
|
||||
attrs[0] = termios.IGNBRK # iflag
|
||||
attrs[1] = 0 # oflag
|
||||
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL # cflag
|
||||
attrs[3] = 0 # lflag (raw)
|
||||
attrs[4] = baud # ispeed
|
||||
attrs[5] = baud # ospeed
|
||||
attrs[6][termios.VMIN] = 0
|
||||
attrs[6][termios.VTIME] = min(int(self.timeout * 10), 255)
|
||||
|
||||
termios.tcsetattr(self._fd, termios.TCSANOW, attrs)
|
||||
termios.tcflush(self._fd, termios.TCIOFLUSH)
|
||||
|
||||
def close(self):
|
||||
if self._fd is not None:
|
||||
try:
|
||||
if self._saved_attrs:
|
||||
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._saved_attrs)
|
||||
os.close(self._fd)
|
||||
except OSError:
|
||||
pass
|
||||
finally:
|
||||
self._fd = None
|
||||
|
||||
@property
|
||||
def is_open(self) -> bool:
|
||||
return self._fd is not None
|
||||
|
||||
# ── Read / write ──────────────────────────────────────────────────────────
|
||||
def write(self, data: bytes):
|
||||
if self._fd is None:
|
||||
raise OSError("Port is not open")
|
||||
os.write(self._fd, data)
|
||||
|
||||
def read_chunk(self, size: int = 4096) -> bytes:
|
||||
if self._fd is None:
|
||||
raise OSError("Port is not open")
|
||||
try:
|
||||
return os.read(self._fd, size)
|
||||
except OSError:
|
||||
return b""
|
||||
|
||||
def read_until_quiet(self, quiet_period: float = 0.5) -> str:
|
||||
"""
|
||||
Read until no new bytes arrive for `quiet_period` seconds,
|
||||
or until `self.timeout` total seconds have elapsed.
|
||||
"""
|
||||
output = b""
|
||||
deadline = time.monotonic() + self.timeout
|
||||
last_rx = time.monotonic()
|
||||
|
||||
while True:
|
||||
now = time.monotonic()
|
||||
if now >= deadline:
|
||||
break
|
||||
if output and (now - last_rx) >= quiet_period:
|
||||
break
|
||||
|
||||
wait = min(deadline - now, quiet_period)
|
||||
ready, _, _ = select.select([self._fd], [], [], wait)
|
||||
if ready:
|
||||
chunk = self.read_chunk()
|
||||
if chunk:
|
||||
output += chunk
|
||||
last_rx = time.monotonic()
|
||||
|
||||
return output.decode("utf-8", errors="replace")
|
||||
|
||||
def send_line(self, cmd: str = "", delay: float = 0.3):
|
||||
self.write((cmd + "\r\n").encode("utf-8"))
|
||||
time.sleep(delay)
|
||||
|
||||
# ── Context manager ───────────────────────────────────────────────────────
|
||||
def __enter__(self):
|
||||
self.open()
|
||||
return self
|
||||
|
||||
def __exit__(self, *_):
|
||||
self.close()
|
||||
|
||||
|
||||
# ── Step 1: Detect serial device ──────────────────────────────────────────────
|
||||
def detect_serial_device() -> Optional[str]:
|
||||
rule("Step 1 of 6 -- Serial Cable & Device Detection")
|
||||
|
||||
print(" Connect the serial cable from the ES24N IOM1 port")
|
||||
print(" to the active F-Series controller USB port.")
|
||||
print()
|
||||
prompt("Press Enter when the cable is connected")
|
||||
|
||||
for attempt in range(1, 4):
|
||||
info(f"Scanning for USB serial devices (attempt {attempt}/3)...")
|
||||
time.sleep(1)
|
||||
|
||||
# FreeBSD: /dev/ttyU* Linux: /dev/ttyUSB*, /dev/ttyACM*
|
||||
patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyU*"]
|
||||
ports = sorted({p for pat in patterns for p in glob.glob(pat)})
|
||||
|
||||
if ports:
|
||||
break
|
||||
|
||||
if attempt < 3:
|
||||
warn("No device found yet — retrying in 2 seconds...")
|
||||
time.sleep(2)
|
||||
|
||||
if not ports:
|
||||
error("No USB serial device detected after 3 attempts.")
|
||||
print()
|
||||
print(" Troubleshooting:")
|
||||
print(" - Ensure the serial cable is fully seated at both ends.")
|
||||
print(" - Try a different USB port on the controller.")
|
||||
print(" - Confirm the ES24N is powered on.")
|
||||
return None
|
||||
|
||||
if len(ports) == 1:
|
||||
ok(f"Device found: {_c(C.BOLD, ports[0])}")
|
||||
_fix_permissions(ports[0])
|
||||
return ports[0]
|
||||
|
||||
# Multiple devices — let the user choose
|
||||
print()
|
||||
draw_table(
|
||||
["#", "Device"],
|
||||
[[str(i), p] for i, p in enumerate(ports, 1)],
|
||||
[4, 24],
|
||||
)
|
||||
print()
|
||||
|
||||
while True:
|
||||
val = prompt(f"Select device number [1-{len(ports)}]")
|
||||
if val.isdigit() and 1 <= int(val) <= len(ports):
|
||||
selected = ports[int(val) - 1]
|
||||
ok(f"Selected: {_c(C.BOLD, selected)}")
|
||||
_fix_permissions(selected)
|
||||
return selected
|
||||
warn(f"Please enter a number between 1 and {len(ports)}.")
|
||||
|
||||
|
||||
def _fix_permissions(device: str):
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["sudo", "chown", ":wheel", device],
|
||||
capture_output=True, timeout=5,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
ok(f"Permissions updated on {device}")
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
os.chmod(device, 0o666)
|
||||
ok(f"Permissions updated on {device}")
|
||||
except PermissionError:
|
||||
warn("Could not update device permissions automatically.")
|
||||
warn("If the connection fails, re-run this script with sudo.")
|
||||
|
||||
|
||||
# ── Step 2: Open serial connection & wake IOM console ─────────────────────────
|
||||
def open_serial_connection(device: str) -> Optional[SerialPort]:
|
||||
rule("Step 2 of 6 -- Opening Serial Connection")
|
||||
|
||||
info(f"Opening {device} at 115200 baud (8N1)...")
|
||||
ser = SerialPort(device, baudrate=115200, timeout=5.0)
|
||||
|
||||
try:
|
||||
ser.open()
|
||||
except OSError as e:
|
||||
error(str(e))
|
||||
return None
|
||||
|
||||
ok(f"Port opened: {device}")
|
||||
info("Sending wake signal to IOM console...")
|
||||
|
||||
ser.send_line("", delay=0.5)
|
||||
ser.send_line("", delay=0.5)
|
||||
response = ser.read_until_quiet(quiet_period=0.5)
|
||||
|
||||
print()
|
||||
if response.strip():
|
||||
print(f" {_c(C.DIM, '+-- IOM Console Response ' + '-' * 31)}")
|
||||
for line in response.strip().splitlines():
|
||||
print(f" {_c(C.DIM, '|')} {line}")
|
||||
print(f" {_c(C.DIM, '+' + '-' * 56)}")
|
||||
print()
|
||||
|
||||
low = response.lower()
|
||||
if any(kw in low for kw in ("login", "$", "#", "password")):
|
||||
ok("IOM console is responsive.")
|
||||
else:
|
||||
warn("Unexpected response — the IOM may still be booting.")
|
||||
warn("You can continue; the Redfish API operates independently.")
|
||||
else:
|
||||
warn("No response received from IOM console.")
|
||||
warn("The Redfish API may still be reachable. Continuing...")
|
||||
|
||||
print()
|
||||
return ser
|
||||
|
||||
|
||||
# ── Step 3: Collect network configuration ─────────────────────────────────────
|
||||
def collect_network_config(cfg: ShelfConfig):
|
||||
rule("Step 3 of 6 -- IOM Network Configuration")
|
||||
|
||||
print(" How should the IOMs be configured?")
|
||||
print(f" {_c(C.BOLD, '1')} Static IP addresses")
|
||||
print(f" {_c(C.BOLD, '2')} DHCP")
|
||||
print()
|
||||
|
||||
while True:
|
||||
choice = prompt("Select option [1/2]")
|
||||
if choice in ("1", "2"):
|
||||
break
|
||||
warn("Please enter 1 or 2.")
|
||||
|
||||
use_dhcp = (choice == "2")
|
||||
print()
|
||||
|
||||
cfg.password = prompt_password()
|
||||
|
||||
if use_dhcp:
|
||||
cfg.iom1 = IOMConfig("IOM1", dhcp=True)
|
||||
cfg.iom2 = IOMConfig("IOM2", dhcp=True)
|
||||
print()
|
||||
ok("Both IOMs will be set to DHCP.")
|
||||
return
|
||||
|
||||
# Static — IOM1
|
||||
print()
|
||||
info(f"Static network details for {_c(C.BOLD, 'IOM1')}:")
|
||||
iom1_ip = prompt_ip(" IOM1 IP address ")
|
||||
iom1_gw = prompt_ip(" IOM1 Gateway ")
|
||||
iom1_nm = prompt_ip(" IOM1 Subnet Mask")
|
||||
cfg.iom1 = IOMConfig("IOM1", dhcp=False, ip=iom1_ip, gateway=iom1_gw, netmask=iom1_nm)
|
||||
|
||||
# Static — IOM2
|
||||
print()
|
||||
info(f"Static network details for {_c(C.BOLD, 'IOM2')}:")
|
||||
iom2_ip = prompt_ip(" IOM2 IP address ")
|
||||
|
||||
same = prompt_yn(" Same gateway and subnet mask as IOM1?", default=True)
|
||||
if same:
|
||||
iom2_gw, iom2_nm = iom1_gw, iom1_nm
|
||||
else:
|
||||
iom2_gw = prompt_ip(" IOM2 Gateway ")
|
||||
iom2_nm = prompt_ip(" IOM2 Subnet Mask")
|
||||
|
||||
cfg.iom2 = IOMConfig("IOM2", dhcp=False, ip=iom2_ip, gateway=iom2_gw, netmask=iom2_nm)
|
||||
|
||||
|
||||
# ── Step 4: Apply configuration via Redfish ───────────────────────────────────
|
||||
def apply_configuration(cfg: ShelfConfig) -> bool:
|
||||
rule("Step 4 of 6 -- Applying Configuration via Redfish API")
|
||||
|
||||
info("Sending Redfish PATCH requests over serial loopback (127.0.0.1)...")
|
||||
print()
|
||||
|
||||
results = []
|
||||
all_ok = True
|
||||
for iom_cfg in [cfg.iom1, cfg.iom2]:
|
||||
success, message = _patch_iom(cfg.password, iom_cfg)
|
||||
results.append([iom_cfg.iom,
|
||||
f"{_c(C.GRN, 'OK')}" if success else f"{_c(C.RED, 'FAIL')}",
|
||||
message])
|
||||
if not success:
|
||||
all_ok = False
|
||||
|
||||
draw_table(["IOM", "Result", "Detail"], results, [6, 8, 44])
|
||||
print()
|
||||
return all_ok
|
||||
|
||||
|
||||
def _patch_iom(password: str, iom: IOMConfig) -> tuple:
|
||||
url = f"https://127.0.0.1/redfish/v1/Managers/{iom.iom}/EthernetInterfaces/1"
|
||||
|
||||
if iom.dhcp:
|
||||
payload = {"DHCPv4": {"DHCPEnabled": True}}
|
||||
else:
|
||||
payload = {
|
||||
"DHCPv4": {"DHCPEnabled": False},
|
||||
"IPv4StaticAddresses": [{
|
||||
"Address": iom.ip,
|
||||
"Gateway": iom.gateway,
|
||||
"SubnetMask": iom.netmask,
|
||||
}],
|
||||
}
|
||||
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
credentials = b64encode(f"Admin:{password}".encode()).decode()
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
data=data,
|
||||
method="PATCH",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Basic {credentials}",
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
|
||||
if resp.status in (200, 204):
|
||||
mode = "DHCP" if iom.dhcp else f"static {iom.ip}"
|
||||
return True, f"Configured: {mode}"
|
||||
body = resp.read().decode("utf-8", errors="replace")
|
||||
return False, f"HTTP {resp.status}: {body[:80]}"
|
||||
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode("utf-8", errors="replace")
|
||||
try:
|
||||
msg = json.loads(body).get("error", {}).get("message", body)
|
||||
except json.JSONDecodeError:
|
||||
msg = body
|
||||
return False, f"HTTP {e.code}: {msg[:80]}"
|
||||
|
||||
except OSError as e:
|
||||
return False, f"Connection error: {e}"
|
||||
|
||||
|
||||
# ── Step 5: Print configuration summary ───────────────────────────────────────
|
||||
def print_summary(cfg: ShelfConfig):
|
||||
rule("Step 5 of 6 -- Configuration Summary")
|
||||
|
||||
def val(iom: IOMConfig, field: str) -> str:
|
||||
dhcp_map = {"mode": "DHCP", "ip": "-- (DHCP)", "gateway": "-- (DHCP)", "netmask": "-- (DHCP)"}
|
||||
stat_map = {"mode": "Static", "ip": iom.ip, "gateway": iom.gateway, "netmask": iom.netmask}
|
||||
return (dhcp_map if iom.dhcp else stat_map).get(field, "")
|
||||
|
||||
draw_table(
|
||||
["Setting", "IOM1", "IOM2"],
|
||||
[
|
||||
["Mode", val(cfg.iom1, "mode"), val(cfg.iom2, "mode")],
|
||||
["IP Address", val(cfg.iom1, "ip"), val(cfg.iom2, "ip")],
|
||||
["Gateway", val(cfg.iom1, "gateway"), val(cfg.iom2, "gateway")],
|
||||
["Subnet Mask", val(cfg.iom1, "netmask"), val(cfg.iom2, "netmask")],
|
||||
["Serial Port", cfg.device, cfg.device],
|
||||
],
|
||||
[12, 22, 22],
|
||||
)
|
||||
|
||||
print()
|
||||
draw_box([
|
||||
f"{_c(C.YEL, 'IMPORTANT -- Per the ES24N Service Guide:')}",
|
||||
"",
|
||||
"Remove the serial cable ONLY after verifying each",
|
||||
"expander appears in TrueNAS with matching drives.",
|
||||
"",
|
||||
"TrueNAS > System Settings > Enclosure >",
|
||||
"NVMe-oF Expansion Shelves",
|
||||
], colour=C.YEL)
|
||||
print()
|
||||
|
||||
|
||||
# ── Step 6: Close serial connection ───────────────────────────────────────────
|
||||
def close_serial_connection(ser: SerialPort, device: str):
|
||||
rule("Step 6 of 6 -- Close Serial Connection")
|
||||
|
||||
if ser and ser.is_open:
|
||||
ser.close()
|
||||
ok(f"Serial port {device} closed.")
|
||||
|
||||
print()
|
||||
prompt("Disconnect the serial cable, then press Enter to continue")
|
||||
ok("Serial cable disconnected. Shelf configuration complete.")
|
||||
|
||||
|
||||
# ── Full shelf configuration cycle ────────────────────────────────────────────
|
||||
def configure_shelf() -> bool:
|
||||
"""Run one complete shelf cycle. Returns True if user wants another."""
|
||||
banner()
|
||||
cfg = ShelfConfig()
|
||||
|
||||
# 1 — Detect device
|
||||
device = detect_serial_device()
|
||||
if not device:
|
||||
error("Could not detect a serial device. Returning to main menu.")
|
||||
time.sleep(2)
|
||||
return True
|
||||
cfg.device = device
|
||||
|
||||
# 2 — Open serial port
|
||||
ser = open_serial_connection(device)
|
||||
if not ser:
|
||||
error("Could not open serial port. Returning to main menu.")
|
||||
time.sleep(2)
|
||||
return True
|
||||
|
||||
# 3 — Collect settings
|
||||
collect_network_config(cfg)
|
||||
|
||||
# 4 — Confirm & apply
|
||||
print()
|
||||
rule("Ready to Apply")
|
||||
info("Redfish PATCH requests will be sent to each IOM via 127.0.0.1")
|
||||
info("using the active serial session as the communication path.")
|
||||
print()
|
||||
|
||||
if prompt_yn("Apply configuration now?", default=True):
|
||||
apply_configuration(cfg)
|
||||
else:
|
||||
warn("Configuration skipped — no changes were made.")
|
||||
|
||||
# 5 — Summary & reminder
|
||||
print_summary(cfg)
|
||||
|
||||
# 6 — Close serial port
|
||||
close_serial_connection(ser, device)
|
||||
|
||||
print()
|
||||
return prompt_yn("Configure another ES24N shelf?", default=False)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
def main():
|
||||
banner()
|
||||
print(f" {_c(C.DIM, 'Automates ES24N IOM network configuration over a direct serial')}")
|
||||
print(f" {_c(C.DIM, 'connection using the Redfish API (loopback).')}")
|
||||
print(f" {_c(C.DIM, 'No external dependencies -- Python 3 standard library only.')}")
|
||||
print()
|
||||
|
||||
while True:
|
||||
banner()
|
||||
draw_box([
|
||||
f" {_c(C.BOLD, '1')} Configure a new ES24N shelf",
|
||||
f" {_c(C.BOLD, '2')} Exit",
|
||||
])
|
||||
print()
|
||||
|
||||
choice = prompt("Select [1/2]")
|
||||
if choice == "1":
|
||||
another = configure_shelf()
|
||||
if not another:
|
||||
break
|
||||
elif choice == "2":
|
||||
break
|
||||
else:
|
||||
warn("Please enter 1 or 2.")
|
||||
time.sleep(1)
|
||||
|
||||
print()
|
||||
ok("Exiting ES24N IOM Configuration Tool. Goodbye.")
|
||||
print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print()
|
||||
warn("Interrupted. Exiting.")
|
||||
sys.exit(0)
|
||||
Reference in New Issue
Block a user