Files
es24n-conf/modules/redfish.py
scott 2b55617db2 Associate password with each shelf for multi-shelf firmware updates
Each ES24N shelf has a unique Admin password (its BMC serial number), so a
single shared password is incorrect for multi-shelf runs. The password prompt
now appears once per shelf inside _collect_shelves(), stored as the first
element of each (password, [(iom, ip), ...]) shelf tuple. _make_targets()
threads the password into each (label, iom, ip, password) target entry, and
_show_fw_versions() uses the per-target password instead of a global one.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 08:24:09 -04:00

239 lines
8.1 KiB
Python

"""
redfish.py — Redfish API client functions shared across all ES24N workflows.
Handles GET/PATCH requests, firmware upload, task polling, and version queries.
"""
import json
import os
import ssl
import time
import urllib.error
import urllib.request
from base64 import b64encode
from typing import Optional
from ui import _c, C, info, draw_table
def _redfish_request(password: str, method: str, path: str,
payload: Optional[dict] = None,
host: str = "127.0.0.1") -> tuple:
"""
Issue a Redfish HTTP request. host defaults to the serial loopback (127.0.0.1)
but can be set to an IOM's network IP for firmware update operations.
Returns (success: bool, data: dict|str).
"""
url = f"https://{host}{path}"
username = "root" if host == "127.0.0.1" else "Admin"
credentials = b64encode(f"{username}:{password}".encode()).decode()
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
body = json.dumps(payload).encode("utf-8") if payload else None
headers = {"Authorization": f"Basic {credentials}"}
if body:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, method=method, headers=headers)
try:
with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
raw = resp.read().decode("utf-8", errors="replace")
try:
return True, json.loads(raw) if raw.strip() else {}
except json.JSONDecodeError:
return True, {}
except urllib.error.HTTPError as e:
raw = e.read().decode("utf-8", errors="replace")
try:
msg = json.loads(raw).get("error", {}).get("message", raw)
except json.JSONDecodeError:
msg = raw
return False, f"HTTP {e.code}: {msg[:120]}"
except OSError as e:
return False, f"Connection error: {e}"
def _redfish_upload_firmware(password: str, host: str, fw_path: str) -> tuple:
"""
Upload a firmware file to /redfish/v1/UpdateService using multipart/form-data.
Equivalent to: curl -k -u Admin:<PW> https://<IP>/redfish/v1/UpdateService
-X POST -F "software=@<file>"
"""
try:
with open(fw_path, "rb") as f:
file_data = f.read()
except OSError as e:
return False, f"Cannot read file: {e}"
filename = os.path.basename(fw_path)
boundary = f"FormBoundary{int(time.time() * 1000)}"
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="software"; filename="{filename}"\r\n'
"Content-Type: application/octet-stream\r\n"
"\r\n"
).encode() + file_data + f"\r\n--{boundary}--\r\n".encode()
url = f"https://{host}/redfish/v1/UpdateService"
credentials = b64encode(f"Admin:{password}".encode()).decode() # always network
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(
url, data=body, method="POST",
headers={
"Authorization": f"Basic {credentials}",
"Content-Type": f"multipart/form-data; boundary={boundary}",
},
)
try:
with urllib.request.urlopen(req, context=ctx, timeout=120) as resp:
raw = resp.read().decode("utf-8", errors="replace")
try:
return True, json.loads(raw) if raw.strip() else {}
except json.JSONDecodeError:
return True, {}
except urllib.error.HTTPError as e:
raw = e.read().decode("utf-8", errors="replace")
try:
msg = json.loads(raw).get("error", {}).get("message", raw)
except json.JSONDecodeError:
msg = raw
return False, f"HTTP {e.code}: {msg[:120]}"
except OSError as e:
return False, f"Connection error: {e}"
def _redfish_trigger_update(password: str, host: str, target: str) -> tuple:
    """
    Start a Redfish SimpleUpdate that applies the uploaded image to one target.

    target is the resource path to update, e.g.
    "/redfish/v1/Managers/IOM1" or
    "/redfish/v1/Chassis/IOM1/NetworkAdapters/1".

    Returns the (success, data) tuple produced by _redfish_request.
    """
    action_path = "/redfish/v1/UpdateService/Actions/SimpleUpdate"
    # The image URI is fixed; only the target list varies per call.
    request_body = {
        "ImageURI": "/redfish/v1/UpdateService/software",
        "Targets": [target],
    }
    return _redfish_request(
        password, "POST", action_path, payload=request_body, host=host,
    )
def _redfish_poll_tasks(password: str, host: str, timeout: int = 600) -> tuple:
    """
    Poll /redfish/v1/TaskService/Tasks/ until all tasks reach a terminal state
    or timeout is exceeded. Returns (success: bool, message: str).

    Args:
        password: Password passed through to _redfish_request.
        host:     Host of the Redfish service.
        timeout:  Maximum number of seconds to keep polling.

    Returns:
        (True, "No pending tasks.") when the task collection is empty.
        (True, "All tasks completed.") when every task is in a terminal
        state. Note that "terminal" includes Exception, Killed and
        Cancelled, so True does not by itself mean every task succeeded.
        (False, message) when the task service cannot be queried, returns
        something other than a JSON object, or the timeout expires.
    """
    # "Cancelled" is a terminal TaskState in the Redfish Task schema; without
    # it a cancelled task would keep this loop polling until the timeout.
    terminal_states = {"Completed", "Killed", "Exception", "Cancelled"}
    poll_interval = 10

    started = time.monotonic()
    deadline = started + timeout
    while time.monotonic() < deadline:
        ok_flag, data = _redfish_request(
            password, "GET", "/redfish/v1/TaskService/Tasks/", host=host,
        )
        if not ok_flag:
            return False, f"Task service error: {data}"
        if not isinstance(data, dict):
            # A successful response may decode to any JSON type; only an
            # object can carry the "Members" collection.
            return False, (
                "Task service error: unexpected response type "
                f"{type(data).__name__}"
            )

        members = data.get("Members") or []
        if not members:
            return True, "No pending tasks."

        running = []
        for member in members:
            entry = member if isinstance(member, dict) else {}
            state = entry.get("TaskState")
            if state is None:
                # The collection may list only links; fetch the task itself.
                task_path = entry.get("@odata.id", "")
                if task_path:
                    t_ok, t_data = _redfish_request(
                        password, "GET", task_path, host=host,
                    )
                    if t_ok and isinstance(t_data, dict):
                        state = t_data.get("TaskState")
            if not isinstance(state, str):
                # Unknown or missing state: assume the task is still active
                # so that it is neither reported as finished nor able to
                # break the join() below.
                state = "Running"
            if state not in terminal_states:
                running.append(state)

        if not running:
            return True, "All tasks completed."

        # Report wall-clock time since polling began, including the time
        # spent inside the requests themselves.
        elapsed = int(time.monotonic() - started)
        info(f" Tasks running ({', '.join(running)})... [{elapsed}s elapsed]")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, max(0.0, deadline - time.monotonic())))
    return False, f"Timeout after {timeout}s waiting for tasks."
def _redfish_restart_iom(password: str, host: str, iom: str) -> tuple:
    """
    POST a GracefulRestart to the Manager.Reset action of the named IOM.

    iom is the manager name used in the Redfish path.
    Returns the (success, data) tuple produced by _redfish_request.
    """
    reset_path = f"/redfish/v1/Managers/{iom}/Actions/Manager.Reset"
    reset_body = {"ResetType": "GracefulRestart"}
    return _redfish_request(
        password, "POST", reset_path, payload=reset_body, host=host,
    )
def _redfish_reset_fabric(password: str, host: str, iom: str) -> tuple:
    """
    POST a GracefulRestart to the NetworkAdapter.Reset action of network
    adapter 1 on the named IOM's chassis.

    iom is the chassis name used in the Redfish path.
    Returns the (success, data) tuple produced by _redfish_request.
    """
    reset_path = (
        f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1/Actions/NetworkAdapter.Reset"
    )
    reset_body = {"ResetType": "GracefulRestart"}
    return _redfish_request(
        password, "POST", reset_path, payload=reset_body, host=host,
    )
def _get_iom_fw_version(password: str, host: str, iom: str) -> str:
    """
    Return the FirmwareVersion reported by /redfish/v1/Managers/<iom>.

    Returns "Unknown" when the manager resource has no usable
    FirmwareVersion value, and a red "Unreachable" marker when the request
    fails or does not return a JSON object.
    """
    ok_flag, data = _redfish_request(
        password, "GET", f"/redfish/v1/Managers/{iom}", host=host,
    )
    if not (ok_flag and isinstance(data, dict)):
        return _c(C.RED, "Unreachable")
    # `or` rather than a .get() default: the property may be present but
    # null or empty, which should also read as "Unknown".
    return data.get("FirmwareVersion") or "Unknown"
def _get_fabric_fw_version(password: str, host: str, iom: str) -> str:
    """
    Return Oem.Version.ActiveFirmwareVersion from
    /redfish/v1/Chassis/<iom>/NetworkAdapters/1.

    Returns "Unknown" when that value is absent or empty, and a red
    "Unreachable" marker when the request fails or does not return a JSON
    object.
    """
    ok_flag, data = _redfish_request(
        password, "GET",
        f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1",
        host=host,
    )
    if not (ok_flag and isinstance(data, dict)):
        return _c(C.RED, "Unreachable")

    # Walk the nested keys one level at a time so that an intermediate
    # value that is null or not an object yields "Unknown" instead of
    # raising AttributeError.
    node = data
    for key in ("Oem", "Version", "ActiveFirmwareVersion"):
        node = node.get(key) if isinstance(node, dict) else None
    return node or "Unknown"
def _show_fw_versions(targets: list):
    """
    Look up the IOM and fabric firmware version of every target and print
    them as a table.

    targets: list of (label, iom, ip, password) tuples where
        label:    display string (e.g. "IOM1", "S1 / IOM1")
        iom:      actual IOM name used in Redfish paths ("IOM1" or "IOM2")
        ip:       management IP address
        password: Admin password for this shelf
    """
    info("Querying firmware versions...")

    # One table row per target; the IOM version is queried before the
    # fabric version for each target.
    rows = [
        [
            label,
            ip,
            _get_iom_fw_version(password, ip, iom),
            _get_fabric_fw_version(password, ip, iom),
        ]
        for label, iom, ip, password in targets
    ]

    column_titles = ["IOM", "IP Address", "IOM Firmware", "Fabric Firmware"]
    column_widths = [12, 16, 32, 20]

    print()
    draw_table(column_titles, rows, column_widths)
    print()