Files
TrueMigration/truenas_migrate.py
2026-03-03 14:54:03 -05:00

920 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
truenas_migrate.py TrueNAS Share Migration Tool
=====================================================
Reads SMB shares, NFS shares, and SMB global config from a TrueNAS debug
archive (.tar / .tgz) produced by the built-in "Save Debug" feature, then
re-creates them on a destination TrueNAS system via the JSON-RPC 2.0
WebSocket API (TrueNAS 25.04+).
SAFE BY DEFAULT
• Existing shares are never overwritten or deleted.
• Always run with --dry-run first to preview what will happen.
REQUIREMENTS
pip install websockets
Python 3.9+
QUICK START
# 1. Inspect your debug archive to confirm it contains the data you need:
python truenas_migrate.py --debug-tar debug.tgz --list-archive
# 2. Dry-run connect to destination but make zero changes:
python truenas_migrate.py \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx" \\
--dry-run
# 3. Live migration of all three data types:
python truenas_migrate.py \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx"
# 4. Migrate only SMB shares (skip NFS and global config):
python truenas_migrate.py \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx" \\
--migrate smb
CONFLICT POLICY
Shares that already exist on the destination are silently skipped:
SMB matched by share name (case-insensitive)
NFS matched by export path (exact match)
SMB global config is always applied unless --migrate excludes "smb-config".
"""
from __future__ import annotations
import argparse
import asyncio
import contextlib
import json
import logging
import ssl
import sys
import tarfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
# ── Optional dependency check ─────────────────────────────────────────────────
try:
import websockets
from websockets.exceptions import WebSocketException
except ImportError:
sys.exit(
"ERROR: The 'websockets' package is required.\n"
"Install it with: pip install websockets"
)
# ─────────────────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────────────────
# Configure the root logger once at import time: INFO level, with a
# time-stamped and level-aligned line format.  main() switches `log` to
# DEBUG when --verbose is given.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
# Logger shared by every function in this file.
log = logging.getLogger("truenas_migrate")
# ─────────────────────────────────────────────────────────────────────────────
# Summary
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class Summary:
    """Running tally of what a migration run found, created, skipped or failed.

    The ``migrate_*`` coroutines update the counters in place and append a
    message to ``errors`` for every failure; ``report()`` renders the result
    as a fixed-width text block.
    """

    # SMB share counters
    smb_found: int = 0
    smb_created: int = 0
    smb_skipped: int = 0
    smb_failed: int = 0
    # NFS share counters
    nfs_found: int = 0
    nfs_created: int = 0
    nfs_skipped: int = 0
    nfs_failed: int = 0
    # True once the SMB global config was applied (or would be, in a dry run)
    cfg_applied: bool = False
    # One human-readable message per failure, in the order they occurred
    errors: list[str] = field(default_factory=list)

    def report(self) -> str:
        """Return the multi-line summary text.

        The block is 52 columns wide: a rule, a centred title, a rule, one
        row per migrated data type, and a closing rule.  When ``errors`` is
        non-empty an indented list of the messages follows.
        """
        width = 52
        # A visible rule character is required here: repeating an empty
        # string yields "" and the frame lines would print as blank lines.
        rule = "=" * width

        def row(label: str, val: str) -> str:
            # Pad to the same width the rows have always used; ljust never
            # truncates, so over-long values are kept intact.
            return f"{label}{val}".ljust(width - 2)

        smb_val = (f"found={self.smb_found} created={self.smb_created}"
                   f" skipped={self.smb_skipped} failed={self.smb_failed}")
        nfs_val = (f"found={self.nfs_found} created={self.nfs_created}"
                   f" skipped={self.nfs_skipped} failed={self.nfs_failed}")
        cfg_val = "applied" if self.cfg_applied else "not applied"

        lines = [
            "",
            rule,
            f"{'MIGRATION SUMMARY':^{width}}",
            rule,
            row(" SMB shares : ", smb_val),
            row(" NFS shares : ", nfs_val),
            row(" SMB config : ", cfg_val),
            rule,
        ]
        if self.errors:
            lines.append(f"\n {len(self.errors)} error(s):")
            # Indent and mark each entry so it reads as part of the list
            # rather than as a free-standing line.
            lines.extend(f"   - {err}" for err in self.errors)
        lines.append("")
        return "\n".join(lines)
# ─────────────────────────────────────────────────────────────────────────────
# Debug archive parser
# ─────────────────────────────────────────────────────────────────────────────
#
# TrueNAS SCALE generates debug archives with the "ixdiagnose" tool.
# The internal layout has changed across versions:
#
# SCALE 24.04+ (plugins layout, lowercase dirs, combined JSON files)
# ixdiagnose/plugins/smb/smb_info.json SMB shares + config combined
# ixdiagnose/plugins/nfs/nfs_config.json NFS shares + config combined
#
# Older SCALE (plugins layout, uppercase dirs, per-query JSON files)
# ixdiagnose/plugins/SMB/sharing.smb.query.json
# ixdiagnose/plugins/SMB/smb.config.json
# ixdiagnose/plugins/NFS/sharing.nfs.query.json
# ixdiagnose/plugins/Sharing/sharing.smb.query.json
# ixdiagnose/plugins/Sharing/sharing.nfs.query.json
#
# TrueNAS CORE uses the "freenas-debug" tool (stored as "fndebug" inside the
# archive).  It produces only plain-text dump files; there are no standalone
# JSON files, but the SMB and NFS dumps embed JSON blocks.  The script detects
# CORE archives early and extracts share / config data from those blocks.
# Known archive locations for each data type, newest layout first.
# _find_data() tries them in order and returns the first one that parses.
_CANDIDATES: dict[str, list[str]] = {
    "smb_shares": [
        # SCALE 24.04+ combined plugin file; shares are under "sharing_smb_query"
        "ixdiagnose/plugins/smb/smb_info.json",
        # Older SCALE uppercase plugin dirs, per-query files
        "ixdiagnose/plugins/SMB/sharing.smb.query.json",
        "ixdiagnose/plugins/Sharing/sharing.smb.query.json",
        "ixdiagnose/SMB/sharing.smb.query.json",
    ],
    "nfs_shares": [
        # SCALE 24.04+ combined plugin file; shares are under "sharing_nfs_query"
        "ixdiagnose/plugins/nfs/nfs_config.json",
        # Older SCALE uppercase plugin dirs, per-query files
        "ixdiagnose/plugins/NFS/sharing.nfs.query.json",
        "ixdiagnose/plugins/Sharing/sharing.nfs.query.json",
        "ixdiagnose/NFS/sharing.nfs.query.json",
    ],
    "smb_config": [
        # SCALE 24.04+ combined plugin file; config is under "smb_config"
        "ixdiagnose/plugins/smb/smb_info.json",
        # Older SCALE uppercase plugin dirs
        "ixdiagnose/plugins/SMB/smb.config.json",
        "ixdiagnose/SMB/smb.config.json",
    ],
}
# When a candidate file bundles multiple datasets, pull out the right sub-key.
# Maps data type -> top-level key inside the combined JSON document.
_KEY_WITHIN_FILE: dict[str, str] = {
    "smb_shares": "sharing_smb_query",
    "nfs_shares": "sharing_nfs_query",
    "smb_config": "smb_config",
}
# Keyword fragments for heuristic fallback scan (SCALE archives only).
# A .json member qualifies when its lower-cased path contains any fragment.
_KEYWORDS: dict[str, list[str]] = {
    "smb_shares": ["sharing.smb", "smb_share", "sharing/smb", "smb_info"],
    "nfs_shares": ["sharing.nfs", "nfs_share", "sharing/nfs", "nfs_config"],
    "smb_config": ["smb.config", "smb_config", "smb_info"],
}
# Presence of this path prefix identifies a TrueNAS CORE archive (fndebug /
# freenas-debug).  CORE stores diagnostics as plain-text dump files, but each
# dump embeds JSON blocks that we can extract.
_CORE_MARKER = "ixdiagnose/fndebug"
# CORE SMB config fields that do not exist in the SCALE API and must be
# stripped before calling smb.update on the destination.
_SMB_CONFIG_CORE_EXTRAS = frozenset({
    "cifs_SID",  # renamed to server_sid in SCALE (already stripped)
    "loglevel",  # removed in SCALE
    "netbiosname_b",  # HA node-B hostname; not applicable in SCALE
    "netbiosname_local",  # HA active-node field; not applicable in SCALE
    "next_rid",  # internal RID counter; not settable via API
})
def _members_map(tf: tarfile.TarFile) -> dict[str, tarfile.TarInfo]:
"""Return {normalised_path: TarInfo} for every member."""
return {m.name.lstrip("./"): m for m in tf.getmembers()}
def _read_json(tf: tarfile.TarFile, info: tarfile.TarInfo) -> Optional[Any]:
"""Extract and JSON-parse one archive member. Returns None on any error."""
try:
fh = tf.extractfile(info)
if fh is None:
return None
raw = fh.read().decode("utf-8", errors="replace").strip()
return json.loads(raw) if raw else None
except Exception as exc:
log.debug("Could not parse %s: %s", info.name, exc)
return None
def _extract_subkey(raw: Any, data_type: str) -> Optional[Any]:
"""
When a JSON file bundles multiple datasets, pull out the sub-key that
corresponds to data_type (e.g. "sharing_smb_query" from smb_info.json).
Falls back to the raw value when no sub-key mapping exists.
"""
if not isinstance(raw, dict):
return raw
key = _KEY_WITHIN_FILE.get(data_type)
if key and key in raw:
return raw[key]
return raw
def _find_data(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    data_type: str,
) -> Optional[Any]:
    """Locate and parse the JSON for *data_type* inside the archive.

    Known candidate paths are tried first, in their listed order; if none
    yields data, every ``.json`` member whose path contains one of the
    keyword fragments is tried in sorted order.  Returns the parsed (and
    sub-key-extracted) value of the first hit, or None.
    """
    # Pass 1: exact / suffix match against known candidate paths
    for known_path in _CANDIDATES[data_type]:
        wanted = known_path.lstrip("./")
        hit = members.get(wanted)
        if hit is None:
            # Archive may have a date-stamped top-level directory, so accept
            # the first member whose path ends with the candidate.
            suffix = "/" + wanted
            hit = next(
                (
                    member
                    for path, member in members.items()
                    if path == wanted or path.endswith(suffix)
                ),
                None,
            )
        if hit is None:
            continue
        payload = _extract_subkey(_read_json(tf, hit), data_type)
        if payload is not None:
            log.info(" %-12s%s", data_type, hit.name)
            return payload

    # Pass 2: keyword heuristic scan over all .json members
    log.debug(" %s: candidates missed, scanning archive …", data_type)
    needles = _KEYWORDS[data_type]
    for path in sorted(members):
        lowered = path.lower()
        if not lowered.endswith(".json"):
            continue
        if not any(needle in lowered for needle in needles):
            continue
        payload = _extract_subkey(_read_json(tf, members[path]), data_type)
        if payload is not None:
            log.info(" %-12s%s (heuristic)", data_type, path)
            return payload
    return None
def _extract_core_dump_json(dump_text: str, title_fragment: str) -> list[Any]:
"""
Extract all top-level JSON values from a named section of a CORE dump.txt.
CORE dump sections look like:
+--------...--------+
+ SECTION TITLE + ← title line (contains the section name)
+--------...--------+
<content may include JSON object(s) and/or array(s)>
debug finished in N seconds for SECTION TITLE
Returns a list of parsed JSON values found in the content block, in order.
An empty list is returned when the section is not found or contains no JSON.
"""
import re as _re
# Split on the horizontal rule lines
parts = _re.split(r'\+[-]{20,}\+', dump_text)
for i, part in enumerate(parts):
if title_fragment.lower() in part.lower() and i + 1 < len(parts):
content = parts[i + 1]
# Trim the "debug finished …" trailer and surrounding whitespace
content = _re.sub(
r'debug finished.*', '', content,
flags=_re.IGNORECASE | _re.DOTALL,
).strip()
# Greedily parse consecutive JSON values from the content
results: list[Any] = []
decoder = json.JSONDecoder()
pos = 0
while pos < len(content):
remaining = content[pos:].lstrip()
if not remaining or remaining[0] not in "{[":
break
pos += len(content[pos:]) - len(remaining) # account for whitespace
try:
val, end = decoder.raw_decode(remaining)
results.append(val)
pos += end
except json.JSONDecodeError:
break
return results
return []
def _read_core_dump(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    key: str,
    label: str,
) -> Optional[str]:
    """Return the decoded text of dump file *key*, or None (with a warning)."""
    info = members.get(key)
    if info is None:
        log.warning(" %s dump not found: %s", label, key)
        return None
    try:
        fh = tf.extractfile(info)
    except KeyError:
        # tarfile raises KeyError when a link member points at a missing target.
        fh = None
    if fh is None:
        # Not a regular file (directory, device, dangling link): nothing to read.
        log.warning(" %s dump is not a readable file: %s", label, key)
        return None
    with fh:
        return fh.read().decode("utf-8", errors="replace")


def _parse_core_into(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    result: dict[str, Any],
) -> None:
    """
    Populate *result* from TrueNAS CORE fndebug dump files.

    SMB dump (ixdiagnose/fndebug/SMB/dump.txt)
        "Database Dump" section: first JSON value is taken as the global
        config when it is an object, second as the share list when it is an
        array.
    NFS dump (ixdiagnose/fndebug/NFS/dump.txt)
        "Configuration" section: second JSON value is taken as the share
        list when it is an array.

    *result* must already contain the keys "smb_shares", "nfs_shares" and
    "smb_config"; only the entries for which data is found are overwritten.
    A dump file that is missing or unreadable is reported with a warning and
    otherwise skipped.
    """
    log.info("TrueNAS CORE archive detected; parsing fndebug dump files.")

    smb_key = "ixdiagnose/fndebug/SMB/dump.txt"
    smb_dump = _read_core_dump(tf, members, smb_key, "SMB")
    if smb_dump is not None:
        vals = _extract_core_dump_json(smb_dump, "Database Dump")
        if vals and isinstance(vals[0], dict):
            result["smb_config"] = vals[0]
            log.info(" smb_config → %s (CORE)", smb_key)
        if len(vals) >= 2 and isinstance(vals[1], list):
            result["smb_shares"] = vals[1]
            log.info(" smb_shares → %s (CORE, %d share(s))", smb_key, len(vals[1]))
        elif result["smb_config"] is not None:
            log.warning(" smb_shares → NOT FOUND in Database Dump")

    nfs_key = "ixdiagnose/fndebug/NFS/dump.txt"
    nfs_dump = _read_core_dump(tf, members, nfs_key, "NFS")
    if nfs_dump is not None:
        vals = _extract_core_dump_json(nfs_dump, "Configuration")
        if len(vals) >= 2 and isinstance(vals[1], list):
            result["nfs_shares"] = vals[1]
            log.info(" nfs_shares → %s (CORE, %d share(s))", nfs_key, len(vals[1]))
        else:
            log.warning(" nfs_shares → NOT FOUND in Configuration")

    if not result["smb_shares"] and not result["nfs_shares"]:
        log.warning(
            "No share data found in CORE archive. "
            "This is expected when SMB/NFS services were disabled on the source system."
        )
@contextlib.contextmanager
def _open_source_tar(tar_path: str):
"""
Open the archive that actually contains the ixdiagnose data.
TrueNAS HA debug bundles (25.04+) wrap each node's ixdiagnose snapshot
in a separate .txz inside the outer .tgz. We prefer the member whose
name includes '_active'; if none is labelled that way we fall back to the
first .txz found. Single-node (non-HA) bundles are used directly.
"""
with tarfile.open(tar_path, "r:*") as outer:
txz_members = [
m for m in outer.getmembers()
if m.name.lower().endswith(".txz") and m.isfile()
]
if not txz_members:
yield outer
return
# HA bundle pick the active node's inner archive
active = next(
(m for m in txz_members if "_active" in m.name.lower()),
txz_members[0],
)
log.info(" HA bundle detected; reading inner archive: %s", active.name)
fh = outer.extractfile(active)
with tarfile.open(fileobj=fh, mode="r:*") as inner:
yield inner
def parse_archive(tar_path: str) -> dict[str, Any]:
    """
    Read SMB shares, NFS shares and the SMB global config from a debug archive.

    CORE archives (recognised by the fndebug path prefix) are handed to
    _parse_core_into(); all others are searched with _find_data().  Share
    data may be a bare list or a dict wrapping one, in which case the first
    list-valued entry is used.

    Returns ``{"smb_shares": list, "nfs_shares": list, "smb_config": dict|None}``.
    Exits the process with status 1 if the archive cannot be opened or read.
    """
    log.info("Opening archive: %s", tar_path)
    parsed: dict[str, Any] = {"smb_shares": [], "nfs_shares": [], "smb_config": None}
    try:
        with _open_source_tar(tar_path) as tf:
            members = _members_map(tf)
            log.info(" Archive contains %d total entries.", len(members))
            core_prefix = _CORE_MARKER + "/"
            if any(p == _CORE_MARKER or p.startswith(core_prefix) for p in members):
                _parse_core_into(tf, members, parsed)
            else:
                for key in ("smb_shares", "nfs_shares", "smb_config"):
                    data = _find_data(tf, members, key)
                    if data is None:
                        log.warning(" %-12s → NOT FOUND", key)
                    elif key == "smb_config":
                        parsed[key] = data if isinstance(data, dict) else None
                    elif isinstance(data, list):
                        parsed[key] = data
                    elif isinstance(data, dict):
                        # Some versions wrap the list: {"result": [...]}
                        wrapped = next(
                            (v for v in data.values() if isinstance(v, list)),
                            None,
                        )
                        if wrapped is not None:
                            parsed[key] = wrapped
    except (tarfile.TarError, OSError) as exc:
        log.error("Failed to open archive: %s", exc)
        sys.exit(1)

    config_state = "found" if parsed["smb_config"] else "not found"
    log.info(
        "Parsed: %d SMB share(s), %d NFS share(s), SMB config=%s",
        len(parsed["smb_shares"]),
        len(parsed["nfs_shares"]),
        config_state,
    )
    return parsed
def _print_core_listing(tar_path: str, members_map: dict) -> None:
    """Print the fndebug .txt dump files of a CORE archive with their sizes."""
    print(f"\nTrueNAS CORE archive: {tar_path}\n")
    print(" fndebug plain-text dump files (JSON is embedded inside):\n")
    prefix = _CORE_MARKER + "/"
    for path in sorted(members_map):
        if path.startswith(prefix) and path.endswith(".txt"):
            kib = members_map[path].size / 1024
            print(f" {path} ({kib:.1f} KB)")
    print()
    print(" Data this tool will extract:")
    print(" SMB config + shares → fndebug/SMB/dump.txt "
          "(\"Database Dump\" section)")
    print(" NFS shares → fndebug/NFS/dump.txt "
          "(\"Configuration\" section)")


def _print_json_listing(tar_path: str, tf: tarfile.TarFile) -> None:
    """Print every .json member of a SCALE archive, grouped by directory."""
    print(f"\nJSON plugin files in archive: {tar_path}\n")
    json_members = [m for m in tf.getmembers() if m.name.endswith(".json")]
    if not json_members:
        print(" (no .json files found)")
        return
    json_members.sort(key=lambda m: m.name)
    shown_dir = ""
    for member in json_members:
        # Everything before the last "/" is the directory, the rest the file.
        directory, _, filename = member.name.lstrip("./").rpartition("/")
        if directory != shown_dir:
            print(f"\n {directory or '(root)'}/")
            shown_dir = directory
        print(f" {filename} ({member.size / 1024:.1f} KB)")


def list_archive_and_exit(tar_path: str) -> None:
    """
    Print a structured listing of the archive contents, then exit.

    CORE archives: the fndebug dump files plus a note on which sections the
    tool reads.  All other archives: every .json member grouped by directory.
    Exits with status 0 on success; on an archive or OS error exits with an
    "ERROR: ..." message instead.  Never returns.
    """
    try:
        with _open_source_tar(tar_path) as tf:
            members_map = _members_map(tf)
            core_prefix = _CORE_MARKER + "/"
            looks_like_core = any(
                p == _CORE_MARKER or p.startswith(core_prefix)
                for p in members_map
            )
            if looks_like_core:
                _print_core_listing(tar_path, members_map)
            else:
                _print_json_listing(tar_path, tf)
    except (tarfile.TarError, OSError) as exc:
        sys.exit(f"ERROR: {exc}")
    print()
    sys.exit(0)
# ─────────────────────────────────────────────────────────────────────────────
# Payload builders
# ─────────────────────────────────────────────────────────────────────────────
# Read-only / server-generated fields that must NOT be sent on create/update.
# The payload builders below drop these keys from a share or config dict
# before it is sent to the destination API.
_SMB_SHARE_READONLY = frozenset({"id", "locked"})
_NFS_SHARE_READONLY = frozenset({"id", "locked"})
_SMB_CONFIG_READONLY = frozenset({"id", "server_sid"})
def _smb_share_payload(share: dict) -> dict:
    """Return a copy of *share* without the keys in _SMB_SHARE_READONLY."""
    payload = dict(share)
    for readonly_key in _SMB_SHARE_READONLY:
        payload.pop(readonly_key, None)
    return payload
def _nfs_share_payload(share: dict) -> dict:
    """Return a copy of *share* without the keys in _NFS_SHARE_READONLY."""
    return dict(
        (key, value)
        for key, value in share.items()
        if key not in _NFS_SHARE_READONLY
    )
def _smb_config_payload(config: dict) -> dict:
    """Return a copy of *config* that is safe to pass to smb.update.

    Both the read-only keys (_SMB_CONFIG_READONLY) and the CORE-only keys
    (_SMB_CONFIG_CORE_EXTRAS) are left out.
    """
    payload = {}
    for key, value in config.items():
        if key in _SMB_CONFIG_READONLY or key in _SMB_CONFIG_CORE_EXTRAS:
            continue
        payload[key] = value
    return payload
# ─────────────────────────────────────────────────────────────────────────────
# TrueNAS JSON-RPC 2.0 WebSocket client
# ─────────────────────────────────────────────────────────────────────────────
class TrueNASClient:
    """
    Minimal async JSON-RPC 2.0 client for the TrueNAS WebSocket API.

    TrueNAS 25.04+ endpoint: wss://<host>:<port>/api/current
    Authentication: auth.login_with_api_key

    Use it as an async context manager: entering connects and authenticates,
    leaving closes the socket.  Calls are strictly sequential (one request,
    then wait for its reply).
    """

    def __init__(
        self,
        host: str,
        api_key: str,
        port: int = 443,
        verify_ssl: bool = False,
    ) -> None:
        """Store connection settings; no network activity happens here."""
        self._host = host
        self._port = port
        self._api_key = api_key
        self._verify_ssl = verify_ssl
        self._ws = None  # live websocket connection, or None when closed
        self._call_id = 0  # last JSON-RPC request id handed out

    @property
    def _url(self) -> str:
        """WebSocket URL of the destination's JSON-RPC endpoint."""
        return f"wss://{self._host}:{self._port}/api/current"

    async def __aenter__(self) -> "TrueNASClient":
        await self._connect()
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self._close()

    async def _close(self) -> None:
        """Close the websocket if one is open; safe to call repeatedly."""
        ws, self._ws = self._ws, None
        if ws is not None:
            await ws.close()

    async def _connect(self) -> None:
        """Open the websocket and log in with the API key.

        Raises:
            WebSocketException / OSError: the connection could not be opened.
            PermissionError: the server rejected the API key.
            RuntimeError: the login call itself returned an API error.
        """
        ctx = ssl.create_default_context()
        if not self._verify_ssl:
            # Certificate checks are disabled on request of the caller.
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        log.info("Connecting to %s", self._url)
        try:
            self._ws = await websockets.connect(
                self._url,
                ssl=ctx,
                ping_interval=20,
                ping_timeout=30,
                open_timeout=20,
            )
        except (WebSocketException, OSError) as exc:
            log.error("Connection failed: %s", exc)
            raise
        log.info("Authenticating with API key …")
        authenticated = False
        try:
            result = await self.call("auth.login_with_api_key", [self._api_key])
            if result is not True and result != "SUCCESS":
                raise PermissionError(f"Authentication rejected: {result!r}")
            authenticated = True
        finally:
            # __aexit__ is never invoked when __aenter__ raises, so a failed
            # login must release the socket here or it would stay open.
            if not authenticated:
                await self._close()
        log.info("Connected and authenticated.")

    async def call(self, method: str, params: Optional[list] = None) -> Any:
        """
        Send one JSON-RPC request and return its result.

        Messages without an "id" (server notifications) and replies carrying
        a different id are discarded while waiting.

        Raises:
            RuntimeError: the client is not connected, or the API returned
                an error for this request.
            asyncio.TimeoutError: no message arrived within 60 seconds.
        """
        if self._ws is None:
            raise RuntimeError(f"API error [{method}]: client is not connected")
        self._call_id += 1
        req_id = self._call_id
        await self._ws.send(json.dumps({
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params or [],
        }))
        # Drain until the matching reply arrives (skip server-push notifications)
        while True:
            raw = await asyncio.wait_for(self._ws.recv(), timeout=60)
            msg = json.loads(raw)
            if "id" not in msg:  # server-initiated notification
                continue
            if msg["id"] != req_id:  # response to a different call
                continue
            if "error" in msg:
                err = msg["error"]
                reason = None
                if isinstance(err, dict):
                    # "data" may be absent or explicitly null; only a dict
                    # can carry the detailed "reason".
                    data = err.get("data")
                    if isinstance(data, dict):
                        reason = data.get("reason")
                    reason = reason or err.get("message")
                raise RuntimeError(f"API error [{method}]: {reason or repr(err)}")
            return msg.get("result")
# ─────────────────────────────────────────────────────────────────────────────
# Migration routines
# ─────────────────────────────────────────────────────────────────────────────
async def migrate_smb_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create on the destination every SMB share that is not already there.

    Shares are matched against the destination by name, case-insensitively;
    matches are skipped.  In a dry run nothing is created but the share is
    still counted as created.  Counters and error messages are recorded in
    *summary*.  API errors and request timeouts are recorded as failures and
    never abort the remaining shares.
    """
    summary.smb_found = len(shares)
    if not shares:
        log.info("No SMB shares found in archive.")
        return

    log.info("Querying existing SMB shares on destination …")
    try:
        existing = await client.call("sharing.smb.query") or []
    except (RuntimeError, asyncio.TimeoutError) as exc:
        # client.call() waits via asyncio.wait_for, so a silent server
        # surfaces as TimeoutError (whose str() is empty).
        msg = f"Could not query SMB shares: {str(exc) or 'request timed out'}"
        log.error(msg)
        summary.errors.append(msg)
        return

    # `or ""` guards against an explicit null name in the query result.
    existing_names = {(s.get("name") or "").lower() for s in existing}
    log.info(" Destination has %d existing SMB share(s).", len(existing_names))

    for share in shares:
        raw_name = share.get("name")
        name = "<unnamed>" if raw_name is None else str(raw_name)
        log.info("── SMB share %r", name)
        if name.lower() in existing_names:
            log.info(" SKIP already exists on destination.")
            summary.smb_skipped += 1
            continue

        payload = _smb_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))
        if dry_run:
            log.info(" [DRY RUN] would create SMB share %r at %s",
                     name, payload.get("path"))
            summary.smb_created += 1
            continue

        try:
            created = await client.call("sharing.smb.create", [payload])
        except (RuntimeError, asyncio.TimeoutError) as exc:
            reason = str(exc) or "request timed out"
            log.error(" FAILED: %s", reason)
            summary.smb_failed += 1
            summary.errors.append(f"SMB share {name!r}: {reason}")
        else:
            # The call succeeded; do not let an unexpected result shape
            # turn a created share into a crash.
            new_id = created.get("id") if isinstance(created, dict) else None
            log.info(" CREATED id=%s", new_id)
            summary.smb_created += 1
async def migrate_nfs_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create on the destination every NFS export that is not already there.

    Exports are matched against the destination by their "path" value with
    trailing slashes removed; matches are skipped.  In a dry run nothing is
    created but the export is still counted as created.  Counters and error
    messages are recorded in *summary*.  API errors and request timeouts are
    recorded as failures and never abort the remaining exports.
    """
    summary.nfs_found = len(shares)
    if not shares:
        log.info("No NFS shares found in archive.")
        return

    log.info("Querying existing NFS shares on destination …")
    try:
        existing = await client.call("sharing.nfs.query") or []
    except (RuntimeError, asyncio.TimeoutError) as exc:
        # client.call() waits via asyncio.wait_for, so a silent server
        # surfaces as TimeoutError (whose str() is empty).
        msg = f"Could not query NFS shares: {str(exc) or 'request timed out'}"
        log.error(msg)
        summary.errors.append(msg)
        return

    # `or ""` guards against an explicit null path in either data source.
    existing_paths = {(s.get("path") or "").rstrip("/") for s in existing}
    log.info(" Destination has %d existing NFS share(s).", len(existing_paths))

    for share in shares:
        path = (share.get("path") or "").rstrip("/")
        log.info("── NFS export %r", path)
        if path in existing_paths:
            log.info(" SKIP path already exported on destination.")
            summary.nfs_skipped += 1
            continue

        payload = _nfs_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))
        if dry_run:
            log.info(" [DRY RUN] would create NFS export for %r", path)
            summary.nfs_created += 1
            continue

        try:
            created = await client.call("sharing.nfs.create", [payload])
        except (RuntimeError, asyncio.TimeoutError) as exc:
            reason = str(exc) or "request timed out"
            log.error(" FAILED: %s", reason)
            summary.nfs_failed += 1
            summary.errors.append(f"NFS share {path!r}: {reason}")
        else:
            # The call succeeded; do not let an unexpected result shape
            # turn a created export into a crash.
            new_id = created.get("id") if isinstance(created, dict) else None
            log.info(" CREATED id=%s", new_id)
            summary.nfs_created += 1
async def migrate_smb_config(
    client: TrueNASClient,
    config: Optional[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Apply the archived SMB global configuration via smb.update.

    Does nothing when *config* is empty or None.  In a dry run the call is
    skipped but ``summary.cfg_applied`` is still set.  An API error or a
    request timeout is recorded in ``summary.errors`` instead of propagating.
    """
    if not config:
        log.info("No SMB global config found in archive skipping.")
        return

    payload = _smb_config_payload(config)
    log.info("── SMB global config")
    log.info(
        " netbiosname=%-20s workgroup=%-15s encryption=%s",
        repr(payload.get("netbiosname")),
        repr(payload.get("workgroup")),
        repr(payload.get("encryption")),
    )
    if dry_run:
        log.info(" [DRY RUN] would call smb.update")
        summary.cfg_applied = True
        return

    try:
        await client.call("smb.update", [payload])
    except (RuntimeError, asyncio.TimeoutError) as exc:
        # client.call() waits via asyncio.wait_for; a timeout has an empty
        # str(), so substitute a readable reason.
        reason = str(exc) or "request timed out"
        log.error(" FAILED: %s", reason)
        summary.errors.append(f"SMB config: {reason}")
    else:
        log.info(" APPLIED")
        summary.cfg_applied = True
# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
async def run(args: argparse.Namespace) -> None:
    """Parse the archive, run the selected migrations, print the summary.

    The steps always run in the order SMB shares, NFS shares, SMB config;
    ``args.migrate`` selects which of them run.  Exits with status 2 when any
    step recorded an error.
    """
    archive = parse_archive(args.debug_tar)
    selected = set(args.migrate)

    if args.dry_run:
        banner = "=" * 55
        log.info(banner)
        log.info("DRY RUN no changes will be made on the destination")
        log.info(banner)

    summary = Summary()
    # (--migrate choice, coroutine to run, key of its data in the archive dict)
    steps = (
        ("smb", migrate_smb_shares, "smb_shares"),
        ("nfs", migrate_nfs_shares, "nfs_shares"),
        ("smb-config", migrate_smb_config, "smb_config"),
    )
    async with TrueNASClient(
        host=args.dest,
        port=args.port,
        api_key=args.api_key,
        verify_ssl=args.verify_ssl,
    ) as client:
        for choice, step, archive_key in steps:
            if choice in selected:
                await step(client, archive[archive_key], args.dry_run, summary)

    print(summary.report())
    if summary.errors:
        sys.exit(2)
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with all options registered."""
    parser = argparse.ArgumentParser(
        prog="truenas_migrate.py",
        description=(
            "Migrate SMB shares, NFS shares, and SMB global config "
            "from a TrueNAS debug archive to a live destination system."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    # (option strings, keyword arguments) in the order they appear in --help
    option_specs = [
        # Source
        (("--debug-tar",), dict(
            required=True, metavar="FILE",
            help="Path to the TrueNAS debug .tar / .tgz from the SOURCE system.",
        )),
        (("--list-archive",), dict(
            action="store_true",
            help=(
                "List all JSON files found in the archive and exit. "
                "Run this first to verify the archive contains share data."
            ),
        )),
        # Destination
        (("--dest",), dict(
            metavar="HOST",
            help="Hostname or IP of the DESTINATION TrueNAS system.",
        )),
        (("--port",), dict(
            type=int, default=443, metavar="PORT",
            help="WebSocket port on the destination (default: 443).",
        )),
        (("--verify-ssl",), dict(
            action="store_true",
            help=(
                "Verify the destination TLS certificate. "
                "Off by default because most TrueNAS systems use self-signed certs."
            ),
        )),
        # Authentication
        (("--api-key",), dict(
            metavar="KEY",
            help=(
                "TrueNAS API key. Generate one in TrueNAS UI: "
                "top-right account menu → API Keys."
            ),
        )),
        # Scope
        (("--migrate",), dict(
            nargs="+",
            choices=["smb", "nfs", "smb-config"],
            default=["smb", "nfs", "smb-config"],
            metavar="TYPE",
            help=(
                "What to migrate. Choices: smb nfs smb-config "
                "(default: all three). Example: --migrate smb nfs"
            ),
        )),
        (("--dry-run",), dict(
            action="store_true",
            help="Parse archive and connect to destination, but make no changes.",
        )),
        (("--verbose", "-v"), dict(
            action="store_true",
            help="Enable DEBUG-level logging.",
        )),
    ]
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    return parser


def main() -> None:
    """Command-line entry point: validate arguments, then list or migrate."""
    parser = _build_parser()
    args = parser.parse_args()

    if args.verbose:
        log.setLevel(logging.DEBUG)

    if not Path(args.debug_tar).is_file():
        parser.error(f"Archive not found: {args.debug_tar}")

    if args.list_archive:
        list_archive_and_exit(args.debug_tar)  # does not return

    # Destination and credentials are only needed for an actual migration.
    if not args.dest:
        parser.error("--dest is required (or use --list-archive to inspect the archive).")
    if not args.api_key:
        parser.error("--api-key is required.")

    asyncio.run(run(args))


if __name__ == "__main__":
    main()