Add destination audit wizard with selective deletion

New top-level wizard option (2) lets users inspect and clean up an
existing destination before migration. Queries all SMB shares, NFS
exports, iSCSI objects, datasets, and zvols; displays a structured
inventory report; then offers per-category deletion with escalating
warnings — standard confirm for shares/iSCSI, explicit "DELETE" phrase
required for zvols and datasets to guard against accidental data loss.

Adds to client.py: query_destination_inventory, delete_smb_shares,
delete_nfs_exports, delete_zvols, delete_datasets.
Adds to cli.py: _fmt_bytes, _print_inventory_report, _run_audit_wizard.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 15:56:10 -05:00
parent 3fd9e6b6a8
commit c28ce9e3b8
2 changed files with 413 additions and 2 deletions

View File

@@ -54,8 +54,14 @@ from pathlib import Path
from typing import Optional
from .archive import parse_archive, list_archive_and_exit
from .client import TrueNASClient, check_dataset_paths, create_missing_datasets, check_iscsi_zvols, create_missing_zvols
from .colors import log, _bold, _bold_cyan, _bold_red, _bold_yellow, _cyan, _dim, _green, _yellow
from .client import (
TrueNASClient,
check_dataset_paths, create_missing_datasets,
check_iscsi_zvols, create_missing_zvols,
query_destination_inventory,
delete_smb_shares, delete_nfs_exports, delete_zvols, delete_datasets,
)
from .colors import log, _bold, _bold_cyan, _bold_green, _bold_red, _bold_yellow, _cyan, _dim, _green, _yellow
from .csv_source import parse_csv_sources
from .migrate import migrate_smb_shares, migrate_nfs_shares, migrate_iscsi, query_existing_iscsi, clear_iscsi_config
from .summary import Summary
@@ -142,6 +148,14 @@ def _parse_size(s: str) -> int:
return int(s) # plain bytes
def _fmt_bytes(n: int) -> str:
"""Format a byte count as a human-readable string."""
for suffix, div in [("TiB", 1 << 40), ("GiB", 1 << 30), ("MiB", 1 << 20), ("KiB", 1 << 10)]:
if n >= div:
return f"{n / div:.1f} {suffix}"
return f"{n} B"
def _find_debug_archives(directory: str = ".") -> list[Path]:
"""Return sorted list of TrueNAS debug archives found in *directory*."""
patterns = ("*.tgz", "*.tar.gz", "*.tar", "*.txz", "*.tar.xz")
@@ -312,6 +326,272 @@ def _select_shares(shares: list[dict], share_type: str) -> list[dict]:
return selected
# ─────────────────────────────────────────────────────────────────────────────
# Destination audit wizard
# ─────────────────────────────────────────────────────────────────────────────
def _print_inventory_report(host: str, inv: dict) -> None:
"""Print a structured inventory of all configuration on the destination."""
smb = inv.get("smb_shares", [])
nfs = inv.get("nfs_exports", [])
ds = inv.get("datasets", [])
zvols = inv.get("zvols", [])
ext = inv.get("iscsi_extents", [])
init = inv.get("iscsi_initiators", [])
portals = inv.get("iscsi_portals", [])
tgt = inv.get("iscsi_targets", [])
te = inv.get("iscsi_targetextents", [])
header = f"DESTINATION INVENTORY: {host}"
rule = _bold_cyan("" * (len(header) + 4))
print(f"\n {rule}")
print(f" {_bold_cyan('')} {_bold(header)} {_bold_cyan('')}")
print(f" {rule}")
# SMB
if smb:
print(f"\n {_bold(f'SMB Shares ({len(smb)})')}")
for s in smb:
name = s.get("name", "<unnamed>")
path = s.get("path", "")
enabled = "" if s.get("enabled", True) else _dim(" [disabled]")
print(f" {_cyan('')} {name:<24} {_dim(path)}{enabled}")
else:
print(f"\n {_dim('SMB Shares: none')}")
# NFS
if nfs:
print(f"\n {_bold(f'NFS Exports ({len(nfs)})')}")
for n in nfs:
path = n.get("path", "<no path>")
enabled = "" if n.get("enabled", True) else _dim(" [disabled]")
print(f" {_cyan('')} {path}{enabled}")
else:
print(f"\n {_dim('NFS Exports: none')}")
# iSCSI
has_iscsi = any([ext, init, portals, tgt, te])
if has_iscsi:
iscsi_total = len(ext) + len(init) + len(portals) + len(tgt) + len(te)
print(f"\n {_bold(f'iSCSI Configuration ({iscsi_total} objects)')}")
if ext:
print(f" {_bold('Extents')} ({len(ext)}):")
for e in ext:
kind = e.get("type", "")
backing = e.get("disk") or e.get("path") or ""
print(f" {_cyan('')} {e.get('name', '<unnamed>'):<22} {_dim(kind + ' ' + backing)}")
if init:
print(f" {_bold('Initiator Groups')} ({len(init)}):")
for i in init:
print(f" {_cyan('')} {i.get('comment') or '<no comment>'}")
if portals:
print(f" {_bold('Portals')} ({len(portals)}):")
for p in portals:
ips = ", ".join(l["ip"] for l in p.get("listen", []))
comment = p.get("comment", "")
label = f"{comment} " if comment else ""
print(f" {_cyan('')} {label}{_dim(ips)}")
if tgt:
print(f" {_bold('Targets')} ({len(tgt)}):")
for t in tgt:
print(f" {_cyan('')} {t.get('name', '<unnamed>')}")
if te:
print(f" {_bold('Target-Extent Associations')} ({len(te)})")
else:
print(f"\n {_dim('iSCSI: none')}")
# Datasets
if ds:
print(f"\n {_bold(f'Datasets ({len(ds)})')}")
for d in ds[:20]:
name = d.get("id", "")
is_root = "/" not in name
used_raw = d.get("used", {})
used_bytes = used_raw.get("parsed", 0) if isinstance(used_raw, dict) else 0
used_str = f" {_fmt_bytes(used_bytes)} used" if used_bytes else ""
root_tag = _dim(" (pool root)") if is_root else ""
print(f" {_cyan('')} {name}{root_tag}{_dim(used_str)}")
if len(ds) > 20:
print(f" {_dim(f'… and {len(ds) - 20} more')}")
else:
print(f"\n {_dim('Datasets: none')}")
# Zvols
if zvols:
print(f"\n {_bold(f'Zvols ({len(zvols)})')}")
for z in zvols:
name = z.get("id", "")
vs_raw = z.get("volsize", {})
vs = vs_raw.get("parsed", 0) if isinstance(vs_raw, dict) else 0
vs_str = f" {_fmt_bytes(vs)}" if vs else ""
print(f" {_cyan('')} {name}{_dim(vs_str)}")
else:
print(f"\n {_dim('Zvols: none')}")
print()
def _run_audit_wizard(host: str, port: int, api_key: str) -> None:
"""Query destination inventory and offer to selectively delete configuration."""
print(f"\n Querying {_bold(host)}\n")
async def _query() -> dict:
async with TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False) as client:
return await query_destination_inventory(client)
try:
inv = asyncio.run(_query())
except (OSError, PermissionError) as exc:
print(f" {_bold_red('Connection failed:')} {exc}\n")
return
_print_inventory_report(host, inv)
total = sum(len(v) for v in inv.values())
if total == 0:
print(f" {_dim('The destination appears to have no configuration.')}\n")
return
# ── Deletion options ───────────────────────────────────────────────────────
print(f" {_bold_yellow('' * 60)}")
print(f" {_bold_yellow('DELETION OPTIONS')}")
print(f" {_dim('You may choose to delete some or all of the configuration above.')}")
print(f" {_bold_red('WARNING: Deleted datasets and zvols cannot be recovered — all data will be permanently lost.')}")
print()
has_iscsi = any(inv[k] for k in ("iscsi_extents", "iscsi_initiators",
"iscsi_portals", "iscsi_targets",
"iscsi_targetextents"))
iscsi_count = sum(len(inv[k]) for k in ("iscsi_extents", "iscsi_initiators",
"iscsi_portals", "iscsi_targets",
"iscsi_targetextents"))
deletable_ds = [d for d in inv["datasets"] if "/" in d["id"]]
del_iscsi = False
del_smb = False
del_nfs = False
del_zvols = False
del_datasets = False
# iSCSI (must go first — uses zvols as backing)
if has_iscsi:
del_iscsi = _confirm(
f" Delete ALL iSCSI configuration ({iscsi_count} objects)?"
)
# SMB
if inv["smb_shares"]:
del_smb = _confirm(
f" Delete all {len(inv['smb_shares'])} SMB share(s)?"
)
# NFS
if inv["nfs_exports"]:
del_nfs = _confirm(
f" Delete all {len(inv['nfs_exports'])} NFS export(s)?"
)
# Zvols — require explicit confirmation phrase
if inv["zvols"]:
print()
print(f" {_bold_red('⚠ DATA DESTRUCTION WARNING ⚠')}")
print(f" Deleting zvols PERMANENTLY DESTROYS all data stored in them.")
print(f" This action cannot be undone. Affected zvols:")
for z in inv["zvols"]:
print(f" {_yellow('')} {z['id']}")
print()
raw = _prompt(
f" Type DELETE to confirm deletion of {len(inv['zvols'])} zvol(s),"
" or Enter to skip"
).strip()
del_zvols = (raw == "DELETE")
if raw and raw != "DELETE":
print(f" {_dim('Confirmation not matched — zvols will not be deleted.')}")
print()
# Datasets — strongest warning
if deletable_ds:
print(f" {_bold_red('⚠⚠ CRITICAL DATA DESTRUCTION WARNING ⚠⚠')}")
print(f" Deleting datasets PERMANENTLY DESTROYS ALL DATA including all files,")
print(f" snapshots, and child datasets. Pool root datasets (e.g. 'tank') will")
print(f" be skipped, but all child datasets WILL be deleted.")
print(f" This action cannot be undone. {len(deletable_ds)} dataset(s) would be deleted.")
print()
raw = _prompt(
f" Type DELETE to confirm deletion of {len(deletable_ds)} dataset(s),"
" or Enter to skip"
).strip()
del_datasets = (raw == "DELETE")
if raw and raw != "DELETE":
print(f" {_dim('Confirmation not matched — datasets will not be deleted.')}")
print()
# ── Nothing selected ───────────────────────────────────────────────────────
if not any([del_iscsi, del_smb, del_nfs, del_zvols, del_datasets]):
print(f" {_dim('Nothing selected for deletion. No changes made.')}\n")
return
# ── Final confirmation ─────────────────────────────────────────────────────
print(f" {_bold_yellow('' * 60)}")
print(f" {_bold_yellow('PENDING DELETIONS on ' + host + ':')}")
if del_iscsi:
print(f" {_yellow('')} ALL iSCSI configuration ({iscsi_count} objects)")
if del_smb:
print(f" {_yellow('')} {len(inv['smb_shares'])} SMB share(s)")
if del_nfs:
print(f" {_yellow('')} {len(inv['nfs_exports'])} NFS export(s)")
if del_zvols:
print(f" {_bold_red('')} {len(inv['zvols'])} zvol(s) "
f"{_bold_red('⚠ ALL DATA WILL BE PERMANENTLY DESTROYED')}")
if del_datasets:
print(f" {_bold_red('')} {len(deletable_ds)} dataset(s) "
f"{_bold_red('⚠ ALL DATA WILL BE PERMANENTLY DESTROYED')}")
print()
print(f" {_bold_red('THIS ACTION CANNOT BE UNDONE.')}")
print()
if not _confirm(f" Proceed with all selected deletions on {host}?"):
print(f" {_dim('Aborted no changes made.')}\n")
return
# ── Execute ────────────────────────────────────────────────────────────────
print()
async def _execute() -> None:
async with TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False) as client:
if del_iscsi:
print(f" Removing iSCSI configuration …")
await clear_iscsi_config(client)
print(f" {_bold_green('')} iSCSI configuration removed.")
if del_smb:
print(f" Removing SMB shares …")
ok, fail = await delete_smb_shares(client, inv["smb_shares"])
suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else ""
print(f" {_bold_green('')} {ok} deleted{suffix}")
if del_nfs:
print(f" Removing NFS exports …")
ok, fail = await delete_nfs_exports(client, inv["nfs_exports"])
suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else ""
print(f" {_bold_green('')} {ok} deleted{suffix}")
if del_zvols:
print(f" Removing zvols …")
ok, fail = await delete_zvols(client, inv["zvols"])
suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else ""
print(f" {_bold_green('')} {ok} deleted{suffix}")
if del_datasets:
print(f" Removing datasets …")
ok, fail = await delete_datasets(client, deletable_ds)
suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else ""
print(f" {_bold_green('')} {ok} deleted{suffix}")
asyncio.run(_execute())
print(f"\n {_bold_cyan('Done.')}\n")
# ─────────────────────────────────────────────────────────────────────────────
# Interactive wizard
# ─────────────────────────────────────────────────────────────────────────────
@@ -323,6 +603,33 @@ def interactive_mode() -> None:
f" {_dim('Migrate SMB/NFS shares to a live TrueNAS system.')}\n"
)
# 0 ── Top-level action ─────────────────────────────────────────────────────
print(f" {_bold('What would you like to do?')}")
print(f" {_cyan('1.')} Migrate configuration to a destination system")
print(f" {_cyan('2.')} Audit destination system (view and manage existing config)")
action_raw = _prompt(" Select [1/2]", default="1")
print()
if action_raw.strip() == "2":
audit_host = ""
while not audit_host:
audit_host = _prompt("Destination TrueNAS host or IP")
if not audit_host:
print(" Host is required.")
audit_port_raw = _prompt("WebSocket port", default="443")
audit_port = int(audit_port_raw) if audit_port_raw.isdigit() else 443
audit_key = ""
while not audit_key:
try:
audit_key = getpass.getpass("API key (input hidden): ").strip()
except (EOFError, KeyboardInterrupt):
print()
sys.exit(0)
if not audit_key:
print(" API key is required.")
_run_audit_wizard(audit_host, audit_port, audit_key)
return
# 1 ── Source type ──────────────────────────────────────────────────────────
print(f" {_bold('Source type:')}")
print(f" {_cyan('1.')} TrueNAS debug archive (.tgz / .tar)")

View File

@@ -382,3 +382,107 @@ async def create_missing_zvols(
) as client:
for name, volsize in zvols.items():
await create_zvol(client, name, volsize)
# ─────────────────────────────────────────────────────────────────────────────
# Destination inventory
# ─────────────────────────────────────────────────────────────────────────────
async def query_destination_inventory(client: TrueNASClient) -> dict[str, list]:
"""
Query all current configuration from the destination system.
Returns a dict with keys: smb_shares, nfs_exports, datasets, zvols,
iscsi_extents, iscsi_initiators, iscsi_portals, iscsi_targets, iscsi_targetextents.
Each value is a list (may be empty if the query fails or returns nothing).
"""
result: dict[str, list] = {}
for key, method, params in [
("smb_shares", "sharing.smb.query", None),
("nfs_exports", "sharing.nfs.query", None),
("datasets", "pool.dataset.query", [[["type", "=", "FILESYSTEM"]]]),
("zvols", "pool.dataset.query", [[["type", "=", "VOLUME"]]]),
("iscsi_extents", "iscsi.extent.query", None),
("iscsi_initiators", "iscsi.initiator.query", None),
("iscsi_portals", "iscsi.portal.query", None),
("iscsi_targets", "iscsi.target.query", None),
("iscsi_targetextents", "iscsi.targetextent.query", None),
]:
try:
result[key] = await client.call(method, params) or []
except RuntimeError as exc:
log.warning("Could not query %s: %s", key, exc)
result[key] = []
return result
async def delete_smb_shares(
client: TrueNASClient, shares: list[dict]
) -> tuple[int, int]:
"""Delete SMB shares by ID. Returns (deleted, failed)."""
deleted = failed = 0
for share in shares:
try:
await client.call("sharing.smb.delete", [share["id"]])
log.info(" Deleted SMB share %r", share.get("name"))
deleted += 1
except RuntimeError as exc:
log.error(" Failed to delete SMB share %r: %s", share.get("name"), exc)
failed += 1
return deleted, failed
async def delete_nfs_exports(
client: TrueNASClient, exports: list[dict]
) -> tuple[int, int]:
"""Delete NFS exports by ID. Returns (deleted, failed)."""
deleted = failed = 0
for export in exports:
try:
await client.call("sharing.nfs.delete", [export["id"]])
log.info(" Deleted NFS export %r", export.get("path"))
deleted += 1
except RuntimeError as exc:
log.error(" Failed to delete NFS export %r: %s", export.get("path"), exc)
failed += 1
return deleted, failed
async def delete_zvols(
client: TrueNASClient, zvols: list[dict]
) -> tuple[int, int]:
"""Delete zvols. Returns (deleted, failed)."""
deleted = failed = 0
for zvol in zvols:
try:
await client.call("pool.dataset.delete", [zvol["id"], {"recursive": True}])
log.info(" Deleted zvol %r", zvol["id"])
deleted += 1
except RuntimeError as exc:
log.error(" Failed to delete zvol %r: %s", zvol["id"], exc)
failed += 1
return deleted, failed
async def delete_datasets(
client: TrueNASClient, datasets: list[dict]
) -> tuple[int, int]:
"""
Delete datasets deepest-first to avoid parent-before-child errors.
Skips pool root datasets (no '/' in the dataset name).
Returns (deleted, failed).
"""
sorted_ds = sorted(
(d for d in datasets if "/" in d["id"]),
key=lambda d: d["id"].count("/"),
reverse=True,
)
deleted = failed = 0
for ds in sorted_ds:
try:
await client.call("pool.dataset.delete", [ds["id"], {"recursive": True}])
log.info(" Deleted dataset %r", ds["id"])
deleted += 1
except RuntimeError as exc:
log.error(" Failed to delete dataset %r: %s", ds["id"], exc)
failed += 1
return deleted, failed