""" truenas_migrate – TrueNAS Share Migration Tool ================================================= Reads SMB shares and NFS shares from either a TrueNAS debug archive (.tar / .tgz) or customer-supplied CSV files, then re-creates them on a destination TrueNAS system via the JSON-RPC 2.0 WebSocket API (TrueNAS 25.04+). SAFE BY DEFAULT • Existing shares are never overwritten or deleted. • Always run with --dry-run first to preview what will happen. REQUIREMENTS Python 3.9+ (stdlib only – no external packages needed) QUICK START — Archive source # 1. Inspect your debug archive to confirm it contains the data you need: python -m truenas_migrate --debug-tar debug.tgz --list-archive # 2. Dry-run – connect to destination but make zero changes: python -m truenas_migrate \\ --debug-tar debug.tgz \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" \\ --dry-run # 3. Live migration: python -m truenas_migrate \\ --debug-tar debug.tgz \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" QUICK START — CSV source # Fill in smb_shares_template.csv / nfs_shares_template.csv, then: python -m truenas_migrate \\ --smb-csv smb_shares.csv \\ --nfs-csv nfs_shares.csv \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" \\ --dry-run CONFLICT POLICY Shares that already exist on the destination are silently skipped: SMB – matched by share name (case-insensitive) NFS – matched by export path (exact match) """ from __future__ import annotations import argparse import asyncio import getpass import logging import sys from pathlib import Path from typing import Optional from .archive import parse_archive, list_archive_and_exit from .client import ( TrueNASClient, check_dataset_paths, create_missing_datasets, check_iscsi_zvols, create_missing_zvols, query_destination_inventory, delete_smb_shares, delete_nfs_exports, delete_zvols, delete_datasets, ) from .colors import log, _bold, _bold_cyan, _bold_green, _bold_red, _bold_yellow, _cyan, _dim, _green, _yellow from .csv_source import parse_csv_sources from .migrate import migrate_smb_shares, migrate_nfs_shares, migrate_iscsi, query_existing_iscsi, clear_iscsi_config from .summary import Summary # ───────────────────────────────────────────────────────────────────────────── # CLI orchestration # ───────────────────────────────────────────────────────────────────────────── async def run( args: argparse.Namespace, archive: Optional[dict] = None, ) -> Summary: if archive is None: smb_csv = getattr(args, "smb_csv", None) nfs_csv = getattr(args, "nfs_csv", None) if smb_csv or nfs_csv: archive = parse_csv_sources(smb_csv, nfs_csv) else: archive = parse_archive(args.debug_tar) migrate_set = set(args.migrate) if args.dry_run: msg = " DRY RUN – no changes will be made on the destination " bar = _bold_yellow("─" * len(msg)) print(f"\n{_bold_yellow('┌')}{bar}{_bold_yellow('┐')}", file=sys.stderr) print(f"{_bold_yellow('│')}{_bold_yellow(msg)}{_bold_yellow('│')}", file=sys.stderr) print(f"{_bold_yellow('└')}{bar}{_bold_yellow('┘')}\n", file=sys.stderr) summary = Summary() async with TrueNASClient( host=args.dest, port=args.port, api_key=args.api_key, verify_ssl=args.verify_ssl, ) as client: if "smb" in migrate_set: await migrate_smb_shares( client, archive["smb_shares"], args.dry_run, summary) if "nfs" in migrate_set: await migrate_nfs_shares( client, archive["nfs_shares"], args.dry_run, summary) if "iscsi" in migrate_set: await migrate_iscsi( client, archive.get("iscsi", {}), args.dry_run, summary) if args.dry_run and summary.paths_to_create: summary.missing_datasets = await check_dataset_paths( client, summary.paths_to_create, ) if args.dry_run and summary.zvols_to_check: summary.missing_zvols = await check_iscsi_zvols( client, summary.zvols_to_check, ) return summary # ───────────────────────────────────────────────────────────────────────────── # Interactive wizard helpers # ───────────────────────────────────────────────────────────────────────────── def _parse_size(s: str) -> int: """Parse a human-friendly size string to bytes. E.g. '100G', '500GiB', '1T'.""" s = s.strip().upper() for suffix, mult in [ ("PIB", 1 << 50), ("PB", 1 << 50), ("P", 1 << 50), ("TIB", 1 << 40), ("TB", 1 << 40), ("T", 1 << 40), ("GIB", 1 << 30), ("GB", 1 << 30), ("G", 1 << 30), ("MIB", 1 << 20), ("MB", 1 << 20), ("M", 1 << 20), ("KIB", 1 << 10), ("KB", 1 << 10), ("K", 1 << 10), ]: if s.endswith(suffix): try: return int(float(s[:-len(suffix)]) * mult) except ValueError: pass return int(s) # plain bytes def _fmt_bytes(n: int) -> str: """Format a byte count as a human-readable string.""" for suffix, div in [("TiB", 1 << 40), ("GiB", 1 << 30), ("MiB", 1 << 20), ("KiB", 1 << 10)]: if n >= div: return f"{n / div:.1f} {suffix}" return f"{n} B" def _find_debug_archives(directory: str = ".") -> list[Path]: """Return sorted list of TrueNAS debug archives found in *directory*.""" patterns = ("*.tgz", "*.tar.gz", "*.tar", "*.txz", "*.tar.xz") found: set[Path] = set() for pat in patterns: found.update(Path(directory).glob(pat)) return sorted(found) def _prompt(label: str, default: str = "") -> str: suffix = f" [{default}]" if default else "" try: val = input(f"{label}{suffix}: ").strip() return val if val else default except (EOFError, KeyboardInterrupt): print() sys.exit(0) def _confirm(label: str) -> bool: try: return input(f"{label} [y/N]: ").strip().lower() in ("y", "yes") except (EOFError, KeyboardInterrupt): print() return False def _prompt_csv_path(share_type: str) -> Optional[str]: """Prompt for a CSV file path. Returns resolved path string or None if skipped.""" template = f"{share_type.lower()}_shares_template.csv" print(f" {_dim('(template: ' + template + ')')}") while True: raw = _prompt(f" {share_type} shares CSV path (Enter to skip)") if not raw: return None p = Path(raw) if p.is_file(): return str(p) print(f" {_bold_red('File not found:')} {raw}") def _prompt_iscsi_portals(iscsi: dict) -> None: """Walk each portal and prompt for destination IPs in-place.""" portals = iscsi.get("portals", []) if not portals: return print(f"\n {_bold('iSCSI Portal Configuration')}") print(f" {_dim('Portal IP addresses are unique per system and must be updated.')}") print(f" {_dim('For MPIO, enter multiple IPs separated by spaces.')}") for portal in portals: comment = portal.get("comment", "") listen = portal.get("listen", []) src_ips = " ".join(f"{l['ip']}" for l in listen) label = f"Portal {portal['id']}" + (f" ({comment!r})" if comment else "") print(f"\n {_bold(label)}") print(f" {_dim('Source IP(s):')} {src_ips}") raw = _prompt(" Destination IP(s)").strip() if not raw: print(f" {_yellow('⚠')} No IPs entered — keeping source IPs.") continue dest_ips = raw.split() portal["listen"] = [{"ip": ip} for ip in dest_ips] print(f" {_green('✓')} Portal: {', '.join(dest_ips)}") print() def _prompt_clear_existing_iscsi(host: str, port: int, api_key: str) -> None: """ Check whether the destination already has iSCSI configuration. If so, summarise what exists and offer to remove it before migration. """ async def _check(): async with TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False) as client: return await query_existing_iscsi(client) existing = asyncio.run(_check()) counts = {k: len(v) for k, v in existing.items()} total = sum(counts.values()) if total == 0: return print(f"\n {_bold_yellow('WARNING:')} Destination already has iSCSI configuration:") labels = [ ("extents", "extent(s)"), ("initiators", "initiator group(s)"), ("portals", "portal(s)"), ("targets", "target(s)"), ("targetextents", "target-extent association(s)"), ] for key, label in labels: n = counts[key] if n: print(f" • {n} {label}") print() print(f" {_dim('Keep existing: new objects will be skipped if conflicts are detected.')}") print(f" {_dim('Remove existing: ALL iSCSI config will be deleted before migration.')}") print() raw = _prompt(" [K]eep existing / [R]emove all existing iSCSI config", default="K") if raw.strip().lower().startswith("r"): if _confirm(f" Remove ALL {total} iSCSI object(s) from {host}?"): async def _clear(): async with TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False) as client: await clear_iscsi_config(client) print() asyncio.run(_clear()) print(f" {_bold_cyan('✓')} iSCSI configuration cleared.\n") else: print(f" {_yellow('–')} Removal cancelled — keeping existing config.\n") else: print(f" {_dim('Keeping existing iSCSI configuration.')}\n") def _select_shares(shares: list[dict], share_type: str) -> list[dict]: """ Display a numbered list of *shares* and return only those the user selects. Enter (or 'all') returns all shares unchanged. 'n' / 'none' returns []. """ if not shares: return shares print(f"\n {_bold(f'{share_type} shares ({len(shares)}):')} \n") for i, share in enumerate(shares, 1): if share_type == "SMB": name = share.get("name", "") path = share.get("path", "") print(f" {_cyan(str(i) + '.')} {name:<22} {_dim(path)}") else: # NFS pl = share.get("paths") or [] path = share.get("path") or (pl[0] if pl else "") extra = f" {_dim('+ ' + str(len(pl) - 1) + ' more')}" if len(pl) > 1 else "" print(f" {_cyan(str(i) + '.')} {path}{extra}") print() raw = _prompt( f" Select {share_type} shares to migrate " "(e.g. '1 3', Enter = all, 'n' = none)", default="all", ) low = raw.strip().lower() if low in ("", "all"): print(f" {_green('✓')} All {len(shares)} {share_type} share(s) selected.") return shares if low in ("n", "none", "0"): print(f" {_yellow('–')} No {share_type} shares selected.") return [] seen: set[int] = set() selected: list[dict] = [] for tok in raw.split(): if tok.isdigit(): idx = int(tok) - 1 if 0 <= idx < len(shares) and idx not in seen: seen.add(idx) selected.append(shares[idx]) if selected: print(f" {_green('✓')} {len(selected)} of {len(shares)} {share_type} share(s) selected.") else: print(f" {_yellow('–')} No valid selections; skipping {share_type} shares.") return selected # ───────────────────────────────────────────────────────────────────────────── # Destination audit wizard # ───────────────────────────────────────────────────────────────────────────── def _print_inventory_report(host: str, inv: dict) -> None: """Print a structured inventory of all configuration on the destination.""" smb = inv.get("smb_shares", []) nfs = inv.get("nfs_exports", []) ds = inv.get("datasets", []) zvols = inv.get("zvols", []) ext = inv.get("iscsi_extents", []) init = inv.get("iscsi_initiators", []) portals = inv.get("iscsi_portals", []) tgt = inv.get("iscsi_targets", []) te = inv.get("iscsi_targetextents", []) header = f"DESTINATION INVENTORY: {host}" rule = _bold_cyan("─" * (len(header) + 4)) print(f"\n {rule}") print(f" {_bold_cyan('│')} {_bold(header)} {_bold_cyan('│')}") print(f" {rule}") # SMB if smb: print(f"\n {_bold(f'SMB Shares ({len(smb)})')}") for s in smb: name = s.get("name", "") path = s.get("path", "") enabled = "" if s.get("enabled", True) else _dim(" [disabled]") print(f" {_cyan('•')} {name:<24} {_dim(path)}{enabled}") else: print(f"\n {_dim('SMB Shares: none')}") # NFS if nfs: print(f"\n {_bold(f'NFS Exports ({len(nfs)})')}") for n in nfs: path = n.get("path", "") enabled = "" if n.get("enabled", True) else _dim(" [disabled]") print(f" {_cyan('•')} {path}{enabled}") else: print(f"\n {_dim('NFS Exports: none')}") # iSCSI has_iscsi = any([ext, init, portals, tgt, te]) if has_iscsi: iscsi_total = len(ext) + len(init) + len(portals) + len(tgt) + len(te) print(f"\n {_bold(f'iSCSI Configuration ({iscsi_total} objects)')}") if ext: print(f" {_bold('Extents')} ({len(ext)}):") for e in ext: kind = e.get("type", "") backing = e.get("disk") or e.get("path") or "" print(f" {_cyan('•')} {e.get('name', ''):<22} {_dim(kind + ' ' + backing)}") if init: print(f" {_bold('Initiator Groups')} ({len(init)}):") for i in init: print(f" {_cyan('•')} {i.get('comment') or ''}") if portals: print(f" {_bold('Portals')} ({len(portals)}):") for p in portals: ips = ", ".join(l["ip"] for l in p.get("listen", [])) comment = p.get("comment", "") label = f"{comment} " if comment else "" print(f" {_cyan('•')} {label}{_dim(ips)}") if tgt: print(f" {_bold('Targets')} ({len(tgt)}):") for t in tgt: print(f" {_cyan('•')} {t.get('name', '')}") if te: print(f" {_bold('Target-Extent Associations')} ({len(te)})") else: print(f"\n {_dim('iSCSI: none')}") # Datasets if ds: print(f"\n {_bold(f'Datasets ({len(ds)})')}") for d in ds[:20]: name = d.get("id", "") is_root = "/" not in name used_raw = d.get("used", {}) used_bytes = used_raw.get("parsed", 0) if isinstance(used_raw, dict) else 0 used_str = f" {_fmt_bytes(used_bytes)} used" if used_bytes else "" root_tag = _dim(" (pool root)") if is_root else "" print(f" {_cyan('•')} {name}{root_tag}{_dim(used_str)}") if len(ds) > 20: print(f" {_dim(f'… and {len(ds) - 20} more')}") else: print(f"\n {_dim('Datasets: none')}") # Zvols if zvols: print(f"\n {_bold(f'Zvols ({len(zvols)})')}") for z in zvols: name = z.get("id", "") vs_raw = z.get("volsize", {}) vs = vs_raw.get("parsed", 0) if isinstance(vs_raw, dict) else 0 vs_str = f" {_fmt_bytes(vs)}" if vs else "" print(f" {_cyan('•')} {name}{_dim(vs_str)}") else: print(f"\n {_dim('Zvols: none')}") print() def _run_audit_wizard(host: str, port: int, api_key: str) -> None: """Query destination inventory and offer to selectively delete configuration.""" print(f"\n Querying {_bold(host)} …\n") async def _query() -> dict: async with TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False) as client: return await query_destination_inventory(client) try: inv = asyncio.run(_query()) except (OSError, PermissionError) as exc: print(f" {_bold_red('Connection failed:')} {exc}\n") return _print_inventory_report(host, inv) total = sum(len(v) for v in inv.values()) if total == 0: print(f" {_dim('The destination appears to have no configuration.')}\n") return # ── Deletion options ─────────────────────────────────────────────────────── print(f" {_bold_yellow('─' * 60)}") print(f" {_bold_yellow('DELETION OPTIONS')}") print(f" {_dim('You may choose to delete some or all of the configuration above.')}") print(f" {_bold_red('WARNING: Deleted datasets and zvols cannot be recovered — all data will be permanently lost.')}") print() has_iscsi = any(inv[k] for k in ("iscsi_extents", "iscsi_initiators", "iscsi_portals", "iscsi_targets", "iscsi_targetextents")) iscsi_count = sum(len(inv[k]) for k in ("iscsi_extents", "iscsi_initiators", "iscsi_portals", "iscsi_targets", "iscsi_targetextents")) deletable_ds = [d for d in inv["datasets"] if "/" in d["id"]] del_iscsi = False del_smb = False del_nfs = False del_zvols = False del_datasets = False # iSCSI (must go first — uses zvols as backing) if has_iscsi: del_iscsi = _confirm( f" Delete ALL iSCSI configuration ({iscsi_count} objects)?" ) # SMB if inv["smb_shares"]: del_smb = _confirm( f" Delete all {len(inv['smb_shares'])} SMB share(s)?" ) # NFS if inv["nfs_exports"]: del_nfs = _confirm( f" Delete all {len(inv['nfs_exports'])} NFS export(s)?" ) # Zvols — require explicit confirmation phrase if inv["zvols"]: print() print(f" {_bold_red('⚠ DATA DESTRUCTION WARNING ⚠')}") print(f" Deleting zvols PERMANENTLY DESTROYS all data stored in them.") print(f" This action cannot be undone. Affected zvols:") for z in inv["zvols"]: print(f" {_yellow('•')} {z['id']}") print() raw = _prompt( f" Type DELETE to confirm deletion of {len(inv['zvols'])} zvol(s)," " or Enter to skip" ).strip() del_zvols = (raw == "DELETE") if raw and raw != "DELETE": print(f" {_dim('Confirmation not matched — zvols will not be deleted.')}") print() # Datasets — strongest warning if deletable_ds: print(f" {_bold_red('⚠⚠ CRITICAL DATA DESTRUCTION WARNING ⚠⚠')}") print(f" Deleting datasets PERMANENTLY DESTROYS ALL DATA including all files,") print(f" snapshots, and child datasets. Pool root datasets (e.g. 'tank') will") print(f" be skipped, but all child datasets WILL be deleted.") print(f" This action cannot be undone. {len(deletable_ds)} dataset(s) would be deleted.") print() raw = _prompt( f" Type DELETE to confirm deletion of {len(deletable_ds)} dataset(s)," " or Enter to skip" ).strip() del_datasets = (raw == "DELETE") if raw and raw != "DELETE": print(f" {_dim('Confirmation not matched — datasets will not be deleted.')}") print() # ── Nothing selected ─────────────────────────────────────────────────────── if not any([del_iscsi, del_smb, del_nfs, del_zvols, del_datasets]): print(f" {_dim('Nothing selected for deletion. No changes made.')}\n") return # ── Final confirmation ───────────────────────────────────────────────────── print(f" {_bold_yellow('─' * 60)}") print(f" {_bold_yellow('PENDING DELETIONS on ' + host + ':')}") if del_iscsi: print(f" {_yellow('•')} ALL iSCSI configuration ({iscsi_count} objects)") if del_smb: print(f" {_yellow('•')} {len(inv['smb_shares'])} SMB share(s)") if del_nfs: print(f" {_yellow('•')} {len(inv['nfs_exports'])} NFS export(s)") if del_zvols: print(f" {_bold_red('•')} {len(inv['zvols'])} zvol(s) " f"{_bold_red('⚠ ALL DATA WILL BE PERMANENTLY DESTROYED')}") if del_datasets: print(f" {_bold_red('•')} {len(deletable_ds)} dataset(s) " f"{_bold_red('⚠ ALL DATA WILL BE PERMANENTLY DESTROYED')}") print() print(f" {_bold_red('THIS ACTION CANNOT BE UNDONE.')}") print() if not _confirm(f" Proceed with all selected deletions on {host}?"): print(f" {_dim('Aborted – no changes made.')}\n") return # ── Execute ──────────────────────────────────────────────────────────────── print() async def _execute() -> None: async with TrueNASClient(host=host, port=port, api_key=api_key, verify_ssl=False) as client: if del_iscsi: print(f" Removing iSCSI configuration …") await clear_iscsi_config(client) print(f" {_bold_green('✓')} iSCSI configuration removed.") if del_smb: print(f" Removing SMB shares …") ok, fail = await delete_smb_shares(client, inv["smb_shares"]) suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else "" print(f" {_bold_green('✓')} {ok} deleted{suffix}") if del_nfs: print(f" Removing NFS exports …") ok, fail = await delete_nfs_exports(client, inv["nfs_exports"]) suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else "" print(f" {_bold_green('✓')} {ok} deleted{suffix}") if del_zvols: print(f" Removing zvols …") ok, fail = await delete_zvols(client, inv["zvols"]) suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else "" print(f" {_bold_green('✓')} {ok} deleted{suffix}") if del_datasets: print(f" Removing datasets …") ok, fail = await delete_datasets(client, deletable_ds) suffix = f" {_bold_red(str(fail) + ' failed')}" if fail else "" print(f" {_bold_green('✓')} {ok} deleted{suffix}") asyncio.run(_execute()) print(f"\n {_bold_cyan('Done.')}\n") # ───────────────────────────────────────────────────────────────────────────── # Interactive wizard # ───────────────────────────────────────────────────────────────────────────── def interactive_mode() -> None: """Interactive wizard: pick source → configure → dry run → confirm → apply.""" print( f"\n{_bold_cyan(' TrueNAS Share Migration Tool')}\n" f" {_dim('Migrate SMB/NFS shares to a live TrueNAS system.')}\n" ) # 0 ── Top-level action ───────────────────────────────────────────────────── print(f" {_bold('What would you like to do?')}") print(f" {_cyan('1.')} Migrate configuration to a destination system") print(f" {_cyan('2.')} Audit destination system (view and manage existing config)") action_raw = _prompt(" Select [1/2]", default="1") print() if action_raw.strip() == "2": audit_host = "" while not audit_host: audit_host = _prompt("Destination TrueNAS host or IP") if not audit_host: print(" Host is required.") audit_port_raw = _prompt("WebSocket port", default="443") audit_port = int(audit_port_raw) if audit_port_raw.isdigit() else 443 audit_key = "" while not audit_key: try: audit_key = getpass.getpass("API key (input hidden): ").strip() except (EOFError, KeyboardInterrupt): print() sys.exit(0) if not audit_key: print(" API key is required.") _run_audit_wizard(audit_host, audit_port, audit_key) return # 1 ── Source type ────────────────────────────────────────────────────────── print(f" {_bold('Source type:')}") print(f" {_cyan('1.')} TrueNAS debug archive (.tgz / .tar)") print(f" {_cyan('2.')} CSV import (non-TrueNAS source)") src_raw = _prompt(" Select source [1/2]", default="1") use_csv = src_raw.strip() == "2" print() # 2 ── Destination ────────────────────────────────────────────────────────── host = "" while not host: host = _prompt("Destination TrueNAS host or IP") if not host: print(" Host is required.") port_raw = _prompt("WebSocket port", default="443") port = int(port_raw) if port_raw.isdigit() else 443 # 3 ── API key ────────────────────────────────────────────────────────────── api_key = "" while not api_key: try: api_key = getpass.getpass("API key (input hidden): ").strip() except (EOFError, KeyboardInterrupt): print() sys.exit(0) if not api_key: print(" API key is required.") if use_csv: # ── CSV source ────────────────────────────────────────────────────────── print(f"\n {_bold('CSV file paths:')}") print(f" {_dim('Press Enter to skip a share type.')}\n") smb_csv_path = _prompt_csv_path("SMB") print() nfs_csv_path = _prompt_csv_path("NFS") migrate: list[str] = [] if smb_csv_path: migrate.append("smb") if nfs_csv_path: migrate.append("nfs") if not migrate: sys.exit("No CSV files provided – nothing to migrate.") print() archive_data = parse_csv_sources(smb_csv_path, nfs_csv_path) extra_ns: dict = {"smb_csv": smb_csv_path, "nfs_csv": nfs_csv_path} else: # ── Archive source ────────────────────────────────────────────────────── archives = _find_debug_archives() if not archives: sys.exit( "No debug archives (.tgz / .tar.gz / .tar / .txz) found in the " "current directory.\n" "Copy your TrueNAS debug file here, or use --debug-tar to specify a path." ) if len(archives) == 1: chosen = archives[0] print(f" {_dim('Archive:')} {_bold(chosen.name)} " f"{_dim('(' + f'{chosen.stat().st_size / 1_048_576:.1f} MB' + ')')}\n") else: print(f" {_bold('Debug archives found:')}\n") for i, p in enumerate(archives, 1): print(f" {_cyan(str(i) + '.')} {p.name} " f"{_dim('(' + f'{p.stat().st_size / 1_048_576:.1f} MB' + ')')}") print() while True: raw = _prompt(f"Select archive [1-{len(archives)}]") if raw.isdigit() and 1 <= int(raw) <= len(archives): chosen = archives[int(raw) - 1] break print(f" Enter a number from 1 to {len(archives)}.") # ── Migration scope ───────────────────────────────────────────────────── print(f"\n {_bold('What to migrate?')}") print(f" {_cyan('1.')} SMB shares") print(f" {_cyan('2.')} NFS shares") print(f" {_cyan('3.')} iSCSI (targets, extents, portals, initiator groups)") sel_raw = _prompt( "Selection (space-separated numbers, Enter for all)", default="1 2 3" ) _sel_map = {"1": "smb", "2": "nfs", "3": "iscsi"} migrate = [] for tok in sel_raw.split(): if tok in _sel_map and _sel_map[tok] not in migrate: migrate.append(_sel_map[tok]) if not migrate: migrate = ["smb", "nfs", "iscsi"] # ── Parse archive ─────────────────────────────────────────────────────── print() archive_data = parse_archive(str(chosen)) extra_ns = {"debug_tar": str(chosen)} # ── iSCSI portal IP remapping ──────────────────────────────────────── if "iscsi" in migrate and archive_data.get("iscsi", {}).get("portals"): _prompt_iscsi_portals(archive_data["iscsi"]) # ── iSCSI pre-migration check ──────────────────────────────────────── if "iscsi" in migrate: _prompt_clear_existing_iscsi(host, port, api_key) # ── Select individual shares (common) ────────────────────────────────────── if "smb" in migrate and archive_data["smb_shares"]: archive_data["smb_shares"] = _select_shares(archive_data["smb_shares"], "SMB") if "nfs" in migrate and archive_data["nfs_shares"]: archive_data["nfs_shares"] = _select_shares(archive_data["nfs_shares"], "NFS") print() base_ns = dict( dest=host, port=port, api_key=api_key, verify_ssl=False, migrate=migrate, **extra_ns, ) # 6 ── Dry run ────────────────────────────────────────────────────────────── dry_summary = asyncio.run( run(argparse.Namespace(**base_ns, dry_run=True), archive_data) ) print(dry_summary.report()) # Offer to create missing datasets before the live run if dry_summary.missing_datasets: non_mnt = [p for p in dry_summary.missing_datasets if not p.startswith("/mnt/")] creatable = [p for p in dry_summary.missing_datasets if p.startswith("/mnt/")] if non_mnt: print(f" NOTE: {len(non_mnt)} path(s) cannot be auto-created " "(not under /mnt/):") for p in non_mnt: print(f" • {p}") print() if creatable: print(f" {len(creatable)} dataset(s) can be created automatically:") for p in creatable: print(f" • {p}") print() if _confirm(f"Create these {len(creatable)} dataset(s) on {host} now?"): asyncio.run(create_missing_datasets( host=host, port=port, api_key=api_key, paths=creatable, )) print() if dry_summary.missing_zvols: print(f"\n {len(dry_summary.missing_zvols)} zvol(s) need to be created for iSCSI extents:") for z in dry_summary.missing_zvols: print(f" • {z}") print() if _confirm(f"Create these {len(dry_summary.missing_zvols)} zvol(s) on {host} now?"): zvol_sizes: dict[str, int] = {} for zvol in dry_summary.missing_zvols: while True: raw = _prompt(f" Size for {zvol} (e.g. 100G, 500GiB, 1T)").strip() if not raw: print(" Size is required.") continue try: zvol_sizes[zvol] = _parse_size(raw) break except ValueError: print(f" Cannot parse {raw!r} — try a format like 100G or 500GiB.") asyncio.run(create_missing_zvols( host=host, port=port, api_key=api_key, zvols=zvol_sizes, )) print() print(f" Re-running dry run to verify zvol creation …") print() dry_summary = asyncio.run( run(argparse.Namespace(**base_ns, dry_run=True), archive_data) ) print(dry_summary.report()) if not _confirm(f"Apply these changes to {host}?"): print("Aborted – no changes made.") sys.exit(0) # 7 ── Live run ───────────────────────────────────────────────────────────── print() live_summary = asyncio.run( run(argparse.Namespace(**base_ns, dry_run=False), archive_data) ) print(live_summary.report()) if live_summary.errors: sys.exit(2) # ───────────────────────────────────────────────────────────────────────────── # Argument parser + entry point # ───────────────────────────────────────────────────────────────────────────── def main() -> None: if len(sys.argv) == 1: interactive_mode() return p = argparse.ArgumentParser( prog="truenas_migrate", description=( "Migrate SMB and NFS shares to a live TrueNAS destination system. " "Source can be a TrueNAS debug archive or customer-supplied CSV files." ), formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) # ── Source ──────────────────────────────────────────────────────────────── src = p.add_argument_group("source (choose one)") src.add_argument( "--debug-tar", metavar="FILE", help="Path to the TrueNAS debug .tar / .tgz from the SOURCE system.", ) src.add_argument( "--smb-csv", metavar="FILE", help="Path to a CSV file containing SMB share definitions (non-TrueNAS source).", ) src.add_argument( "--nfs-csv", metavar="FILE", help="Path to a CSV file containing NFS share definitions (non-TrueNAS source).", ) p.add_argument( "--list-archive", action="store_true", help=( "List all JSON files found in the archive and exit. " "Requires --debug-tar." ), ) # ── Destination ─────────────────────────────────────────────────────────── p.add_argument( "--dest", metavar="HOST", help="Hostname or IP of the DESTINATION TrueNAS system.", ) p.add_argument( "--port", type=int, default=443, metavar="PORT", help="WebSocket port on the destination (default: 443).", ) p.add_argument( "--verify-ssl", action="store_true", help=( "Verify the destination TLS certificate. " "Off by default because most TrueNAS systems use self-signed certs." ), ) # ── Authentication ──────────────────────────────────────────────────────── p.add_argument( "--api-key", metavar="KEY", help=( "TrueNAS API key. Generate one in TrueNAS UI: " "top-right account menu → API Keys." ), ) # ── Scope ───────────────────────────────────────────────────────────────── p.add_argument( "--migrate", nargs="+", choices=["smb", "nfs", "iscsi"], default=["smb", "nfs", "iscsi"], metavar="TYPE", help=( "What to migrate. Choices: smb nfs iscsi " "(default: both). Example: --migrate smb" ), ) p.add_argument( "--dry-run", action="store_true", help="Parse source and connect to destination, but make no changes.", ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable DEBUG-level logging.", ) args = p.parse_args() if args.verbose: log.setLevel(logging.DEBUG) has_archive = bool(args.debug_tar) has_csv = bool(args.smb_csv or args.nfs_csv) if has_archive and has_csv: p.error("Cannot combine --debug-tar with --smb-csv / --nfs-csv.") if not has_archive and not has_csv: p.error( "Specify a source: --debug-tar FILE or --smb-csv / --nfs-csv FILE(s)." ) if has_archive: if not Path(args.debug_tar).is_file(): p.error(f"Archive not found: {args.debug_tar}") if args.list_archive: list_archive_and_exit(args.debug_tar) # does not return else: if args.list_archive: p.error("--list-archive requires --debug-tar.") if args.smb_csv and not Path(args.smb_csv).is_file(): p.error(f"SMB CSV not found: {args.smb_csv}") if args.nfs_csv and not Path(args.nfs_csv).is_file(): p.error(f"NFS CSV not found: {args.nfs_csv}") if not args.dest: p.error("--dest is required.") if not args.api_key: p.error("--api-key is required.") summary = asyncio.run(run(args)) print(summary.report()) if summary.errors: sys.exit(2)