Files
scott c28ce9e3b8 Add destination audit wizard with selective deletion
New top-level wizard option (2) lets users inspect and clean up an
existing destination before migration. Queries all SMB shares, NFS
exports, iSCSI objects, datasets, and zvols; displays a structured
inventory report; then offers per-category deletion with escalating
warnings — standard confirm for shares/iSCSI, explicit "DELETE" phrase
required for zvols and datasets to guard against accidental data loss.

Adds to client.py: query_destination_inventory, delete_smb_shares,
delete_nfs_exports, delete_zvols, delete_datasets.
Adds to cli.py: _fmt_bytes, _print_inventory_report, _run_audit_wizard.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 15:56:10 -05:00

955 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
truenas_migrate — TrueNAS Share Migration Tool
=================================================
Reads SMB shares and NFS shares from either a TrueNAS debug archive (.tar / .tgz)
or customer-supplied CSV files, then re-creates them on a destination TrueNAS
system via the JSON-RPC 2.0 WebSocket API (TrueNAS 25.04+).
SAFE BY DEFAULT
• Existing shares are never overwritten or deleted.
• Always run with --dry-run first to preview what will happen.
REQUIREMENTS
Python 3.9+ (stdlib only — no external packages needed)
QUICK START — Archive source
# 1. Inspect your debug archive to confirm it contains the data you need:
python -m truenas_migrate --debug-tar debug.tgz --list-archive
# 2. Dry-run — connect to destination but make zero changes:
python -m truenas_migrate \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx" \\
--dry-run
# 3. Live migration:
python -m truenas_migrate \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx"
QUICK START — CSV source
# Fill in smb_shares_template.csv / nfs_shares_template.csv, then:
python -m truenas_migrate \\
--smb-csv smb_shares.csv \\
--nfs-csv nfs_shares.csv \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx" \\
--dry-run
CONFLICT POLICY
Shares that already exist on the destination are silently skipped:
SMB matched by share name (case-insensitive)
NFS matched by export path (exact match)
"""
from __future__ import annotations
import argparse
import asyncio
import getpass
import logging
import sys
from pathlib import Path
from typing import Optional
from .archive import parse_archive, list_archive_and_exit
from .client import (
TrueNASClient,
check_dataset_paths, create_missing_datasets,
check_iscsi_zvols, create_missing_zvols,
query_destination_inventory,
delete_smb_shares, delete_nfs_exports, delete_zvols, delete_datasets,
)
from .colors import log, _bold, _bold_cyan, _bold_green, _bold_red, _bold_yellow, _cyan, _dim, _green, _yellow
from .csv_source import parse_csv_sources
from .migrate import migrate_smb_shares, migrate_nfs_shares, migrate_iscsi, query_existing_iscsi, clear_iscsi_config
from .summary import Summary
# ─────────────────────────────────────────────────────────────────────────────
# CLI orchestration
# ─────────────────────────────────────────────────────────────────────────────
async def run(
    args: argparse.Namespace,
    archive: Optional[dict] = None,
) -> Summary:
    """Migrate the share types named in ``args.migrate`` to the destination.

    When *archive* is not supplied it is loaded from the CSV files named by
    ``args.smb_csv`` / ``args.nfs_csv`` if either is set, otherwise from the
    debug archive at ``args.debug_tar``.  Each requested kind ("smb", "nfs",
    "iscsi") is passed to its ``migrate_*`` routine over one TrueNASClient
    connection.  In dry-run mode a banner is written to stderr and, once the
    migrate routines have run, the results of ``check_dataset_paths`` and
    ``check_iscsi_zvols`` are stored on the summary.

    Returns the Summary that the migrate routines filled in.
    """
    # Only parse the source when the caller has not already done so.
    if archive is None:
        smb_csv = getattr(args, "smb_csv", None)
        nfs_csv = getattr(args, "nfs_csv", None)
        archive = (
            parse_csv_sources(smb_csv, nfs_csv)
            if (smb_csv or nfs_csv)
            else parse_archive(args.debug_tar)
        )

    wanted = set(args.migrate)
    dry_run = args.dry_run

    if dry_run:
        msg = " DRY RUN no changes will be made on the destination "
        bar = _bold_yellow("" * len(msg))
        banner = (
            f"\n{_bold_yellow('')}{bar}{_bold_yellow('')}",
            f"{_bold_yellow('')}{_bold_yellow(msg)}{_bold_yellow('')}",
            f"{_bold_yellow('')}{bar}{_bold_yellow('')}\n",
        )
        for line in banner:
            print(line, file=sys.stderr)

    summary = Summary()

    # (kind, migrate routine, lazy accessor for that kind's source data).
    # The accessor is only called when the kind was actually requested.
    steps = (
        ("smb", migrate_smb_shares, lambda: archive["smb_shares"]),
        ("nfs", migrate_nfs_shares, lambda: archive["nfs_shares"]),
        ("iscsi", migrate_iscsi, lambda: archive.get("iscsi", {})),
    )

    async with TrueNASClient(
        host=args.dest,
        port=args.port,
        api_key=args.api_key,
        verify_ssl=args.verify_ssl,
    ) as client:
        for kind, migrate_fn, payload in steps:
            if kind in wanted:
                await migrate_fn(client, payload(), dry_run, summary)

        # Dry runs additionally report which datasets / zvols are missing.
        if dry_run:
            if summary.paths_to_create:
                summary.missing_datasets = await check_dataset_paths(
                    client, summary.paths_to_create,
                )
            if summary.zvols_to_check:
                summary.missing_zvols = await check_iscsi_zvols(
                    client, summary.zvols_to_check,
                )

    return summary
# ─────────────────────────────────────────────────────────────────────────────
# Interactive wizard helpers
# ─────────────────────────────────────────────────────────────────────────────
def _parse_size(s: str) -> int:
"""Parse a human-friendly size string to bytes. E.g. '100G', '500GiB', '1T'."""
s = s.strip().upper()
for suffix, mult in [
("PIB", 1 << 50), ("PB", 1 << 50), ("P", 1 << 50),
("TIB", 1 << 40), ("TB", 1 << 40), ("T", 1 << 40),
("GIB", 1 << 30), ("GB", 1 << 30), ("G", 1 << 30),
("MIB", 1 << 20), ("MB", 1 << 20), ("M", 1 << 20),
("KIB", 1 << 10), ("KB", 1 << 10), ("K", 1 << 10),
]:
if s.endswith(suffix):
try:
return int(float(s[:-len(suffix)]) * mult)
except ValueError:
pass
return int(s) # plain bytes
def _fmt_bytes(n: int) -> str:
"""Format a byte count as a human-readable string."""
for suffix, div in [("TiB", 1 << 40), ("GiB", 1 << 30), ("MiB", 1 << 20), ("KiB", 1 << 10)]:
if n >= div:
return f"{n / div:.1f} {suffix}"
return f"{n} B"
def _find_debug_archives(directory: str = ".") -> list[Path]:
"""Return sorted list of TrueNAS debug archives found in *directory*."""
patterns = ("*.tgz", "*.tar.gz", "*.tar", "*.txz", "*.tar.xz")
found: set[Path] = set()
for pat in patterns:
found.update(Path(directory).glob(pat))
return sorted(found)
def _prompt(label: str, default: str = "") -> str:
suffix = f" [{default}]" if default else ""
try:
val = input(f"{label}{suffix}: ").strip()
return val if val else default
except (EOFError, KeyboardInterrupt):
print()
sys.exit(0)
def _confirm(label: str) -> bool:
try:
return input(f"{label} [y/N]: ").strip().lower() in ("y", "yes")
except (EOFError, KeyboardInterrupt):
print()
return False
def _prompt_csv_path(share_type: str) -> Optional[str]:
    """Ask for the path of a *share_type* CSV file until one exists.

    Prints the name of the matching template file as a hint, then keeps
    prompting while the entered path is not an existing file.  Returns the
    path as a string, or None when the user presses Enter to skip.
    """
    template = f"{share_type.lower()}_shares_template.csv"
    print(f" {_dim('(template: ' + template + ')')}")

    question = f" {share_type} shares CSV path (Enter to skip)"
    raw = _prompt(question)
    while raw:
        candidate = Path(raw)
        if candidate.is_file():
            return str(candidate)
        print(f" {_bold_red('File not found:')} {raw}")
        raw = _prompt(question)
    return None
def _prompt_iscsi_portals(iscsi: dict) -> None:
"""Walk each portal and prompt for destination IPs in-place."""
portals = iscsi.get("portals", [])
if not portals:
return
print(f"\n {_bold('iSCSI Portal Configuration')}")
print(f" {_dim('Portal IP addresses are unique per system and must be updated.')}")
print(f" {_dim('For MPIO, enter multiple IPs separated by spaces.')}")
for portal in portals:
comment = portal.get("comment", "")
listen = portal.get("listen", [])
src_ips = " ".join(f"{l['ip']}" for l in listen)
label = f"Portal {portal['id']}" + (f" ({comment!r})" if comment else "")
print(f"\n {_bold(label)}")
print(f" {_dim('Source IP(s):')} {src_ips}")
raw = _prompt(" Destination IP(s)").strip()
if not raw:
print(f" {_yellow('')} No IPs entered — keeping source IPs.")
continue
dest_ips = raw.split()
portal["listen"] = [{"ip": ip} for ip in dest_ips]
print(f" {_green('')} Portal: {', '.join(dest_ips)}")
print()
def _prompt_clear_existing_iscsi(host: str, port: int, api_key: str) -> None:
    """Offer to wipe iSCSI configuration already present on the destination.

    Connects to the destination (TLS verification disabled), counts the
    objects returned by ``query_existing_iscsi`` and returns silently when
    there are none.  Otherwise the counts are listed and the user chooses
    between keeping them (the default) and removing everything; removal
    needs a second yes/no confirmation before ``clear_iscsi_config`` runs.
    """
    def _connect():
        return TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False)

    async def _fetch_existing():
        async with _connect() as client:
            return await query_existing_iscsi(client)

    async def _wipe():
        async with _connect() as client:
            await clear_iscsi_config(client)

    existing = asyncio.run(_fetch_existing())
    counts = {kind: len(items) for kind, items in existing.items()}
    total = sum(counts.values())
    if not total:
        return

    print(f"\n {_bold_yellow('WARNING:')} Destination already has iSCSI configuration:")
    descriptions = (
        ("extents", "extent(s)"),
        ("initiators", "initiator group(s)"),
        ("portals", "portal(s)"),
        ("targets", "target(s)"),
        ("targetextents", "target-extent association(s)"),
    )
    for key, label in descriptions:
        n = counts[key]
        if n:
            print(f"{n} {label}")
    print()
    print(f" {_dim('Keep existing: new objects will be skipped if conflicts are detected.')}")
    print(f" {_dim('Remove existing: ALL iSCSI config will be deleted before migration.')}")
    print()

    choice = _prompt(" [K]eep existing / [R]emove all existing iSCSI config", default="K")
    # Any answer that does not start with "r" means "keep".
    if not choice.strip().lower().startswith("r"):
        print(f" {_dim('Keeping existing iSCSI configuration.')}\n")
        return

    if not _confirm(f" Remove ALL {total} iSCSI object(s) from {host}?"):
        print(f" {_yellow('')} Removal cancelled — keeping existing config.\n")
        return

    print()
    asyncio.run(_wipe())
    print(f" {_bold_cyan('')} iSCSI configuration cleared.\n")
def _select_shares(shares: list[dict], share_type: str) -> list[dict]:
"""
Display a numbered list of *shares* and return only those the user selects.
Enter (or 'all') returns all shares unchanged. 'n' / 'none' returns [].
"""
if not shares:
return shares
print(f"\n {_bold(f'{share_type} shares ({len(shares)}):')} \n")
for i, share in enumerate(shares, 1):
if share_type == "SMB":
name = share.get("name", "<unnamed>")
path = share.get("path", "")
print(f" {_cyan(str(i) + '.')} {name:<22} {_dim(path)}")
else: # NFS
pl = share.get("paths") or []
path = share.get("path") or (pl[0] if pl else "")
extra = f" {_dim('+ ' + str(len(pl) - 1) + ' more')}" if len(pl) > 1 else ""
print(f" {_cyan(str(i) + '.')} {path}{extra}")
print()
raw = _prompt(
f" Select {share_type} shares to migrate "
"(e.g. '1 3', Enter = all, 'n' = none)",
default="all",
)
low = raw.strip().lower()
if low in ("", "all"):
print(f" {_green('')} All {len(shares)} {share_type} share(s) selected.")
return shares
if low in ("n", "none", "0"):
print(f" {_yellow('')} No {share_type} shares selected.")
return []
seen: set[int] = set()
selected: list[dict] = []
for tok in raw.split():
if tok.isdigit():
idx = int(tok) - 1
if 0 <= idx < len(shares) and idx not in seen:
seen.add(idx)
selected.append(shares[idx])
if selected:
print(f" {_green('')} {len(selected)} of {len(shares)} {share_type} share(s) selected.")
else:
print(f" {_yellow('')} No valid selections; skipping {share_type} shares.")
return selected
# ─────────────────────────────────────────────────────────────────────────────
# Destination audit wizard
# ─────────────────────────────────────────────────────────────────────────────
def _print_inventory_report(host: str, inv: dict) -> None:
    """Print the destination inventory *inv* for *host*, one section per kind.

    Sections appear in the order SMB shares, NFS exports, iSCSI objects,
    datasets (only the first 20 are listed) and zvols.  A section with no
    entries is printed as a dimmed "<kind>: none" line.  Keys missing from
    *inv* are treated as empty lists.
    """
    smb_shares = inv.get("smb_shares", [])
    nfs_exports = inv.get("nfs_exports", [])
    datasets = inv.get("datasets", [])
    zvol_list = inv.get("zvols", [])
    extents = inv.get("iscsi_extents", [])
    initiators = inv.get("iscsi_initiators", [])
    portal_list = inv.get("iscsi_portals", [])
    targets = inv.get("iscsi_targets", [])
    target_extents = inv.get("iscsi_targetextents", [])

    header = f"DESTINATION INVENTORY: {host}"
    rule = _bold_cyan("" * (len(header) + 4))
    print(f"\n {rule}")
    print(f" {_bold_cyan('')} {_bold(header)} {_bold_cyan('')}")
    print(f" {rule}")

    # SMB
    if not smb_shares:
        print(f"\n {_dim('SMB Shares: none')}")
    else:
        print(f"\n {_bold(f'SMB Shares ({len(smb_shares)})')}")
        for share in smb_shares:
            name = share.get("name", "<unnamed>")
            path = share.get("path", "")
            state = _dim(" [disabled]") if not share.get("enabled", True) else ""
            print(f" {_cyan('')} {name:<24} {_dim(path)}{state}")

    # NFS
    if not nfs_exports:
        print(f"\n {_dim('NFS Exports: none')}")
    else:
        print(f"\n {_bold(f'NFS Exports ({len(nfs_exports)})')}")
        for export in nfs_exports:
            path = export.get("path", "<no path>")
            state = _dim(" [disabled]") if not export.get("enabled", True) else ""
            print(f" {_cyan('')} {path}{state}")

    # iSCSI
    iscsi_groups = (extents, initiators, portal_list, targets, target_extents)
    if not any(iscsi_groups):
        print(f"\n {_dim('iSCSI: none')}")
    else:
        iscsi_total = sum(len(group) for group in iscsi_groups)
        print(f"\n {_bold(f'iSCSI Configuration ({iscsi_total} objects)')}")
        if extents:
            print(f" {_bold('Extents')} ({len(extents)}):")
            for extent in extents:
                kind = extent.get("type", "")
                # Use whichever backing-store field is populated.
                backing = extent.get("disk") or extent.get("path") or ""
                print(f" {_cyan('')} {extent.get('name', '<unnamed>'):<22} {_dim(kind + ' ' + backing)}")
        if initiators:
            print(f" {_bold('Initiator Groups')} ({len(initiators)}):")
            for group in initiators:
                print(f" {_cyan('')} {group.get('comment') or '<no comment>'}")
        if portal_list:
            print(f" {_bold('Portals')} ({len(portal_list)}):")
            for portal in portal_list:
                ips = ", ".join(entry["ip"] for entry in portal.get("listen", []))
                comment = portal.get("comment", "")
                label = f"{comment} " if comment else ""
                print(f" {_cyan('')} {label}{_dim(ips)}")
        if targets:
            print(f" {_bold('Targets')} ({len(targets)}):")
            for target in targets:
                print(f" {_cyan('')} {target.get('name', '<unnamed>')}")
        if target_extents:
            print(f" {_bold('Target-Extent Associations')} ({len(target_extents)})")

    # Datasets
    if not datasets:
        print(f"\n {_dim('Datasets: none')}")
    else:
        print(f"\n {_bold(f'Datasets ({len(datasets)})')}")
        for dataset in datasets[:20]:
            name = dataset.get("id", "")
            used_raw = dataset.get("used", {})
            used_bytes = used_raw.get("parsed", 0) if isinstance(used_raw, dict) else 0
            used_str = f" {_fmt_bytes(used_bytes)} used" if used_bytes else ""
            # An id without "/" is tagged as a pool root.
            root_tag = _dim(" (pool root)") if "/" not in name else ""
            print(f" {_cyan('')} {name}{root_tag}{_dim(used_str)}")
        if len(datasets) > 20:
            print(f" {_dim(f'… and {len(datasets) - 20} more')}")

    # Zvols
    if not zvol_list:
        print(f"\n {_dim('Zvols: none')}")
    else:
        print(f"\n {_bold(f'Zvols ({len(zvol_list)})')}")
        for zvol in zvol_list:
            name = zvol.get("id", "")
            size_raw = zvol.get("volsize", {})
            size = size_raw.get("parsed", 0) if isinstance(size_raw, dict) else 0
            size_str = f" {_fmt_bytes(size)}" if size else ""
            print(f" {_cyan('')} {name}{_dim(size_str)}")
    print()
def _confirm_delete_phrase(noun: str, count: int) -> bool:
    """Return True only if the user types exactly DELETE for *count* *noun*(s).

    Any other non-empty answer prints a "not matched" notice; an empty answer
    skips silently.
    """
    raw = _prompt(
        f" Type DELETE to confirm deletion of {count} {noun}(s),"
        " or Enter to skip"
    ).strip()
    if raw and raw != "DELETE":
        notice = f"Confirmation not matched — {noun}s will not be deleted."
        print(f" {_dim(notice)}")
    return raw == "DELETE"


def _report_deleted(ok: int, fail: int) -> None:
    """Print the outcome line for one deletion step (failures shown in red)."""
    suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else ""
    print(f" {_bold_green('')} {ok} deleted{suffix}")


def _run_audit_wizard(host: str, port: int, api_key: str) -> None:
    """Query destination inventory and offer to selectively delete configuration.

    Flow: connect (TLS verification disabled) and print the inventory report;
    ask per category whether to delete it (yes/no for iSCSI, SMB and NFS; the
    literal phrase DELETE for zvols and datasets); show a summary of pending
    deletions; ask for one final yes/no; then delete in the order iSCSI,
    SMB, NFS, zvols, datasets.  Datasets whose id contains no "/" (pool
    roots) are never offered for deletion.

    An OSError while querying or while deleting is reported to the user
    instead of propagating; in the deletion case, steps already reported as
    done have been applied.
    """
    iscsi_keys = (
        "iscsi_extents", "iscsi_initiators", "iscsi_portals",
        "iscsi_targets", "iscsi_targetextents",
    )

    def _connect():
        return TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False)

    print(f"\n Querying {_bold(host)}\n")

    async def _query() -> dict:
        async with _connect() as client:
            return await query_destination_inventory(client)

    try:
        inv = asyncio.run(_query())
    except OSError as exc:  # includes PermissionError, a subclass of OSError
        print(f" {_bold_red('Connection failed:')} {exc}\n")
        return

    _print_inventory_report(host, inv)

    total = sum(len(v) for v in inv.values())
    if total == 0:
        print(f" {_dim('The destination appears to have no configuration.')}\n")
        return

    # ── Deletion options ───────────────────────────────────────────────────────
    print(f" {_bold_yellow('' * 60)}")
    print(f" {_bold_yellow('DELETION OPTIONS')}")
    print(f" {_dim('You may choose to delete some or all of the configuration above.')}")
    print(f" {_bold_red('WARNING: Deleted datasets and zvols cannot be recovered — all data will be permanently lost.')}")
    print()

    # Missing keys are treated as empty, matching the inventory report.
    smb_shares = inv.get("smb_shares", [])
    nfs_exports = inv.get("nfs_exports", [])
    zvols = inv.get("zvols", [])
    iscsi_count = sum(len(inv.get(k, [])) for k in iscsi_keys)
    # Pool root datasets (no "/" in the id) are excluded from deletion.
    deletable_ds = [d for d in inv.get("datasets", []) if "/" in d["id"]]

    del_iscsi = False
    del_smb = False
    del_nfs = False
    del_zvols = False
    del_datasets = False

    # iSCSI (must go first — uses zvols as backing)
    if iscsi_count:
        del_iscsi = _confirm(
            f" Delete ALL iSCSI configuration ({iscsi_count} objects)?"
        )

    # SMB
    if smb_shares:
        del_smb = _confirm(
            f" Delete all {len(smb_shares)} SMB share(s)?"
        )

    # NFS
    if nfs_exports:
        del_nfs = _confirm(
            f" Delete all {len(nfs_exports)} NFS export(s)?"
        )

    # Zvols — require explicit confirmation phrase
    if zvols:
        print()
        print(f" {_bold_red('⚠ DATA DESTRUCTION WARNING ⚠')}")
        print(f" Deleting zvols PERMANENTLY DESTROYS all data stored in them.")
        print(f" This action cannot be undone. Affected zvols:")
        for z in zvols:
            print(f" {_yellow('')} {z['id']}")
        print()
        del_zvols = _confirm_delete_phrase("zvol", len(zvols))
        print()

    # Datasets — strongest warning
    if deletable_ds:
        print(f" {_bold_red('⚠⚠ CRITICAL DATA DESTRUCTION WARNING ⚠⚠')}")
        print(f" Deleting datasets PERMANENTLY DESTROYS ALL DATA including all files,")
        print(f" snapshots, and child datasets. Pool root datasets (e.g. 'tank') will")
        print(f" be skipped, but all child datasets WILL be deleted.")
        print(f" This action cannot be undone. {len(deletable_ds)} dataset(s) would be deleted.")
        print()
        del_datasets = _confirm_delete_phrase("dataset", len(deletable_ds))
        print()

    # ── Nothing selected ───────────────────────────────────────────────────────
    if not any([del_iscsi, del_smb, del_nfs, del_zvols, del_datasets]):
        print(f" {_dim('Nothing selected for deletion. No changes made.')}\n")
        return

    # ── Final confirmation ─────────────────────────────────────────────────────
    print(f" {_bold_yellow('' * 60)}")
    print(f" {_bold_yellow('PENDING DELETIONS on ' + host + ':')}")
    if del_iscsi:
        print(f" {_yellow('')} ALL iSCSI configuration ({iscsi_count} objects)")
    if del_smb:
        print(f" {_yellow('')} {len(smb_shares)} SMB share(s)")
    if del_nfs:
        print(f" {_yellow('')} {len(nfs_exports)} NFS export(s)")
    if del_zvols:
        print(f" {_bold_red('')} {len(zvols)} zvol(s) "
              f"{_bold_red('⚠ ALL DATA WILL BE PERMANENTLY DESTROYED')}")
    if del_datasets:
        print(f" {_bold_red('')} {len(deletable_ds)} dataset(s) "
              f"{_bold_red('⚠ ALL DATA WILL BE PERMANENTLY DESTROYED')}")
    print()
    print(f" {_bold_red('THIS ACTION CANNOT BE UNDONE.')}")
    print()
    if not _confirm(f" Proceed with all selected deletions on {host}?"):
        print(f" {_dim('Aborted no changes made.')}\n")
        return

    # ── Execute ────────────────────────────────────────────────────────────────
    print()

    async def _execute() -> None:
        async with _connect() as client:
            # iSCSI goes first because its extents use zvols as backing.
            if del_iscsi:
                print(f" Removing iSCSI configuration …")
                await clear_iscsi_config(client)
                print(f" {_bold_green('')} iSCSI configuration removed.")
            steps = (
                (del_smb, "SMB shares", delete_smb_shares, smb_shares),
                (del_nfs, "NFS exports", delete_nfs_exports, nfs_exports),
                (del_zvols, "zvols", delete_zvols, zvols),
                (del_datasets, "datasets", delete_datasets, deletable_ds),
            )
            for selected, what, delete_fn, items in steps:
                if not selected:
                    continue
                print(f" Removing {what} …")
                ok, fail = await delete_fn(client, items)
                _report_deleted(ok, fail)

    try:
        asyncio.run(_execute())
    except OSError as exc:
        # Same failure class the query phase handles; report it rather than
        # ending the wizard with a traceback part-way through.
        print(f"\n {_bold_red('Deletion failed:')} {exc}")
        print(f" {_dim('Steps reported above were already applied; re-run the audit to see what remains.')}\n")
        return
    print(f"\n {_bold_cyan('Done.')}\n")
# ─────────────────────────────────────────────────────────────────────────────
# Interactive wizard
# ─────────────────────────────────────────────────────────────────────────────
def _prompt_destination() -> tuple[str, int, str]:
    """Prompt for the destination host, WebSocket port and API key.

    The host and API key are asked again until non-empty.  The port is asked
    again until it is a decimal number from 1 to 65535 (Enter accepts 443).
    EOF or Ctrl-C at the hidden API-key prompt exits with status 0.

    Returns ``(host, port, api_key)``.
    """
    host = ""
    while not host:
        host = _prompt("Destination TrueNAS host or IP")
        if not host:
            print(" Host is required.")

    while True:
        port_raw = _prompt("WebSocket port", default="443")
        # isdecimal() only accepts characters that int() can convert.
        if port_raw.isdecimal() and 1 <= int(port_raw) <= 65535:
            port = int(port_raw)
            break
        print(" Port must be a number from 1 to 65535.")

    api_key = ""
    while not api_key:
        try:
            api_key = getpass.getpass("API key (input hidden): ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)
        if not api_key:
            print(" API key is required.")
    return host, port, api_key


def interactive_mode() -> None:
    """Interactive wizard: pick source → configure → dry run → confirm → apply.

    The first question chooses between migrating and auditing the
    destination; the audit choice hands over to ``_run_audit_wizard`` and
    returns.  The migration path collects the source (CSV files or a debug
    archive found in the current directory), the destination and the scope,
    performs a dry run, optionally creates missing datasets / zvols, and only
    performs the live run after a final confirmation.  Exits with status 2
    if the live run recorded errors.
    """
    print(
        f"\n{_bold_cyan(' TrueNAS Share Migration Tool')}\n"
        f" {_dim('Migrate SMB/NFS shares to a live TrueNAS system.')}\n"
    )

    # 0 ── Top-level action ─────────────────────────────────────────────────────
    print(f" {_bold('What would you like to do?')}")
    print(f" {_cyan('1.')} Migrate configuration to a destination system")
    print(f" {_cyan('2.')} Audit destination system (view and manage existing config)")
    action_raw = _prompt(" Select [1/2]", default="1")
    print()

    if action_raw.strip() == "2":
        audit_host, audit_port, audit_key = _prompt_destination()
        _run_audit_wizard(audit_host, audit_port, audit_key)
        return

    # 1 ── Source type ──────────────────────────────────────────────────────────
    print(f" {_bold('Source type:')}")
    print(f" {_cyan('1.')} TrueNAS debug archive (.tgz / .tar)")
    print(f" {_cyan('2.')} CSV import (non-TrueNAS source)")
    src_raw = _prompt(" Select source [1/2]", default="1")
    use_csv = src_raw.strip() == "2"
    print()

    # 2/3 ── Destination and API key ────────────────────────────────────────────
    host, port, api_key = _prompt_destination()

    if use_csv:
        # ── CSV source ──────────────────────────────────────────────────────────
        print(f"\n {_bold('CSV file paths:')}")
        print(f" {_dim('Press Enter to skip a share type.')}\n")
        smb_csv_path = _prompt_csv_path("SMB")
        print()
        nfs_csv_path = _prompt_csv_path("NFS")

        migrate: list[str] = []
        if smb_csv_path:
            migrate.append("smb")
        if nfs_csv_path:
            migrate.append("nfs")
        if not migrate:
            sys.exit("No CSV files provided nothing to migrate.")

        print()
        archive_data = parse_csv_sources(smb_csv_path, nfs_csv_path)
        extra_ns: dict = {"smb_csv": smb_csv_path, "nfs_csv": nfs_csv_path}
    else:
        # ── Archive source ──────────────────────────────────────────────────────
        archives = _find_debug_archives()
        if not archives:
            sys.exit(
                "No debug archives (.tgz / .tar.gz / .tar / .txz) found in the "
                "current directory.\n"
                "Copy your TrueNAS debug file here, or use --debug-tar to specify a path."
            )

        if len(archives) == 1:
            chosen = archives[0]
            size = f"{chosen.stat().st_size / 1_048_576:.1f} MB"
            print(f" {_dim('Archive:')} {_bold(chosen.name)} "
                  f"{_dim('(' + size + ')')}\n")
        else:
            print(f" {_bold('Debug archives found:')}\n")
            for i, p in enumerate(archives, 1):
                size = f"{p.stat().st_size / 1_048_576:.1f} MB"
                print(f" {_cyan(str(i) + '.')} {p.name} "
                      f"{_dim('(' + size + ')')}")
            print()
            while True:
                raw = _prompt(f"Select archive [1-{len(archives)}]")
                # isdecimal() only accepts characters that int() can convert.
                if raw.isdecimal() and 1 <= int(raw) <= len(archives):
                    chosen = archives[int(raw) - 1]
                    break
                print(f" Enter a number from 1 to {len(archives)}.")

        # ── Migration scope ─────────────────────────────────────────────────────
        print(f"\n {_bold('What to migrate?')}")
        print(f" {_cyan('1.')} SMB shares")
        print(f" {_cyan('2.')} NFS shares")
        print(f" {_cyan('3.')} iSCSI (targets, extents, portals, initiator groups)")
        sel_raw = _prompt(
            "Selection (space-separated numbers, Enter for all)", default="1 2 3"
        )
        _sel_map = {"1": "smb", "2": "nfs", "3": "iscsi"}
        migrate = []
        for tok in sel_raw.split():
            if tok in _sel_map and _sel_map[tok] not in migrate:
                migrate.append(_sel_map[tok])
        # No recognisable selection falls back to migrating everything.
        if not migrate:
            migrate = ["smb", "nfs", "iscsi"]

        # ── Parse archive ───────────────────────────────────────────────────────
        print()
        archive_data = parse_archive(str(chosen))
        extra_ns = {"debug_tar": str(chosen)}

        # ── iSCSI portal IP remapping ────────────────────────────────────────
        if "iscsi" in migrate and archive_data.get("iscsi", {}).get("portals"):
            _prompt_iscsi_portals(archive_data["iscsi"])

        # ── iSCSI pre-migration check ────────────────────────────────────────
        if "iscsi" in migrate:
            _prompt_clear_existing_iscsi(host, port, api_key)

    # ── Select individual shares (common) ──────────────────────────────────────
    if "smb" in migrate and archive_data["smb_shares"]:
        archive_data["smb_shares"] = _select_shares(archive_data["smb_shares"], "SMB")
    if "nfs" in migrate and archive_data["nfs_shares"]:
        archive_data["nfs_shares"] = _select_shares(archive_data["nfs_shares"], "NFS")
    print()

    base_ns = dict(
        dest=host,
        port=port,
        api_key=api_key,
        verify_ssl=False,
        migrate=migrate,
        **extra_ns,
    )

    # 6 ── Dry run ──────────────────────────────────────────────────────────────
    dry_summary = asyncio.run(
        run(argparse.Namespace(**base_ns, dry_run=True), archive_data)
    )
    print(dry_summary.report())

    # Offer to create missing datasets before the live run
    if dry_summary.missing_datasets:
        non_mnt = [p for p in dry_summary.missing_datasets if not p.startswith("/mnt/")]
        creatable = [p for p in dry_summary.missing_datasets if p.startswith("/mnt/")]
        if non_mnt:
            print(f" NOTE: {len(non_mnt)} path(s) cannot be auto-created "
                  "(not under /mnt/):")
            for p in non_mnt:
                print(f"{p}")
            print()
        if creatable:
            print(f" {len(creatable)} dataset(s) can be created automatically:")
            for p in creatable:
                print(f"{p}")
            print()
            if _confirm(f"Create these {len(creatable)} dataset(s) on {host} now?"):
                asyncio.run(create_missing_datasets(
                    host=host,
                    port=port,
                    api_key=api_key,
                    paths=creatable,
                ))
                print()

    if dry_summary.missing_zvols:
        print(f"\n {len(dry_summary.missing_zvols)} zvol(s) need to be created for iSCSI extents:")
        for z in dry_summary.missing_zvols:
            print(f"{z}")
        print()
        if _confirm(f"Create these {len(dry_summary.missing_zvols)} zvol(s) on {host} now?"):
            zvol_sizes: dict[str, int] = {}
            for zvol in dry_summary.missing_zvols:
                # Keep asking until a parseable size is entered for this zvol.
                while True:
                    raw = _prompt(f" Size for {zvol} (e.g. 100G, 500GiB, 1T)").strip()
                    if not raw:
                        print(" Size is required.")
                        continue
                    try:
                        zvol_sizes[zvol] = _parse_size(raw)
                        break
                    except ValueError:
                        print(f" Cannot parse {raw!r} — try a format like 100G or 500GiB.")
            asyncio.run(create_missing_zvols(
                host=host, port=port, api_key=api_key, zvols=zvol_sizes,
            ))
            print()
            print(f" Re-running dry run to verify zvol creation …")
            print()
            dry_summary = asyncio.run(
                run(argparse.Namespace(**base_ns, dry_run=True), archive_data)
            )
            print(dry_summary.report())

    if not _confirm(f"Apply these changes to {host}?"):
        print("Aborted no changes made.")
        sys.exit(0)

    # 7 ── Live run ─────────────────────────────────────────────────────────────
    print()
    live_summary = asyncio.run(
        run(argparse.Namespace(**base_ns, dry_run=False), archive_data)
    )
    print(live_summary.report())
    if live_summary.errors:
        sys.exit(2)
# ─────────────────────────────────────────────────────────────────────────────
# Argument parser + entry point
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point.

    With no arguments the interactive wizard is started.  Otherwise the
    arguments are parsed and validated (exactly one source kind, source files
    exist, ``--port`` in 1-65535, ``--dest`` and ``--api-key`` present), the
    migration is run and its report printed.  ``--list-archive`` lists the
    archive contents and exits instead.  Exits with status 2 when the
    summary contains errors; argument problems exit via ``parser.error``.
    """
    if len(sys.argv) == 1:
        interactive_mode()
        return

    p = argparse.ArgumentParser(
        prog="truenas_migrate",
        description=(
            "Migrate SMB and NFS shares to a live TrueNAS destination system. "
            "Source can be a TrueNAS debug archive or customer-supplied CSV files."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # ── Source ────────────────────────────────────────────────────────────────
    src = p.add_argument_group("source (choose one)")
    src.add_argument(
        "--debug-tar", metavar="FILE",
        help="Path to the TrueNAS debug .tar / .tgz from the SOURCE system.",
    )
    src.add_argument(
        "--smb-csv", metavar="FILE",
        help="Path to a CSV file containing SMB share definitions (non-TrueNAS source).",
    )
    src.add_argument(
        "--nfs-csv", metavar="FILE",
        help="Path to a CSV file containing NFS share definitions (non-TrueNAS source).",
    )
    p.add_argument(
        "--list-archive", action="store_true",
        help=(
            "List all JSON files found in the archive and exit. "
            "Requires --debug-tar."
        ),
    )

    # ── Destination ───────────────────────────────────────────────────────────
    p.add_argument(
        "--dest", metavar="HOST",
        help="Hostname or IP of the DESTINATION TrueNAS system.",
    )
    p.add_argument(
        "--port", type=int, default=443, metavar="PORT",
        help="WebSocket port on the destination (default: 443).",
    )
    p.add_argument(
        "--verify-ssl", action="store_true",
        help=(
            "Verify the destination TLS certificate. "
            "Off by default because most TrueNAS systems use self-signed certs."
        ),
    )

    # ── Authentication ────────────────────────────────────────────────────────
    p.add_argument(
        "--api-key", metavar="KEY",
        help=(
            "TrueNAS API key. Generate one in TrueNAS UI: "
            "top-right account menu → API Keys."
        ),
    )

    # ── Scope ─────────────────────────────────────────────────────────────────
    p.add_argument(
        "--migrate",
        nargs="+",
        choices=["smb", "nfs", "iscsi"],
        default=["smb", "nfs", "iscsi"],
        metavar="TYPE",
        help=(
            # The default covers three types, so "both" was inaccurate.
            "What to migrate. Choices: smb nfs iscsi "
            "(default: all three). Example: --migrate smb"
        ),
    )
    p.add_argument(
        "--dry-run", action="store_true",
        help="Parse source and connect to destination, but make no changes.",
    )
    p.add_argument(
        "--verbose", "-v", action="store_true",
        help="Enable DEBUG-level logging.",
    )

    args = p.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    has_archive = bool(args.debug_tar)
    has_csv = bool(args.smb_csv or args.nfs_csv)
    if has_archive and has_csv:
        p.error("Cannot combine --debug-tar with --smb-csv / --nfs-csv.")
    if not has_archive and not has_csv:
        p.error(
            "Specify a source: --debug-tar FILE or --smb-csv / --nfs-csv FILE(s)."
        )

    if has_archive:
        if not Path(args.debug_tar).is_file():
            p.error(f"Archive not found: {args.debug_tar}")
        if args.list_archive:
            list_archive_and_exit(args.debug_tar)  # does not return
    else:
        if args.list_archive:
            p.error("--list-archive requires --debug-tar.")
        if args.smb_csv and not Path(args.smb_csv).is_file():
            p.error(f"SMB CSV not found: {args.smb_csv}")
        if args.nfs_csv and not Path(args.nfs_csv).is_file():
            p.error(f"NFS CSV not found: {args.nfs_csv}")

    # Reject impossible ports here rather than failing later at connect time.
    if not 1 <= args.port <= 65535:
        p.error("--port must be between 1 and 65535.")
    if not args.dest:
        p.error("--dest is required.")
    if not args.api_key:
        p.error("--api-key is required.")

    summary = asyncio.run(run(args))
    print(summary.report())
    if summary.errors:
        sys.exit(2)