#!/usr/bin/env python3 """ truenas_migrate.py – TrueNAS Share Migration Tool ===================================================== Reads SMB shares, NFS shares, and SMB global config from a TrueNAS debug archive (.tar / .tgz) produced by the built-in "Save Debug" feature, then re-creates them on a destination TrueNAS system via the JSON-RPC 2.0 WebSocket API (TrueNAS 25.04+). SAFE BY DEFAULT • Existing shares are never overwritten or deleted. • Always run with --dry-run first to preview what will happen. REQUIREMENTS pip install websockets Python 3.9+ QUICK START # 1. Inspect your debug archive to confirm it contains the data you need: python truenas_migrate.py --debug-tar debug.tgz --list-archive # 2. Dry-run – connect to destination but make zero changes: python truenas_migrate.py \\ --debug-tar debug.tgz \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" \\ --dry-run # 3. Live migration of all three data types: python truenas_migrate.py \\ --debug-tar debug.tgz \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" # 4. Migrate only SMB shares (skip NFS and global config): python truenas_migrate.py \\ --debug-tar debug.tgz \\ --dest 192.168.1.50 \\ --api-key "1-xxxxxxxxxxxx" \\ --migrate smb CONFLICT POLICY Shares that already exist on the destination are silently skipped: SMB – matched by share name (case-insensitive) NFS – matched by export path (exact match) SMB global config is always applied unless --migrate excludes "smb-config". """ from __future__ import annotations import argparse import asyncio import json import logging import ssl import sys import tarfile from dataclasses import dataclass, field from pathlib import Path from typing import Any, Optional # ── Optional dependency check ───────────────────────────────────────────────── try: import websockets from websockets.exceptions import WebSocketException except ImportError: sys.exit( "ERROR: The 'websockets' package is required.\n" "Install it with: pip install websockets" ) # ───────────────────────────────────────────────────────────────────────────── # Logging # ───────────────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("truenas_migrate") # ───────────────────────────────────────────────────────────────────────────── # Summary # ───────────────────────────────────────────────────────────────────────────── @dataclass class Summary: smb_found: int = 0 smb_created: int = 0 smb_skipped: int = 0 smb_failed: int = 0 nfs_found: int = 0 nfs_created: int = 0 nfs_skipped: int = 0 nfs_failed: int = 0 cfg_applied: bool = False errors: list[str] = field(default_factory=list) def report(self) -> str: w = 52 hr = "─" * w def row(label: str, val: str) -> str: right = w - 2 - len(label) - len(val) return f"│ {label}{val}{' ' * right} │" smb_val = (f"found={self.smb_found} created={self.smb_created}" f" skipped={self.smb_skipped} failed={self.smb_failed}") nfs_val = (f"found={self.nfs_found} created={self.nfs_created}" f" skipped={self.nfs_skipped} failed={self.nfs_failed}") cfg_val = "applied" if self.cfg_applied else "not applied" lines = [ "", f"┌{hr}┐", f"│{'MIGRATION SUMMARY':^{w}}│", f"├{hr}┤", row(" SMB shares : ", smb_val), row(" NFS shares : ", nfs_val), row(" SMB config : ", cfg_val), f"└{hr}┘", ] if self.errors: lines.append(f"\n {len(self.errors)} error(s):") for e in self.errors: lines.append(f" • {e}") lines.append("") return "\n".join(lines) # ───────────────────────────────────────────────────────────────────────────── # Debug archive parser # ───────────────────────────────────────────────────────────────────────────── # # TrueNAS generates debug archives with its built-in "ixdiagnose" tool (SCALE # 24.04+) or the older "freenas-debug" tool (CORE / earlier SCALE). # Neither has a fully stable internal layout across versions, so we try a # ranked list of known paths and fall back to a keyword heuristic scan. # # Known ixdiagnose (SCALE) layouts observed in the wild: # ixdiagnose/plugins/SMB/sharing.smb.query.json – SMB shares # ixdiagnose/plugins/SMB/smb.config.json – SMB global config # ixdiagnose/plugins/NFS/sharing.nfs.query.json – NFS shares # ixdiagnose/plugins/Sharing/sharing.smb.query.json # ixdiagnose/plugins/Sharing/sharing.nfs.query.json # # Known freenas-debug (CORE) layouts: # freenas-debug/sharing/smb.json – SMB shares # freenas-debug/sharing/nfs.json – NFS shares # (CORE SMB config is plain text; JSON form may not exist) _CANDIDATES: dict[str, list[str]] = { "smb_shares": [ "ixdiagnose/plugins/SMB/sharing.smb.query.json", "ixdiagnose/plugins/Sharing/sharing.smb.query.json", "ixdiagnose/SMB/sharing.smb.query.json", "freenas-debug/sharing/smb.json", "sharing/smb.json", "middleware/sharing.smb.query.json", ], "nfs_shares": [ "ixdiagnose/plugins/NFS/sharing.nfs.query.json", "ixdiagnose/plugins/Sharing/sharing.nfs.query.json", "ixdiagnose/NFS/sharing.nfs.query.json", "freenas-debug/sharing/nfs.json", "sharing/nfs.json", "middleware/sharing.nfs.query.json", ], "smb_config": [ "ixdiagnose/plugins/SMB/smb.config.json", "ixdiagnose/SMB/smb.config.json", "freenas-debug/SMB/smb_config.json", "middleware/smb.config.json", ], } # Keyword fragments for heuristic fallback scan _KEYWORDS: dict[str, list[str]] = { "smb_shares": ["sharing.smb", "smb_share", "sharing/smb"], "nfs_shares": ["sharing.nfs", "nfs_share", "sharing/nfs"], "smb_config": ["smb.config", "smb_config"], } def _members_map(tf: tarfile.TarFile) -> dict[str, tarfile.TarInfo]: """Return {normalised_path: TarInfo} for every member.""" return {m.name.lstrip("./"): m for m in tf.getmembers()} def _read_json(tf: tarfile.TarFile, info: tarfile.TarInfo) -> Optional[Any]: """Extract and JSON-parse one archive member. Returns None on any error.""" try: fh = tf.extractfile(info) if fh is None: return None raw = fh.read().decode("utf-8", errors="replace").strip() return json.loads(raw) if raw else None except Exception as exc: log.debug("Could not parse %s: %s", info.name, exc) return None def _find_data( tf: tarfile.TarFile, members: dict[str, tarfile.TarInfo], data_type: str, ) -> Optional[Any]: """Try candidate paths, then keyword heuristics. Return parsed JSON or None.""" # Pass 1 – exact / suffix match against known candidate paths for candidate in _CANDIDATES[data_type]: norm = candidate.lstrip("./") # Direct hit info = members.get(norm) if info is None: # Archive may have a date-stamped top-level directory for path, member in members.items(): if path == norm or path.endswith("/" + norm): info = member break if info is not None: result = _read_json(tf, info) if result is not None: log.info(" %-12s → %s", data_type, info.name) return result # Pass 2 – keyword heuristic scan over all .json members log.debug(" %s: candidates missed, scanning archive …", data_type) keywords = _KEYWORDS[data_type] for path in sorted(members): if not path.lower().endswith(".json"): continue if any(kw in path.lower() for kw in keywords): result = _read_json(tf, members[path]) if result is not None: log.info(" %-12s → %s (heuristic)", data_type, path) return result return None def parse_archive(tar_path: str) -> dict[str, Any]: """ Extract SMB shares, NFS shares, and SMB config from the debug archive. Returns: {"smb_shares": list, "nfs_shares": list, "smb_config": dict|None} """ log.info("Opening archive: %s", tar_path) result: dict[str, Any] = { "smb_shares": [], "nfs_shares": [], "smb_config": None, } try: with tarfile.open(tar_path, "r:*") as tf: members = _members_map(tf) log.info(" Archive contains %d total entries.", len(members)) for key in ("smb_shares", "nfs_shares", "smb_config"): data = _find_data(tf, members, key) if data is None: log.warning(" %-12s → NOT FOUND", key) continue if key in ("smb_shares", "nfs_shares"): if isinstance(data, list): result[key] = data elif isinstance(data, dict): # Some versions wrap the list: {"result": [...]} for v in data.values(): if isinstance(v, list): result[key] = v break else: result[key] = data if isinstance(data, dict) else None except (tarfile.TarError, OSError) as exc: log.error("Failed to open archive: %s", exc) sys.exit(1) log.info( "Parsed: %d SMB share(s), %d NFS share(s), SMB config=%s", len(result["smb_shares"]), len(result["nfs_shares"]), "found" if result["smb_config"] else "not found", ) return result def list_archive_and_exit(tar_path: str) -> None: """ Print a structured listing of all JSON files in the archive, then exit. Helps confirm the archive actually contains the data we need. """ print(f"\nJSON files in archive: {tar_path}\n") try: with tarfile.open(tar_path, "r:*") as tf: json_members = sorted( (m for m in tf.getmembers() if m.name.endswith(".json")), key=lambda m: m.name, ) if not json_members: print(" (no .json files found)") else: # Group by top-level directory for readability current_dir = "" for m in json_members: parts = m.name.lstrip("./").split("/") top = "/".join(parts[:-1]) if len(parts) > 1 else "" if top != current_dir: print(f"\n {top or '(root)'}/") current_dir = top print(f" {parts[-1]} ({m.size / 1024:.1f} KB)") except (tarfile.TarError, OSError) as exc: sys.exit(f"ERROR: {exc}") print() sys.exit(0) # ───────────────────────────────────────────────────────────────────────────── # Payload builders # ───────────────────────────────────────────────────────────────────────────── # Read-only / server-generated fields that must NOT be sent on create/update _SMB_SHARE_READONLY = frozenset({"id", "locked"}) _NFS_SHARE_READONLY = frozenset({"id", "locked"}) _SMB_CONFIG_READONLY = frozenset({"id", "server_sid"}) def _smb_share_payload(share: dict) -> dict: return {k: v for k, v in share.items() if k not in _SMB_SHARE_READONLY} def _nfs_share_payload(share: dict) -> dict: return {k: v for k, v in share.items() if k not in _NFS_SHARE_READONLY} def _smb_config_payload(config: dict) -> dict: return {k: v for k, v in config.items() if k not in _SMB_CONFIG_READONLY} # ───────────────────────────────────────────────────────────────────────────── # TrueNAS JSON-RPC 2.0 WebSocket client # ───────────────────────────────────────────────────────────────────────────── class TrueNASClient: """ Minimal async JSON-RPC 2.0 client for the TrueNAS WebSocket API. TrueNAS 25.04+ endpoint: wss://:/api/current Authentication: auth.login_with_api_key """ def __init__( self, host: str, api_key: str, port: int = 443, verify_ssl: bool = False, ) -> None: self._host = host self._port = port self._api_key = api_key self._verify_ssl = verify_ssl self._ws = None self._call_id = 0 @property def _url(self) -> str: return f"wss://{self._host}:{self._port}/api/current" async def __aenter__(self) -> "TrueNASClient": await self._connect() return self async def __aexit__(self, *_: Any) -> None: if self._ws: await self._ws.close() self._ws = None async def _connect(self) -> None: ctx = ssl.create_default_context() if not self._verify_ssl: ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE log.info("Connecting to %s …", self._url) try: self._ws = await websockets.connect( self._url, ssl=ctx, ping_interval=20, ping_timeout=30, open_timeout=20, ) except (WebSocketException, OSError) as exc: log.error("Connection failed: %s", exc) raise log.info("Authenticating with API key …") result = await self.call("auth.login_with_api_key", [self._api_key]) if result is not True and result != "SUCCESS": raise PermissionError(f"Authentication rejected: {result!r}") log.info("Connected and authenticated.") async def call(self, method: str, params: Optional[list] = None) -> Any: """ Send one JSON-RPC request and return its result. Raises RuntimeError if the API returns an error. """ self._call_id += 1 req_id = self._call_id await self._ws.send(json.dumps({ "jsonrpc": "2.0", "id": req_id, "method": method, "params": params or [], })) # Drain until the matching reply arrives (skip server-push notifications) while True: raw = await asyncio.wait_for(self._ws.recv(), timeout=60) msg = json.loads(raw) if "id" not in msg: # server-initiated notification continue if msg["id"] != req_id: # response to a different in-flight call continue if "error" in msg: err = msg["error"] reason = ( err.get("data", {}).get("reason") or err.get("message") or repr(err) ) raise RuntimeError(f"API error [{method}]: {reason}") return msg.get("result") # ───────────────────────────────────────────────────────────────────────────── # Migration routines # ───────────────────────────────────────────────────────────────────────────── async def migrate_smb_shares( client: TrueNASClient, shares: list[dict], dry_run: bool, summary: Summary, ) -> None: summary.smb_found = len(shares) if not shares: log.info("No SMB shares found in archive.") return log.info("Querying existing SMB shares on destination …") try: existing = await client.call("sharing.smb.query") or [] except RuntimeError as exc: msg = f"Could not query SMB shares: {exc}" log.error(msg) summary.errors.append(msg) return existing_names = {s.get("name", "").lower() for s in existing} log.info(" Destination has %d existing SMB share(s).", len(existing_names)) for share in shares: name = share.get("name", "") log.info("── SMB share %r", name) if name.lower() in existing_names: log.info(" SKIP – already exists on destination.") summary.smb_skipped += 1 continue payload = _smb_share_payload(share) log.debug(" payload: %s", json.dumps(payload)) if dry_run: log.info(" [DRY RUN] would create SMB share %r → %s", name, payload.get("path")) summary.smb_created += 1 continue try: r = await client.call("sharing.smb.create", [payload]) log.info(" CREATED id=%s", r.get("id")) summary.smb_created += 1 except RuntimeError as exc: log.error(" FAILED: %s", exc) summary.smb_failed += 1 summary.errors.append(f"SMB share {name!r}: {exc}") async def migrate_nfs_shares( client: TrueNASClient, shares: list[dict], dry_run: bool, summary: Summary, ) -> None: summary.nfs_found = len(shares) if not shares: log.info("No NFS shares found in archive.") return log.info("Querying existing NFS shares on destination …") try: existing = await client.call("sharing.nfs.query") or [] except RuntimeError as exc: msg = f"Could not query NFS shares: {exc}" log.error(msg) summary.errors.append(msg) return existing_paths = {s.get("path", "").rstrip("/") for s in existing} log.info(" Destination has %d existing NFS share(s).", len(existing_paths)) for share in shares: path = share.get("path", "").rstrip("/") log.info("── NFS export %r", path) if path in existing_paths: log.info(" SKIP – path already exported on destination.") summary.nfs_skipped += 1 continue payload = _nfs_share_payload(share) log.debug(" payload: %s", json.dumps(payload)) if dry_run: log.info(" [DRY RUN] would create NFS export for %r", path) summary.nfs_created += 1 continue try: r = await client.call("sharing.nfs.create", [payload]) log.info(" CREATED id=%s", r.get("id")) summary.nfs_created += 1 except RuntimeError as exc: log.error(" FAILED: %s", exc) summary.nfs_failed += 1 summary.errors.append(f"NFS share {path!r}: {exc}") async def migrate_smb_config( client: TrueNASClient, config: Optional[dict], dry_run: bool, summary: Summary, ) -> None: if not config: log.info("No SMB global config found in archive – skipping.") return payload = _smb_config_payload(config) log.info("── SMB global config") log.info( " netbiosname=%-20s workgroup=%-15s encryption=%s", repr(payload.get("netbiosname")), repr(payload.get("workgroup")), repr(payload.get("encryption")), ) if dry_run: log.info(" [DRY RUN] would call smb.update") summary.cfg_applied = True return try: await client.call("smb.update", [payload]) log.info(" APPLIED") summary.cfg_applied = True except RuntimeError as exc: log.error(" FAILED: %s", exc) summary.errors.append(f"SMB config: {exc}") # ───────────────────────────────────────────────────────────────────────────── # CLI # ───────────────────────────────────────────────────────────────────────────── async def run(args: argparse.Namespace) -> None: archive = parse_archive(args.debug_tar) migrate_set = set(args.migrate) if args.dry_run: log.info("=" * 55) log.info("DRY RUN – no changes will be made on the destination") log.info("=" * 55) summary = Summary() async with TrueNASClient( host=args.dest, port=args.port, api_key=args.api_key, verify_ssl=args.verify_ssl, ) as client: if "smb" in migrate_set: await migrate_smb_shares( client, archive["smb_shares"], args.dry_run, summary) if "nfs" in migrate_set: await migrate_nfs_shares( client, archive["nfs_shares"], args.dry_run, summary) if "smb-config" in migrate_set: await migrate_smb_config( client, archive["smb_config"], args.dry_run, summary) print(summary.report()) if summary.errors: sys.exit(2) def main() -> None: p = argparse.ArgumentParser( prog="truenas_migrate.py", description=( "Migrate SMB shares, NFS shares, and SMB global config " "from a TrueNAS debug archive to a live destination system." ), formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) # ── Source ──────────────────────────────────────────────────────────────── p.add_argument( "--debug-tar", required=True, metavar="FILE", help="Path to the TrueNAS debug .tar / .tgz from the SOURCE system.", ) p.add_argument( "--list-archive", action="store_true", help=( "List all JSON files found in the archive and exit. " "Run this first to verify the archive contains share data." ), ) # ── Destination ─────────────────────────────────────────────────────────── p.add_argument( "--dest", metavar="HOST", help="Hostname or IP of the DESTINATION TrueNAS system.", ) p.add_argument( "--port", type=int, default=443, metavar="PORT", help="WebSocket port on the destination (default: 443).", ) p.add_argument( "--verify-ssl", action="store_true", help=( "Verify the destination TLS certificate. " "Off by default because most TrueNAS systems use self-signed certs." ), ) # ── Authentication ──────────────────────────────────────────────────────── p.add_argument( "--api-key", metavar="KEY", help=( "TrueNAS API key. Generate one in TrueNAS UI: " "top-right account menu → API Keys." ), ) # ── Scope ───────────────────────────────────────────────────────────────── p.add_argument( "--migrate", nargs="+", choices=["smb", "nfs", "smb-config"], default=["smb", "nfs", "smb-config"], metavar="TYPE", help=( "What to migrate. Choices: smb nfs smb-config " "(default: all three). Example: --migrate smb nfs" ), ) p.add_argument( "--dry-run", action="store_true", help="Parse archive and connect to destination, but make no changes.", ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable DEBUG-level logging.", ) args = p.parse_args() if args.verbose: log.setLevel(logging.DEBUG) if not Path(args.debug_tar).is_file(): p.error(f"Archive not found: {args.debug_tar}") if args.list_archive: list_archive_and_exit(args.debug_tar) # does not return if not args.dest: p.error("--dest is required (or use --list-archive to inspect the archive).") if not args.api_key: p.error("--api-key is required.") asyncio.run(run(args)) if __name__ == "__main__": main()