Files
es24n-conf/modules/redfish.py
scott 10354794f8 Add multi-shelf support to firmware update workflow
The firmware update workflow now supports updating any number of shelves in a
single run. After entering IPs for the first shelf, the user is prompted to
add another; this repeats until done. Firmware file and update-type choices
are made once and applied to all shelves sequentially.

_show_fw_versions() updated to accept (label, iom, ip) tuples so the display
label and Redfish path name can differ for multi-shelf tables (e.g. "S1 / IOM1"
vs "IOM1"). Pre- and post-update version tables include the shelf number when
more than one shelf is being updated.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 08:20:48 -04:00

238 lines
8.0 KiB
Python

"""
redfish.py — Redfish API client functions shared across all ES24N workflows.
Handles GET/PATCH requests, firmware upload, task polling, and version queries.
"""
import json
import os
import ssl
import time
import urllib.error
import urllib.request
from base64 import b64encode
from typing import Optional
from ui import _c, C, info, draw_table
def _redfish_request(password: str, method: str, path: str,
                     payload: Optional[dict] = None,
                     host: str = "127.0.0.1",
                     timeout: float = 10) -> tuple:
    """
    Issue a Redfish HTTPS request using HTTP Basic authentication.

    host defaults to the serial loopback (127.0.0.1) but can be set to an
    IOM's network IP for firmware update operations. Requests to the
    loopback address authenticate as "root"; every other host authenticates
    as "Admin".

    Args:
        password: Password for the account selected by host.
        method:   HTTP verb passed to urllib (e.g. "GET", "PATCH", "POST").
        path:     Absolute resource path appended to https://<host>.
        payload:  Optional dict sent as a JSON body. Any dict, including an
                  empty one, is serialized and sent; None sends no body.
        host:     Address of the Redfish service.
        timeout:  Socket timeout in seconds (default 10).

    Returns:
        (True, data) on success, where data is the decoded JSON body, or {}
        when the body is empty or is not valid JSON.
        (False, message) on an HTTP error status or a connection failure.
    """
    # Imported locally so this function does not rely on a module-level
    # import of http.client being present.
    import http.client

    url = f"https://{host}{path}"
    username = "root" if host == "127.0.0.1" else "Admin"
    credentials = b64encode(f"{username}:{password}".encode()).decode()

    # Certificate verification is deliberately disabled (the equivalent of
    # curl -k). check_hostname must be cleared before verify_mode is relaxed.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    headers = {"Authorization": f"Basic {credentials}"}
    body = None
    if payload is not None:
        # "is not None" rather than truthiness: an explicit {} payload must
        # still be sent as a JSON body instead of being silently dropped.
        body = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"

    req = urllib.request.Request(url, data=body, method=method, headers=headers)
    try:
        with urllib.request.urlopen(req, context=ctx, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before OSError because it is a subclass.
        try:
            raw = e.read().decode("utf-8", errors="replace")
        except (OSError, http.client.HTTPException):
            raw = ""
        # Prefer the "error"/"message" text from a JSON error body, but fall
        # back to the raw body whenever the body is not JSON or is not
        # shaped as a dict containing a dict with a string message.
        msg = raw
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            error = parsed.get("error")
            if isinstance(error, dict) and isinstance(error.get("message"), str):
                msg = error["message"]
        return False, f"HTTP {e.code}: {msg[:120]}"
    except (OSError, http.client.HTTPException) as e:
        # http.client errors such as IncompleteRead or BadStatusLine are not
        # OSError subclasses; catch them so callers always get a tuple back.
        return False, f"Connection error: {e}"

    if not raw.strip():
        return True, {}
    try:
        return True, json.loads(raw)
    except json.JSONDecodeError:
        return True, {}
def _redfish_upload_firmware(password: str, host: str, fw_path: str,
                             timeout: float = 120) -> tuple:
    """
    Upload a firmware file to /redfish/v1/UpdateService using multipart/form-data.
    Equivalent to: curl -k -u Admin:<PW> https://<IP>/redfish/v1/UpdateService
    -X POST -F "software=@<file>"

    Args:
        password: Password for the "Admin" account (always used here).
        host:     Network address of the IOM.
        fw_path:  Local path of the firmware image; read fully into memory.
        timeout:  Socket timeout in seconds (default 120).

    Returns:
        (True, data) on success, where data is the decoded JSON body, or {}
        when the body is empty or is not valid JSON.
        (False, message) when the file cannot be read, the server answers
        with an HTTP error status, or the connection fails.
    """
    # Imported locally so this function does not rely on a module-level
    # import of http.client being present.
    import http.client

    try:
        with open(fw_path, "rb") as f:
            file_data = f.read()
    except OSError as e:
        return False, f"Cannot read file: {e}"

    # A double quote or line break in the file name would terminate the
    # quoted filename parameter or the header line itself, so percent-encode
    # those characters before placing the name in Content-Disposition.
    filename = (os.path.basename(fw_path)
                .replace('"', "%22")
                .replace("\r", "%0D")
                .replace("\n", "%0A"))

    # Use a random boundary and regenerate it in the unlikely event that the
    # same byte sequence occurs inside the firmware image.
    boundary = f"FormBoundary{os.urandom(16).hex()}"
    while boundary.encode() in file_data:
        boundary = f"FormBoundary{os.urandom(16).hex()}"

    part_header = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="software"; filename="{filename}"\r\n'
        "Content-Type: application/octet-stream\r\n"
        "\r\n"
    ).encode()
    body = part_header + file_data + f"\r\n--{boundary}--\r\n".encode()

    url = f"https://{host}/redfish/v1/UpdateService"
    credentials = b64encode(f"Admin:{password}".encode()).decode()  # always network

    # Certificate verification is deliberately disabled (the equivalent of
    # curl -k). check_hostname must be cleared before verify_mode is relaxed.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    req = urllib.request.Request(
        url, data=body, method="POST",
        headers={
            "Authorization": f"Basic {credentials}",
            "Content-Type": f"multipart/form-data; boundary={boundary}",
        },
    )
    try:
        with urllib.request.urlopen(req, context=ctx, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before OSError because it is a subclass.
        try:
            raw = e.read().decode("utf-8", errors="replace")
        except (OSError, http.client.HTTPException):
            raw = ""
        # Prefer the "error"/"message" text from a JSON error body, but fall
        # back to the raw body whenever the body is not JSON or is not
        # shaped as a dict containing a dict with a string message.
        msg = raw
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            error = parsed.get("error")
            if isinstance(error, dict) and isinstance(error.get("message"), str):
                msg = error["message"]
        return False, f"HTTP {e.code}: {msg[:120]}"
    except (OSError, http.client.HTTPException) as e:
        # http.client errors such as IncompleteRead or BadStatusLine are not
        # OSError subclasses; catch them so callers always get a tuple back.
        return False, f"Connection error: {e}"

    if not raw.strip():
        return True, {}
    try:
        return True, json.loads(raw)
    except json.JSONDecodeError:
        return True, {}
def _redfish_trigger_update(password: str, host: str, target: str) -> tuple:
    """
    Start a Redfish SimpleUpdate against a single target resource.

    The request is a POST to the UpdateService SimpleUpdate action. It names
    /redfish/v1/UpdateService/software as the image URI and passes target
    as the only entry in "Targets".

    target: resource path to update, e.g. "/redfish/v1/Managers/IOM1"
            or "/redfish/v1/Chassis/IOM1/NetworkAdapters/1"

    Returns whatever _redfish_request returns for the POST.
    """
    action_path = "/redfish/v1/UpdateService/Actions/SimpleUpdate"
    request_body = {
        "ImageURI": "/redfish/v1/UpdateService/software",
        "Targets": [target],
    }
    return _redfish_request(
        password, "POST", action_path, payload=request_body, host=host,
    )
def _redfish_poll_tasks(password: str, host: str, timeout: int = 600,
                        interval: int = 10) -> tuple:
    """
    Poll /redfish/v1/TaskService/Tasks/ until all tasks reach a terminal state
    or timeout is exceeded. Returns (success: bool, message: str).

    Args:
        password: Password forwarded to _redfish_request.
        host:     Address of the IOM whose task service is polled.
        timeout:  Maximum number of seconds to keep polling.
        interval: Seconds to wait between polls (default 10).

    Returns:
        (True, message) when the task list is empty or every task is in a
        terminal state. If any task ended in a terminal state other than
        "Completed", the message lists those states.
        (False, message) when the task collection cannot be read, the
        response is not a JSON object, or the timeout expires.

    A member entry that carries no "TaskState" is resolved by fetching its
    "@odata.id". If that fetch fails, or the entry has no path, the task is
    assumed to still be running.
    """
    # "Cancelled" is the successor of the deprecated "Killed" state in the
    # Redfish Task schema; both mean the task will make no further progress.
    terminal = {"Completed", "Killed", "Cancelled", "Exception"}
    start = time.monotonic()
    deadline = start + timeout

    while time.monotonic() < deadline:
        ok_flag, data = _redfish_request(
            password, "GET", "/redfish/v1/TaskService/Tasks/", host=host,
        )
        if not ok_flag:
            return False, f"Task service error: {data}"
        if not isinstance(data, dict):
            return False, "Task service error: unexpected response format"

        members = data.get("Members")
        if not members:
            return True, "No pending tasks."

        running = []
        unsuccessful = []
        for member in members:
            if not isinstance(member, dict):
                # Tolerate malformed entries; with no state and no path they
                # fall through to the "Running" default below.
                member = {}
            state = member.get("TaskState")
            if state is None:
                task_path = member.get("@odata.id", "")
                if task_path:
                    t_ok, t_data = _redfish_request(
                        password, "GET", task_path, host=host,
                    )
                    if t_ok and isinstance(t_data, dict):
                        state = t_data.get("TaskState")
                    else:
                        state = "Running"
                else:
                    state = "Running"
            # A task resource without TaskState must not inject None into
            # the list joined below; treat it as a non-terminal state.
            state = "Unknown" if state is None else str(state)
            if state not in terminal:
                running.append(state)
            elif state != "Completed":
                unsuccessful.append(state)

        if not running:
            if unsuccessful:
                # NOTE(review): the success flag is kept True here so that
                # callers relying on "polling finished" keep working; the
                # message surfaces the abnormal end states instead.
                return True, (
                    "All tasks finished; not all completed successfully "
                    f"({', '.join(unsuccessful)})."
                )
            return True, "All tasks completed."

        # Report real elapsed time, including time spent in the requests.
        elapsed = int(time.monotonic() - start)
        info(f" Tasks running ({', '.join(running)})... [{elapsed}s elapsed]")

        # Never sleep past the deadline, and never pass a negative value.
        remaining = deadline - time.monotonic()
        time.sleep(max(0, min(interval, remaining)))

    return False, f"Timeout after {timeout}s waiting for tasks."
def _redfish_restart_iom(password: str, host: str, iom: str) -> tuple:
    """
    Request a graceful restart of an IOM manager.

    POSTs {"ResetType": "GracefulRestart"} to the Manager.Reset action of
    /redfish/v1/Managers/<iom> on host and returns the _redfish_request
    result unchanged.
    """
    reset_action = f"/redfish/v1/Managers/{iom}/Actions/Manager.Reset"
    reset_body = {"ResetType": "GracefulRestart"}
    return _redfish_request(
        password, "POST", reset_action, payload=reset_body, host=host,
    )
def _redfish_reset_fabric(password: str, host: str, iom: str) -> tuple:
    """
    Request a graceful restart of an IOM's network adapter 1.

    POSTs {"ResetType": "GracefulRestart"} to the NetworkAdapter.Reset action
    of /redfish/v1/Chassis/<iom>/NetworkAdapters/1 on host and returns the
    _redfish_request result unchanged.
    """
    reset_action = (
        f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1"
        "/Actions/NetworkAdapter.Reset"
    )
    reset_body = {"ResetType": "GracefulRestart"}
    return _redfish_request(
        password, "POST", reset_action, payload=reset_body, host=host,
    )
def _get_iom_fw_version(password: str, host: str, iom: str) -> str:
    """
    Return the IOM firmware version reported by /redfish/v1/Managers/<iom>.

    Returns:
        The "FirmwareVersion" value as a string, "Unknown" when the field is
        missing, null or empty, or a red "Unreachable" marker when the
        request fails or does not return a JSON object.
    """
    ok_flag, data = _redfish_request(
        password, "GET", f"/redfish/v1/Managers/{iom}", host=host,
    )
    if not (ok_flag and isinstance(data, dict)):
        return _c(C.RED, "Unreachable")
    # A key that is present but null would otherwise leak None to callers
    # that expect a string; fall back to "Unknown" for any empty value.
    version = data.get("FirmwareVersion")
    return str(version) if version else "Unknown"
def _get_fabric_fw_version(password: str, host: str, iom: str) -> str:
    """
    Return the fabric firmware version of an IOM's network adapter 1.

    Reads Oem -> Version -> ActiveFirmwareVersion from
    /redfish/v1/Chassis/<iom>/NetworkAdapters/1.

    Returns:
        The version as a string, "Unknown" when any level of that path is
        missing, null, empty or not a JSON object, or a red "Unreachable"
        marker when the request fails or does not return a JSON object.
    """
    ok_flag, data = _redfish_request(
        password, "GET",
        f"/redfish/v1/Chassis/{iom}/NetworkAdapters/1",
        host=host,
    )
    if not (ok_flag and isinstance(data, dict)):
        return _c(C.RED, "Unreachable")

    # Walk the nested keys one level at a time so that a null or non-object
    # "Oem" / "Version" value yields "Unknown" instead of AttributeError.
    node = data
    for key in ("Oem", "Version", "ActiveFirmwareVersion"):
        if not isinstance(node, dict):
            node = None
            break
        node = node.get(key)
    return str(node) if node else "Unknown"
def _show_fw_versions(password: str, targets: list):
    """
    Look up firmware versions for each target and print them as a table.

    targets: list of (label, iom, ip) tuples where
        label: text shown in the first column (e.g. "IOM1", "S1 / IOM1")
        iom:   IOM name used to build Redfish paths ("IOM1" or "IOM2")
        ip:    management IP address that is queried

    For every target the IOM firmware version is queried first, then the
    fabric firmware version. The table is printed with a blank line before
    and after it.
    """
    info("Querying firmware versions...")
    table_rows = [
        [
            label,
            ip,
            _get_iom_fw_version(password, ip, iom),
            _get_fabric_fw_version(password, ip, iom),
        ]
        for label, iom, ip in targets
    ]
    column_titles = ["IOM", "IP Address", "IOM Firmware", "Fabric Firmware"]
    column_widths = [12, 16, 32, 20]
    print()
    draw_table(column_titles, table_rows, column_widths)
    print()