""" truenas_migrate – TrueNAS Share Migration Tool ================================================= Reads SMB shares and NFS shares from either a TrueNAS debug archive (.tar / .tgz) or customer-supplied CSV files, then re-creates them on a destination TrueNAS system via the JSON-RPC 2.0 WebSocket API (TrueNAS 25.04+). SAFE BY DEFAULT • Existing shares are never overwritten or deleted. • Always run with --dry-run first to preview what will happen. REQUIREMENTS Python 3.9+ (stdlib only – no external packages needed) QUICK START — Archive source # 1. Inspect your debug archive to confirm it contains the data you need: python -m truenas_migrate --debug-tar debug.tgz --list-archive # 2. Dry-run – connect to destination but make zero changes: python -m truenas_migrate \\ --debug-tar debug.tgz \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" \\ --dry-run # 3. Live migration: python -m truenas_migrate \\ --debug-tar debug.tgz \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" QUICK START — CSV source # Fill in smb_shares_template.csv / nfs_shares_template.csv, then: python -m truenas_migrate \\ --smb-csv smb_shares.csv \\ --nfs-csv nfs_shares.csv \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" \\ --dry-run CONFLICT POLICY Shares that already exist on the destination are silently skipped: SMB – matched by share name (case-insensitive) NFS – matched by export path (exact match) """ from __future__ import annotations import argparse import asyncio import getpass import logging import sys from pathlib import Path from typing import Optional from .archive import parse_archive, list_archive_and_exit from .client import TrueNASClient, check_dataset_paths, create_missing_datasets, check_iscsi_zvols, create_missing_zvols from .colors import log, _bold, _bold_cyan, _bold_red, _bold_yellow, _cyan, _dim, _green, _yellow from .csv_source import parse_csv_sources from .migrate import migrate_smb_shares, migrate_nfs_shares, migrate_iscsi from .summary import Summary # 
# ─────────────────────────────────────────────────────────────────────────────
# CLI orchestration
# ─────────────────────────────────────────────────────────────────────────────

async def run(
    args: argparse.Namespace,
    archive: Optional[dict] = None,
) -> "Summary":
    """Run the selected migrations against the destination and return a Summary.

    Args:
        args: Namespace providing ``dest``, ``port``, ``api_key``,
            ``verify_ssl``, ``migrate`` and ``dry_run``. When *archive* is
            None, ``smb_csv`` / ``nfs_csv`` (optional) or ``debug_tar`` are
            also read to load the source data.
        archive: Already-parsed source data. When None it is loaded with
            ``parse_csv_sources`` if a CSV path is set on *args*, otherwise
            with ``parse_archive(args.debug_tar)``.

    Returns:
        The ``Summary`` instance that was passed to every migrate call. On a
        dry run its ``missing_datasets`` / ``missing_zvols`` are filled in
        when the summary lists paths or zvols to check.
    """
    if archive is None:
        smb_csv = getattr(args, "smb_csv", None)
        nfs_csv = getattr(args, "nfs_csv", None)
        if smb_csv or nfs_csv:
            archive = parse_csv_sources(smb_csv, nfs_csv)
        else:
            archive = parse_archive(args.debug_tar)

    migrate_set = set(args.migrate)

    if args.dry_run:
        # Boxed banner on stderr so it stays visible when stdout is redirected.
        msg = " DRY RUN – no changes will be made on the destination "
        bar = _bold_yellow("─" * len(msg))
        print(f"\n{_bold_yellow('┌')}{bar}{_bold_yellow('┐')}", file=sys.stderr)
        print(f"{_bold_yellow('│')}{_bold_yellow(msg)}{_bold_yellow('│')}", file=sys.stderr)
        print(f"{_bold_yellow('└')}{bar}{_bold_yellow('┘')}\n", file=sys.stderr)

    summary = Summary()

    async with TrueNASClient(
        host=args.dest,
        port=args.port,
        api_key=args.api_key,
        verify_ssl=args.verify_ssl,
    ) as client:
        # A source that lacks one share type is treated as having none of it,
        # the same way the iSCSI section is handled.
        if "smb" in migrate_set:
            await migrate_smb_shares(
                client, archive.get("smb_shares", []), args.dry_run, summary)
        if "nfs" in migrate_set:
            await migrate_nfs_shares(
                client, archive.get("nfs_shares", []), args.dry_run, summary)
        if "iscsi" in migrate_set:
            await migrate_iscsi(
                client, archive.get("iscsi", {}), args.dry_run, summary)

        # These checks need the live connection, so they stay inside the
        # ``async with`` block.
        if args.dry_run and summary.paths_to_create:
            summary.missing_datasets = await check_dataset_paths(
                client, summary.paths_to_create,
            )
        if args.dry_run and summary.zvols_to_check:
            summary.missing_zvols = await check_iscsi_zvols(
                client, summary.zvols_to_check,
            )

    return summary


# ─────────────────────────────────────────────────────────────────────────────
# Interactive wizard helpers
# ─────────────────────────────────────────────────────────────────────────────

# Suffix → multiplier. Longer suffixes come first so "GIB" is tried before
# "GB" and "G". Decimal-looking suffixes ("GB") are treated as binary units.
_SIZE_UNITS = (
    ("PIB", 1 << 50), ("PB", 1 << 50), ("P", 1 << 50),
    ("TIB", 1 << 40), ("TB", 1 << 40), ("T", 1 << 40),
    ("GIB", 1 << 30), ("GB", 1 << 30), ("G", 1 << 30),
    ("MIB", 1 << 20), ("MB", 1 << 20), ("M", 1 << 20),
    ("KIB", 1 << 10), ("KB", 1 << 10), ("K", 1 << 10),
)


def _parse_size(s: str) -> int:
    """Parse a human-friendly size string to bytes. E.g. '100G', '500GiB', '1T'.

    A string without a recognised suffix is read as a plain byte count.

    Raises:
        ValueError: if the text is not a number, is not finite (e.g. 'infG',
            '1e400T'), or does not describe a positive size.
    """
    text = s.strip().upper()
    for suffix, mult in _SIZE_UNITS:
        if text.endswith(suffix):
            scaled = float(text[:-len(suffix)]) * mult  # ValueError on junk
            # float overflow yields inf rather than raising; int(inf) would
            # raise OverflowError, which callers do not expect.
            if not math.isfinite(scaled):
                raise ValueError(f"size is not a finite number: {s!r}")
            size = int(scaled)
            break
    else:
        size = int(text)  # plain bytes
    if size <= 0:
        raise ValueError(f"size must be positive: {s!r}")
    return size


def _parse_int(text: str) -> Optional[int]:
    """Return *text* as a non-negative int, or None if it is not all decimal digits.

    ``str.isdecimal`` is used instead of ``str.isdigit`` because ``isdigit``
    also accepts characters such as '²' that ``int()`` rejects.
    """
    return int(text) if text.isdecimal() else None


def _find_debug_archives(directory: str = ".") -> list[Path]:
    """Return sorted list of TrueNAS debug archives found in *directory*.

    Only regular files whose names match a known archive extension are
    returned; the search is not recursive.
    """
    patterns = ("*.tgz", "*.tar.gz", "*.tar", "*.txz", "*.tar.xz")
    base = Path(directory)
    # A set removes duplicates where one file matches several patterns.
    found = {p for pat in patterns for p in base.glob(pat) if p.is_file()}
    return sorted(found)


def _prompt(label: str, default: str = "") -> str:
    """Read one stripped line from the user; empty input yields *default*.

    Exits the process with status 0 on EOF or Ctrl-C.
    """
    suffix = f" [{default}]" if default else ""
    try:
        val = input(f"{label}{suffix}: ").strip()
    except (EOFError, KeyboardInterrupt):
        print()
        sys.exit(0)
    return val if val else default


def _confirm(label: str) -> bool:
    """Ask a yes/no question; only 'y' / 'yes' (any case) count as yes.

    EOF or Ctrl-C is treated as "no".
    """
    try:
        answer = input(f"{label} [y/N]: ")
    except (EOFError, KeyboardInterrupt):
        print()
        return False
    return answer.strip().lower() in ("y", "yes")


def _prompt_port(label: str, default: int) -> int:
    """Prompt until a port in 1-65535 is entered; Enter accepts *default*."""
    while True:
        port = _parse_int(_prompt(label, default=str(default)))
        if port is not None and 1 <= port <= 65535:
            return port
        print(" Enter a port number from 1 to 65535.")


def _prompt_csv_path(share_type: str) -> Optional[str]:
    """Prompt for a CSV file path. Returns resolved path string or None if skipped."""
    template = f"{share_type.lower()}_shares_template.csv"
    print(f" {_dim('(template: ' + template + ')')}")
    while True:
        raw = _prompt(f" {share_type} shares CSV path (Enter to skip)")
        if not raw:
            return None
        p = Path(raw)
        if p.is_file():
            return str(p)
        print(f" {_bold_red('File not found:')} {raw}")


def _prompt_iscsi_portals(iscsi: dict) -> None:
    """Walk each portal and prompt for destination IPs in-place.

    For every entry in ``iscsi["portals"]`` the user may enter one or more
    space-separated IPs plus a port; the portal's ``listen`` list is then
    replaced. Portals for which nothing is entered keep their source IPs.
    """
    portals = iscsi.get("portals", [])
    if not portals:
        return

    print(f"\n {_bold('iSCSI Portal Configuration')}")
    print(f" {_dim('Portal IP addresses are unique per system and must be updated.')}")
    print(f" {_dim('For MPIO, enter multiple IPs separated by spaces.')}")

    for portal in portals:
        comment = portal.get("comment", "")
        listen = portal.get("listen", [])
        src_ips = " ".join(f"{entry['ip']}:{entry['port']}" for entry in listen)
        default_port = listen[0]["port"] if listen else 3260
        label = f"Portal {portal['id']}" + (f" ({comment!r})" if comment else "")

        print(f"\n {_bold(label)}")
        print(f" {_dim('Source IP(s):')} {src_ips}")

        raw = _prompt(" Destination IP(s)").strip()
        if not raw:
            print(f" {_yellow('⚠')} No IPs entered — keeping source IPs.")
            continue

        port = _prompt_port(" Port", default_port)
        dest_ips = raw.split()
        portal["listen"] = [{"ip": ip, "port": port} for ip in dest_ips]
        endpoints = ", ".join(f"{ip}:{port}" for ip in dest_ips)
        print(f" {_green('✓')} Portal: {endpoints}")
    print()


def _select_shares(shares: list[dict], share_type: str) -> list[dict]:
    """
    Display a numbered list of *shares* and return only those the user selects.

    Enter (or 'all') returns all shares unchanged. 'n' / 'none' returns [].
    Otherwise the answer is read as 1-based numbers separated by spaces or
    commas; out-of-range, repeated or non-numeric tokens are ignored.
    """
    if not shares:
        return shares

    print(f"\n {_bold(f'{share_type} shares ({len(shares)}):')} \n")
    for i, share in enumerate(shares, 1):
        if share_type == "SMB":
            name = share.get("name", "")
            path = share.get("path", "")
            print(f" {_cyan(str(i) + '.')} {name:<22} {_dim(path)}")
        else:  # NFS
            pl = share.get("paths") or []
            path = share.get("path") or (pl[0] if pl else "")
            extra = f" {_dim('+ ' + str(len(pl) - 1) + ' more')}" if len(pl) > 1 else ""
            print(f" {_cyan(str(i) + '.')} {path}{extra}")
    print()

    raw = _prompt(
        f" Select {share_type} shares to migrate "
        "(e.g. '1 3', Enter = all, 'n' = none)",
        default="all",
    )
    low = raw.strip().lower()

    if low in ("", "all"):
        print(f" {_green('✓')} All {len(shares)} {share_type} share(s) selected.")
        return shares
    if low in ("n", "none", "0"):
        print(f" {_yellow('–')} No {share_type} shares selected.")
        return []

    seen: set[int] = set()
    selected: list[dict] = []
    for tok in raw.replace(",", " ").split():
        number = _parse_int(tok)
        if number is None:
            continue
        idx = number - 1
        if 0 <= idx < len(shares) and idx not in seen:
            seen.add(idx)
            selected.append(shares[idx])

    if selected:
        print(f" {_green('✓')} {len(selected)} of {len(shares)} {share_type} share(s) selected.")
    else:
        print(f" {_yellow('–')} No valid selections; skipping {share_type} shares.")
    return selected


def _archive_size_label(path: Path) -> str:
    """Return the dimmed '(N.N MB)' size label shown next to an archive name."""
    return _dim('(' + f'{path.stat().st_size / 1_048_576:.1f} MB' + ')')


# ─────────────────────────────────────────────────────────────────────────────
# Interactive wizard
# ─────────────────────────────────────────────────────────────────────────────

def interactive_mode() -> None:
    """Interactive wizard: pick source → configure → dry run → confirm → apply.

    Exits with status 2 when the live run's summary reports errors, and with
    status 0 when the user declines the final confirmation.
    """
    print(
        f"\n{_bold_cyan(' TrueNAS Share Migration Tool')}\n"
        f" {_dim('Migrate SMB/NFS shares to a live TrueNAS system.')}\n"
    )

    # 1 ── Source type ──────────────────────────────────────────────────────────
    print(f" {_bold('Source type:')}")
    print(f" {_cyan('1.')} TrueNAS debug archive (.tgz / .tar)")
    print(f" {_cyan('2.')} CSV import (non-TrueNAS source)")
    src_raw = _prompt(" Select source [1/2]", default="1")
    use_csv = src_raw.strip() == "2"
    print()

    # 2 ── Destination ──────────────────────────────────────────────────────────
    host = ""
    while not host:
        host = _prompt("Destination TrueNAS host or IP")
        if not host:
            print(" Host is required.")

    # Re-prompt on bad input rather than silently connecting to port 443.
    port = _prompt_port("WebSocket port", 443)

    # 3 ── API key ──────────────────────────────────────────────────────────────
    api_key = ""
    while not api_key:
        try:
            api_key = getpass.getpass("API key (input hidden): ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)
        if not api_key:
            print(" API key is required.")

    if use_csv:
        # ── CSV source ──────────────────────────────────────────────────────────
        print(f"\n {_bold('CSV file paths:')}")
        print(f" {_dim('Press Enter to skip a share type.')}\n")
        smb_csv_path = _prompt_csv_path("SMB")
        print()
        nfs_csv_path = _prompt_csv_path("NFS")

        migrate: list[str] = []
        if smb_csv_path:
            migrate.append("smb")
        if nfs_csv_path:
            migrate.append("nfs")
        if not migrate:
            sys.exit("No CSV files provided – nothing to migrate.")

        print()
        archive_data = parse_csv_sources(smb_csv_path, nfs_csv_path)
        extra_ns: dict = {"smb_csv": smb_csv_path, "nfs_csv": nfs_csv_path}

    else:
        # ── Archive source ──────────────────────────────────────────────────────
        archives = _find_debug_archives()
        if not archives:
            sys.exit(
                "No debug archives (.tgz / .tar.gz / .tar / .txz) found in the "
                "current directory.\n"
                "Copy your TrueNAS debug file here, or use --debug-tar to specify a path."
            )

        if len(archives) == 1:
            chosen = archives[0]
            print(f" {_dim('Archive:')} {_bold(chosen.name)} "
                  f"{_archive_size_label(chosen)}\n")
        else:
            print(f" {_bold('Debug archives found:')}\n")
            for i, p in enumerate(archives, 1):
                print(f" {_cyan(str(i) + '.')} {p.name} "
                      f"{_archive_size_label(p)}")
            print()
            while True:
                choice = _parse_int(_prompt(f"Select archive [1-{len(archives)}]"))
                if choice is not None and 1 <= choice <= len(archives):
                    chosen = archives[choice - 1]
                    break
                print(f" Enter a number from 1 to {len(archives)}.")

        # ── Migration scope ─────────────────────────────────────────────────────
        print(f"\n {_bold('What to migrate?')}")
        print(f" {_cyan('1.')} SMB shares")
        print(f" {_cyan('2.')} NFS shares")
        print(f" {_cyan('3.')} iSCSI (targets, extents, portals, initiator groups)")
        sel_raw = _prompt(
            "Selection (space-separated numbers, Enter for all)",
            default="1 2 3",
        )
        _sel_map = {"1": "smb", "2": "nfs", "3": "iscsi"}
        migrate = []
        for tok in sel_raw.split():
            if tok in _sel_map and _sel_map[tok] not in migrate:
                migrate.append(_sel_map[tok])
        if not migrate:
            # Nothing recognisable was entered: fall back to everything.
            migrate = ["smb", "nfs", "iscsi"]

        # ── Parse archive ───────────────────────────────────────────────────────
        print()
        archive_data = parse_archive(str(chosen))
        extra_ns = {"debug_tar": str(chosen)}

        # ── iSCSI portal IP remapping ───────────────────────────────────────────
        if "iscsi" in migrate and archive_data.get("iscsi", {}).get("portals"):
            _prompt_iscsi_portals(archive_data["iscsi"])

    # ── Select individual shares (common) ──────────────────────────────────────
    if "smb" in migrate and archive_data.get("smb_shares"):
        archive_data["smb_shares"] = _select_shares(archive_data["smb_shares"], "SMB")
    if "nfs" in migrate and archive_data.get("nfs_shares"):
        archive_data["nfs_shares"] = _select_shares(archive_data["nfs_shares"], "NFS")
    print()

    base_ns = dict(
        dest=host,
        port=port,
        api_key=api_key,
        verify_ssl=False,
        migrate=migrate,
        **extra_ns,
    )

    # 6 ── Dry run ──────────────────────────────────────────────────────────────
    dry_summary = asyncio.run(
        run(argparse.Namespace(**base_ns, dry_run=True), archive_data)
    )
    print(dry_summary.report())

    # Offer to create missing datasets before the live run
    if dry_summary.missing_datasets:
        non_mnt = [p for p in dry_summary.missing_datasets if not p.startswith("/mnt/")]
        creatable = [p for p in dry_summary.missing_datasets if p.startswith("/mnt/")]

        if non_mnt:
            print(f" NOTE: {len(non_mnt)} path(s) cannot be auto-created "
                  "(not under /mnt/):")
            for p in non_mnt:
                print(f" • {p}")
            print()

        if creatable:
            print(f" {len(creatable)} dataset(s) can be created automatically:")
            for p in creatable:
                print(f" • {p}")
            print()
            if _confirm(f"Create these {len(creatable)} dataset(s) on {host} now?"):
                asyncio.run(create_missing_datasets(
                    host=host,
                    port=port,
                    api_key=api_key,
                    paths=creatable,
                ))
                print()

    if dry_summary.missing_zvols:
        print(f"\n {len(dry_summary.missing_zvols)} zvol(s) need to be created for iSCSI extents:")
        for z in dry_summary.missing_zvols:
            print(f" • {z}")
        print()
        if _confirm(f"Create these {len(dry_summary.missing_zvols)} zvol(s) on {host} now?"):
            zvol_sizes: dict[str, int] = {}
            for zvol in dry_summary.missing_zvols:
                # Keep asking until a parseable size is given for this zvol.
                while True:
                    raw = _prompt(f" Size for {zvol} (e.g. 100G, 500GiB, 1T)").strip()
                    if not raw:
                        print(" Size is required.")
                        continue
                    try:
                        zvol_sizes[zvol] = _parse_size(raw)
                        break
                    except ValueError:
                        print(f" Cannot parse {raw!r} — try a format like 100G or 500GiB.")
            asyncio.run(create_missing_zvols(
                host=host,
                port=port,
                api_key=api_key,
                zvols=zvol_sizes,
            ))
            print()

    if not _confirm(f"Apply these changes to {host}?"):
        print("Aborted – no changes made.")
        sys.exit(0)

    # 7 ── Live run ─────────────────────────────────────────────────────────────
    print()
    live_summary = asyncio.run(
        run(argparse.Namespace(**base_ns, dry_run=False), archive_data)
    )
    print(live_summary.report())
    if live_summary.errors:
        sys.exit(2)


# ─────────────────────────────────────────────────────────────────────────────
# Argument parser + entry point
# ─────────────────────────────────────────────────────────────────────────────

def main() -> None:
    """Command-line entry point.

    With no command-line arguments the interactive wizard is started.
    Otherwise the arguments are parsed and validated, the migration is run
    once, and its report is printed. Exits with status 2 when the summary
    reports errors; argument problems are reported through ``parser.error``.
    """
    if len(sys.argv) == 1:
        interactive_mode()
        return

    p = argparse.ArgumentParser(
        prog="truenas_migrate",
        description=(
            "Migrate SMB and NFS shares to a live TrueNAS destination system. "
            "Source can be a TrueNAS debug archive or customer-supplied CSV files."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # ── Source ────────────────────────────────────────────────────────────────
    src = p.add_argument_group("source (choose one)")
    src.add_argument(
        "--debug-tar", metavar="FILE",
        help="Path to the TrueNAS debug .tar / .tgz from the SOURCE system.",
    )
    src.add_argument(
        "--smb-csv", metavar="FILE",
        help="Path to a CSV file containing SMB share definitions (non-TrueNAS source).",
    )
    src.add_argument(
        "--nfs-csv", metavar="FILE",
        help="Path to a CSV file containing NFS share definitions (non-TrueNAS source).",
    )
    p.add_argument(
        "--list-archive", action="store_true",
        help=(
            "List all JSON files found in the archive and exit. "
            "Requires --debug-tar."
        ),
    )

    # ── Destination ───────────────────────────────────────────────────────────
    p.add_argument(
        "--dest", metavar="HOST",
        help="Hostname or IP of the DESTINATION TrueNAS system.",
    )
    p.add_argument(
        "--port", type=int, default=443, metavar="PORT",
        help="WebSocket port on the destination (default: 443).",
    )
    p.add_argument(
        "--verify-ssl", action="store_true",
        help=(
            "Verify the destination TLS certificate. "
            "Off by default because most TrueNAS systems use self-signed certs."
        ),
    )

    # ── Authentication ────────────────────────────────────────────────────────
    p.add_argument(
        "--api-key", metavar="KEY",
        help=(
            "TrueNAS API key. Generate one in TrueNAS UI: "
            "top-right account menu → API Keys."
        ),
    )

    # ── Scope ─────────────────────────────────────────────────────────────────
    p.add_argument(
        "--migrate",
        nargs="+",
        choices=["smb", "nfs", "iscsi"],
        default=["smb", "nfs", "iscsi"],
        metavar="TYPE",
        help=(
            "What to migrate. Choices: smb nfs iscsi "
            "(default: all). Example: --migrate smb"
        ),
    )
    p.add_argument(
        "--dry-run", action="store_true",
        help="Parse source and connect to destination, but make no changes.",
    )
    p.add_argument(
        "--verbose", "-v", action="store_true",
        help="Enable DEBUG-level logging.",
    )

    args = p.parse_args()

    if args.verbose:
        log.setLevel(logging.DEBUG)

    has_archive = bool(args.debug_tar)
    has_csv = bool(args.smb_csv or args.nfs_csv)

    if has_archive and has_csv:
        p.error("Cannot combine --debug-tar with --smb-csv / --nfs-csv.")
    if not has_archive and not has_csv:
        p.error(
            "Specify a source: --debug-tar FILE or --smb-csv / --nfs-csv FILE(s)."
        )

    if has_archive:
        if not Path(args.debug_tar).is_file():
            p.error(f"Archive not found: {args.debug_tar}")
        if args.list_archive:
            list_archive_and_exit(args.debug_tar)  # does not return
    else:
        if args.list_archive:
            p.error("--list-archive requires --debug-tar.")
        if args.smb_csv and not Path(args.smb_csv).is_file():
            p.error(f"SMB CSV not found: {args.smb_csv}")
        if args.nfs_csv and not Path(args.nfs_csv).is_file():
            p.error(f"NFS CSV not found: {args.nfs_csv}")

    # Checked after --list-archive, which needs no destination.
    if not args.dest:
        p.error("--dest is required.")
    if not args.api_key:
        p.error("--api-key is required.")

    summary = asyncio.run(run(args))
    print(summary.report())
    if summary.errors:
        sys.exit(2)