781 lines
28 KiB
Python
781 lines
28 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
truenas_migrate.py – TrueNAS Share Migration Tool
|
||
=====================================================
|
||
Reads SMB shares, NFS shares, and SMB global config from a TrueNAS debug
|
||
archive (.tar / .tgz) produced by the built-in "Save Debug" feature, then
|
||
re-creates them on a destination TrueNAS system via the JSON-RPC 2.0
|
||
WebSocket API (TrueNAS 25.04+).
|
||
|
||
SAFE BY DEFAULT
|
||
• Existing shares are never overwritten or deleted.
|
||
• Always run with --dry-run first to preview what will happen.
|
||
|
||
REQUIREMENTS
|
||
pip install websockets
|
||
Python 3.9+
|
||
|
||
QUICK START
|
||
# 1. Inspect your debug archive to confirm it contains the data you need:
|
||
python truenas_migrate.py --debug-tar debug.tgz --list-archive
|
||
|
||
# 2. Dry-run – connect to destination but make zero changes:
|
||
python truenas_migrate.py \\
|
||
--debug-tar debug.tgz \\
|
||
--dest 192.168.1.50 \\
|
||
--api-key "1-xxxxxxxxxxxx" \\
|
||
--dry-run
|
||
|
||
# 3. Live migration of all three data types:
|
||
python truenas_migrate.py \\
|
||
--debug-tar debug.tgz \\
|
||
--dest 192.168.1.50 \\
|
||
--api-key "1-xxxxxxxxxxxx"
|
||
|
||
# 4. Migrate only SMB shares (skip NFS and global config):
|
||
python truenas_migrate.py \\
|
||
--debug-tar debug.tgz \\
|
||
--dest 192.168.1.50 \\
|
||
--api-key "1-xxxxxxxxxxxx" \\
|
||
--migrate smb
|
||
|
||
CONFLICT POLICY
|
||
Shares that already exist on the destination are silently skipped:
|
||
SMB – matched by share name (case-insensitive)
|
||
NFS – matched by export path (exact match)
|
||
SMB global config is always applied unless --migrate excludes "smb-config".
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import asyncio
|
||
import contextlib
|
||
import json
|
||
import logging
|
||
import ssl
|
||
import sys
|
||
import tarfile
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import Any, Optional
|
||
|
||
# ── Optional dependency check ─────────────────────────────────────────────────
|
||
try:
    import websockets
    from websockets.exceptions import WebSocketException
except ImportError:
    # Exit with an actionable one-liner rather than an import traceback.
    sys.exit(
        "ERROR: The 'websockets' package is required.\nInstall it with: pip install websockets"
    )
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Logging
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
# Root logging configuration: timestamped, level-aligned lines at INFO and up.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)

# Logger shared by every routine in this script.
log = logging.getLogger("truenas_migrate")
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Summary
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
@dataclass
class Summary:
    """Counters and error messages collected while migrating.

    The migration routines update the fields in place; ``report()`` renders
    them as a boxed text table followed by the list of errors.
    """

    # SMB share counters
    smb_found: int = 0
    smb_created: int = 0
    smb_skipped: int = 0
    smb_failed: int = 0

    # NFS share counters
    nfs_found: int = 0
    nfs_created: int = 0
    nfs_skipped: int = 0
    nfs_failed: int = 0

    # True once the SMB global config was applied (or would be, in a dry run)
    cfg_applied: bool = False
    # Human-readable error messages, in the order they occurred
    errors: list[str] = field(default_factory=list)

    def report(self) -> str:
        """Return the multi-line summary box plus any recorded errors.

        The box is at least 52 columns wide and grows when a row needs more
        room, so the right-hand border stays aligned for large counters.
        """
        smb_val = (f"found={self.smb_found} created={self.smb_created}"
                   f" skipped={self.smb_skipped} failed={self.smb_failed}")
        nfs_val = (f"found={self.nfs_found} created={self.nfs_created}"
                   f" skipped={self.nfs_skipped} failed={self.nfs_failed}")
        cfg_val = "applied" if self.cfg_applied else "not applied"

        rows = [
            (" SMB shares : ", smb_val),
            (" NFS shares : ", nfs_val),
            (" SMB config : ", cfg_val),
        ]

        # Every row is framed as "│ " + text + " │", hence the extra 2 columns.
        widest = max(len(label) + len(val) for label, val in rows)
        w = max(52, widest + 2)
        hr = "─" * w

        def row(label: str, val: str) -> str:
            right = w - 2 - len(label) - len(val)
            return f"│ {label}{val}{' ' * right} │"

        lines = [
            "",
            f"┌{hr}┐",
            f"│{'MIGRATION SUMMARY':^{w}}│",
            f"├{hr}┤",
        ]
        lines.extend(row(label, val) for label, val in rows)
        lines.append(f"└{hr}┘")

        if self.errors:
            lines.append(f"\n {len(self.errors)} error(s):")
            lines.extend(f" • {e}" for e in self.errors)
        lines.append("")
        return "\n".join(lines)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Debug archive parser
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
#
|
||
# TrueNAS generates debug archives with its built-in "ixdiagnose" tool (SCALE
# 24.04+) or the older "freenas-debug" tool (CORE / earlier SCALE).
# Neither has a fully stable internal layout across versions, so we try a
# ranked list of known paths and fall back to a keyword heuristic scan.
#
# Known ixdiagnose (SCALE) layouts observed in the wild:
#   ixdiagnose/plugins/smb/smb_info.json               – SMB shares + SMB global config (combined file)
#   ixdiagnose/plugins/nfs/nfs_config.json             – NFS shares (combined file)
#   ixdiagnose/plugins/SMB/sharing.smb.query.json      – SMB shares
#   ixdiagnose/plugins/SMB/smb.config.json             – SMB global config
#   ixdiagnose/plugins/NFS/sharing.nfs.query.json      – NFS shares
#   ixdiagnose/plugins/Sharing/sharing.smb.query.json
#   ixdiagnose/plugins/Sharing/sharing.nfs.query.json
#
# Known freenas-debug (CORE) layouts:
#   freenas-debug/sharing/smb.json                     – SMB shares
#   freenas-debug/sharing/nfs.json                     – NFS shares
#   (CORE SMB config is plain text; JSON form may not exist)
|
||
|
||
# Ranked archive paths per data type; earlier entries win.
_CANDIDATES: dict[str, list[str]] = {
    "smb_shares": [
        # SCALE 24.04+: combined plugin file, shares live under "sharing_smb_query"
        "ixdiagnose/plugins/smb/smb_info.json",
        # Older SCALE
        "ixdiagnose/plugins/SMB/sharing.smb.query.json",
        "ixdiagnose/plugins/Sharing/sharing.smb.query.json",
        "ixdiagnose/SMB/sharing.smb.query.json",
        # CORE / freenas-debug
        "freenas-debug/sharing/smb.json",
        "sharing/smb.json",
        "middleware/sharing.smb.query.json",
    ],
    "nfs_shares": [
        # SCALE 24.04+: combined plugin file, shares live under "sharing_nfs_query"
        "ixdiagnose/plugins/nfs/nfs_config.json",
        # Older SCALE
        "ixdiagnose/plugins/NFS/sharing.nfs.query.json",
        "ixdiagnose/plugins/Sharing/sharing.nfs.query.json",
        "ixdiagnose/NFS/sharing.nfs.query.json",
        # CORE / freenas-debug
        "freenas-debug/sharing/nfs.json",
        "sharing/nfs.json",
        "middleware/sharing.nfs.query.json",
    ],
    "smb_config": [
        # SCALE 24.04+: combined plugin file, config lives under "smb_config"
        "ixdiagnose/plugins/smb/smb_info.json",
        # Older SCALE
        "ixdiagnose/plugins/SMB/smb.config.json",
        "ixdiagnose/SMB/smb.config.json",
        # CORE / freenas-debug
        "freenas-debug/SMB/smb_config.json",
        "middleware/smb.config.json",
    ],
}

# Sub-key to pull out of a candidate file that bundles several datasets.
_KEY_WITHIN_FILE: dict[str, str] = {
    "smb_shares": "sharing_smb_query",
    "nfs_shares": "sharing_nfs_query",
    "smb_config": "smb_config",
}

# Lower-case path fragments used by the heuristic fallback scan.
_KEYWORDS: dict[str, list[str]] = {
    "smb_shares": ["sharing.smb", "smb_share", "sharing/smb", "smb_info"],
    "nfs_shares": ["sharing.nfs", "nfs_share", "sharing/nfs", "nfs_config"],
    "smb_config": ["smb.config", "smb_config", "smb_info"],
}
|
||
|
||
|
||
def _members_map(tf: tarfile.TarFile) -> dict[str, tarfile.TarInfo]:
    """Return {normalised_path: TarInfo} for every member of *tf*.

    Normalising removes leading "./" and "/" path prefixes only. Unlike
    ``str.lstrip("./")``, which strips any run of '.' and '/' characters,
    this keeps names that genuinely start with a dot (".hidden/x.json").
    """

    def _normalise(name: str) -> str:
        while True:
            if name.startswith("./"):
                name = name[2:]
            elif name.startswith("/"):
                name = name[1:]
            else:
                break
        # The archive root entry "." maps to the empty path.
        return "" if name == "." else name

    return {_normalise(m.name): m for m in tf.getmembers()}
|
||
|
||
|
||
def _read_json(tf: tarfile.TarFile, info: tarfile.TarInfo) -> Optional[Any]:
    """Extract and JSON-parse one archive member.

    Returns the parsed value, or None when the member is not a regular file,
    is empty/whitespace-only, or cannot be read or parsed. Failures are
    logged at DEBUG level; this lookup is deliberately best-effort.
    """
    try:
        fh = tf.extractfile(info)
        if fh is None:
            return None
        # Close the extracted stream as soon as its bytes are read.
        with fh:
            data = fh.read()
        # "utf-8-sig" drops a leading BOM, which json.loads would reject.
        raw = data.decode("utf-8-sig", errors="replace").strip()
        return json.loads(raw) if raw else None
    except Exception as exc:  # best-effort: any failure means "not usable"
        log.debug("Could not parse %s: %s", info.name, exc)
        return None
|
||
|
||
|
||
def _extract_subkey(raw: Any, data_type: str) -> Optional[Any]:
    """Pick the dataset for *data_type* out of a parsed JSON document.

    A dict that contains the sub-key mapped to *data_type* in
    ``_KEY_WITHIN_FILE`` (e.g. "sharing_smb_query" in smb_info.json) yields
    that sub-value. Anything else is returned unchanged.
    """
    if isinstance(raw, dict):
        subkey = _KEY_WITHIN_FILE.get(data_type)
        if subkey and subkey in raw:
            return raw[subkey]
    return raw
|
||
|
||
|
||
def _find_data(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    data_type: str,
) -> Optional[Any]:
    """Locate and parse the dataset for *data_type* inside the archive.

    Pass 1 walks the ranked ``_CANDIDATES`` paths, accepting either an exact
    key in *members* or a member whose path ends with "/<candidate>".
    Pass 2 scans every ``.json`` member, in sorted order, whose lower-cased
    path contains one of the ``_KEYWORDS`` fragments. The first member that
    parses to a non-None value wins; None is returned when nothing matches.
    """
    # Pass 1 – known candidate paths, in ranked order
    for candidate in _CANDIDATES[data_type]:
        wanted = candidate.lstrip("./")
        hit = members.get(wanted)
        if hit is None:
            # Archive may have a date-stamped top-level directory
            suffix = "/" + wanted
            hit = next(
                (
                    member
                    for path, member in members.items()
                    if path == wanted or path.endswith(suffix)
                ),
                None,
            )
        if hit is None:
            continue

        parsed = _extract_subkey(_read_json(tf, hit), data_type)
        if parsed is not None:
            log.info(" %-12s → %s", data_type, hit.name)
            return parsed

    # Pass 2 – keyword heuristic over all .json members
    log.debug(" %s: candidates missed, scanning archive …", data_type)
    fragments = _KEYWORDS[data_type]
    for path in sorted(members):
        lowered = path.lower()
        if not lowered.endswith(".json"):
            continue
        if not any(fragment in lowered for fragment in fragments):
            continue

        parsed = _extract_subkey(_read_json(tf, members[path]), data_type)
        if parsed is not None:
            log.info(" %-12s → %s (heuristic)", data_type, path)
            return parsed

    return None
|
||
|
||
|
||
@contextlib.contextmanager
def _open_source_tar(tar_path: str):
    """Yield the open TarFile that actually contains the ixdiagnose data.

    TrueNAS HA debug bundles (25.04+) wrap each node's ixdiagnose snapshot
    in a separate .txz inside the outer .tgz. The member whose name contains
    '_active' is preferred; if none is labelled that way the first .txz found
    is used. Bundles without any .txz member are yielded directly.

    The outer archive, the extracted inner stream and the inner archive are
    all closed when the context exits.

    Raises:
        tarfile.TarError / OSError: if an archive cannot be opened or read.
    """
    with tarfile.open(tar_path, "r:*") as outer:
        txz_members = [
            m for m in outer.getmembers()
            if m.name.lower().endswith(".txz") and m.isfile()
        ]
        if not txz_members:
            yield outer
            return

        # HA bundle – pick the active node's inner archive
        active = next(
            (m for m in txz_members if "_active" in m.name.lower()),
            txz_members[0],
        )
        log.info(" HA bundle detected; reading inner archive: %s", active.name)

        fh = outer.extractfile(active)
        if fh is None:
            # extractfile() returns None for members without file data.
            raise tarfile.ReadError(f"cannot read inner archive {active.name!r}")
        # Close the extracted stream together with the inner archive.
        with fh, tarfile.open(fileobj=fh, mode="r:*") as inner:
            yield inner
|
||
|
||
|
||
def parse_archive(tar_path: str) -> dict[str, Any]:
    """Extract SMB shares, NFS shares and SMB config from the debug archive.

    Returns ``{"smb_shares": list, "nfs_shares": list, "smb_config": dict|None}``.
    Datasets that cannot be located keep their empty defaults and a warning
    is logged. If the archive cannot be opened or read, the error is logged
    and the process exits with status 1.
    """
    log.info("Opening archive: %s", tar_path)
    result: dict[str, Any] = {"smb_shares": [], "nfs_shares": [], "smb_config": None}

    try:
        with _open_source_tar(tar_path) as tf:
            members = _members_map(tf)
            log.info(" Archive contains %d total entries.", len(members))

            for key in ("smb_shares", "nfs_shares", "smb_config"):
                data = _find_data(tf, members, key)
                if data is None:
                    log.warning(" %-12s → NOT FOUND", key)
                    continue

                if key == "smb_config":
                    result[key] = data if isinstance(data, dict) else None
                elif isinstance(data, list):
                    result[key] = data
                elif isinstance(data, dict):
                    # Some versions wrap the list: {"result": [...]}
                    wrapped = next(
                        (v for v in data.values() if isinstance(v, list)), None
                    )
                    if wrapped is not None:
                        result[key] = wrapped
    except (tarfile.TarError, OSError) as exc:
        log.error("Failed to open archive: %s", exc)
        sys.exit(1)

    smb_count = len(result["smb_shares"])
    nfs_count = len(result["nfs_shares"])
    cfg_state = "found" if result["smb_config"] else "not found"
    log.info(
        "Parsed: %d SMB share(s), %d NFS share(s), SMB config=%s",
        smb_count,
        nfs_count,
        cfg_state,
    )
    return result
|
||
|
||
|
||
def list_archive_and_exit(tar_path: str) -> None:
    """Print every JSON file in the archive grouped by directory, then exit.

    Helps confirm the archive actually contains the data we need. The
    ".json" suffix is matched case-insensitively, as the heuristic scan in
    ``_find_data`` does. Always terminates the process: status 0 after the
    listing, or an "ERROR: ..." exit if the archive cannot be read.
    """

    def _normalise(name: str) -> str:
        # Drop leading "./" and "/" prefixes only; a plain lstrip("./") would
        # also eat the dot of names such as ".hidden/x.json".
        while True:
            if name.startswith("./"):
                name = name[2:]
            elif name.startswith("/"):
                name = name[1:]
            else:
                return name

    print(f"\nJSON files in archive: {tar_path}\n")
    try:
        with _open_source_tar(tar_path) as tf:
            json_members = sorted(
                (m for m in tf.getmembers() if m.name.lower().endswith(".json")),
                key=lambda m: m.name,
            )
            if not json_members:
                print(" (no .json files found)")
            else:
                # Print one heading per containing directory for readability
                current_dir = ""
                for m in json_members:
                    parts = _normalise(m.name).split("/")
                    parent = "/".join(parts[:-1]) if len(parts) > 1 else ""
                    if parent != current_dir:
                        print(f"\n {parent or '(root)'}/")
                        current_dir = parent
                    print(f" {parts[-1]} ({m.size / 1024:.1f} KB)")
    except (tarfile.TarError, OSError) as exc:
        sys.exit(f"ERROR: {exc}")
    print()
    sys.exit(0)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Payload builders
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
# Read-only / server-generated fields that must NOT be sent on create/update
|
||
_SMB_SHARE_READONLY = frozenset({"id", "locked"})
_NFS_SHARE_READONLY = frozenset({"id", "locked"})
_SMB_CONFIG_READONLY = frozenset({"id", "server_sid"})


def _without_keys(record: dict, excluded: frozenset) -> dict:
    """Return a shallow copy of *record* minus the keys in *excluded*."""
    return {key: value for key, value in record.items() if key not in excluded}


def _smb_share_payload(share: dict) -> dict:
    """Build the create payload for an SMB share (drops "id" and "locked")."""
    return _without_keys(share, _SMB_SHARE_READONLY)


def _nfs_share_payload(share: dict) -> dict:
    """Build the create payload for an NFS share (drops "id" and "locked")."""
    return _without_keys(share, _NFS_SHARE_READONLY)


def _smb_config_payload(config: dict) -> dict:
    """Build the update payload for SMB config (drops "id" and "server_sid")."""
    return _without_keys(config, _SMB_CONFIG_READONLY)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# TrueNAS JSON-RPC 2.0 WebSocket client
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
class TrueNASClient:
    """
    Minimal async JSON-RPC 2.0 client for the TrueNAS WebSocket API.

    TrueNAS 25.04+ endpoint: wss://<host>:<port>/api/current
    Authentication: auth.login_with_api_key

    Use as an async context manager: entering connects and authenticates,
    leaving closes the socket. Calls are issued one at a time.
    """

    def __init__(
        self,
        host: str,
        api_key: str,
        port: int = 443,
        verify_ssl: bool = False,
        call_timeout: float = 60.0,
    ) -> None:
        """Store connection settings; no network I/O happens here.

        Args:
            host: Hostname or IP of the destination system.
            api_key: API key passed to ``auth.login_with_api_key``.
            port: TLS WebSocket port.
            verify_ssl: Verify the server certificate and hostname when True.
            call_timeout: Seconds to wait for each incoming message while a
                call is pending.
        """
        self._host = host
        self._port = port
        self._api_key = api_key
        self._verify_ssl = verify_ssl
        self._call_timeout = call_timeout
        self._ws = None
        self._call_id = 0

    @property
    def _url(self) -> str:
        """WebSocket endpoint URL built from host and port."""
        return f"wss://{self._host}:{self._port}/api/current"

    async def __aenter__(self) -> "TrueNASClient":
        await self._connect()
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self._close()

    async def _close(self) -> None:
        """Close the socket if one is open and forget it."""
        ws, self._ws = self._ws, None
        if ws is not None:
            await ws.close()

    async def _connect(self) -> None:
        """Open the WebSocket and authenticate with the API key.

        Raises:
            WebSocketException / OSError: the connection could not be opened.
            PermissionError: the server rejected the API key.
            RuntimeError: the authentication call itself failed.
        """
        ctx = ssl.create_default_context()
        if not self._verify_ssl:
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE

        log.info("Connecting to %s …", self._url)
        try:
            self._ws = await websockets.connect(
                self._url,
                ssl=ctx,
                ping_interval=20,
                ping_timeout=30,
                open_timeout=20,
            )
        except (WebSocketException, OSError) as exc:
            log.error("Connection failed: %s", exc)
            raise

        log.info("Authenticating with API key …")
        authenticated = False
        try:
            result = await self.call("auth.login_with_api_key", [self._api_key])
            if result is not True and result != "SUCCESS":
                raise PermissionError(f"Authentication rejected: {result!r}")
            authenticated = True
        finally:
            # __aexit__ never runs when __aenter__ raises, so release the
            # socket here instead of leaking it.
            if not authenticated:
                await self._close()
        log.info("Connected and authenticated.")

    async def call(self, method: str, params: Optional[list] = None) -> Any:
        """
        Send one JSON-RPC request and return its result.

        Messages without an "id" (server notifications) and replies carrying
        a different id are skipped while waiting.

        Raises RuntimeError if the client is not connected, the API returns
        an error, no message arrives within the call timeout, or the
        connection fails while the call is pending.
        """
        if self._ws is None:
            raise RuntimeError(f"API error [{method}]: not connected")

        self._call_id += 1
        req_id = self._call_id
        request = json.dumps({
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params or [],
        })

        try:
            await self._ws.send(request)
            # Drain until the matching reply arrives
            while True:
                raw = await asyncio.wait_for(
                    self._ws.recv(), timeout=self._call_timeout
                )
                msg = json.loads(raw)
                if isinstance(msg, dict) and msg.get("id") == req_id:
                    break
        except asyncio.TimeoutError as exc:
            raise RuntimeError(
                f"API error [{method}]: no reply within {self._call_timeout:g}s"
            ) from exc
        except WebSocketException as exc:
            raise RuntimeError(
                f"API error [{method}]: connection failed: {exc}"
            ) from exc

        if "error" in msg:
            err = msg["error"]
            reason = None
            if isinstance(err, dict):
                # "data" may be absent or JSON null; only a dict has "reason".
                data = err.get("data")
                if isinstance(data, dict):
                    reason = data.get("reason")
                reason = reason or err.get("message")
            raise RuntimeError(f"API error [{method}]: {reason or repr(err)}")

        return msg.get("result")
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Migration routines
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
async def migrate_smb_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create on the destination every SMB share that is not already there.

    Shares are matched against the destination by name, case-insensitively;
    matches are skipped. In a dry run nothing is created, but shares that
    would be created are still counted in ``summary.smb_created``.

    Args:
        client: Connected API client.
        shares: SMB share records taken from the source archive.
        dry_run: When True, only log what would be done.
        summary: Tally object updated in place (counters and errors).
    """
    summary.smb_found = len(shares)
    if not shares:
        log.info("No SMB shares found in archive.")
        return

    log.info("Querying existing SMB shares on destination …")
    try:
        existing = await client.call("sharing.smb.query") or []
    except RuntimeError as exc:
        msg = f"Could not query SMB shares: {exc}"
        log.error(msg)
        summary.errors.append(msg)
        return

    # Ignore non-string names (e.g. JSON null) instead of crashing on .lower().
    raw_names = (s.get("name", "") for s in existing)
    existing_names = {n.lower() for n in raw_names if isinstance(n, str)}
    log.info(" Destination has %d existing SMB share(s).", len(existing_names))

    for share in shares:
        name = share.get("name", "<unnamed>")
        if not isinstance(name, str):
            name = "<unnamed>"
        log.info("── SMB share %r", name)

        if name.lower() in existing_names:
            log.info(" SKIP – already exists on destination.")
            summary.smb_skipped += 1
            continue

        payload = _smb_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))

        if dry_run:
            log.info(" [DRY RUN] would create SMB share %r → %s",
                     name, payload.get("path"))
            summary.smb_created += 1
            continue

        try:
            r = await client.call("sharing.smb.create", [payload])
        except RuntimeError as exc:
            log.error(" FAILED: %s", exc)
            summary.smb_failed += 1
            summary.errors.append(f"SMB share {name!r}: {exc}")
            continue

        # The share exists at this point; an unexpected result shape must not
        # turn a successful create into a crash.
        created_id = r.get("id") if isinstance(r, dict) else None
        log.info(" CREATED id=%s", created_id)
        summary.smb_created += 1
|
||
|
||
|
||
async def migrate_nfs_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create on the destination every NFS export that is not already there.

    Exports are matched by path after trailing slashes are removed; matches
    are skipped. In a dry run nothing is created, but exports that would be
    created are still counted in ``summary.nfs_created``.

    Args:
        client: Connected API client.
        shares: NFS share records taken from the source archive.
        dry_run: When True, only log what would be done.
        summary: Tally object updated in place (counters and errors).
    """
    summary.nfs_found = len(shares)
    if not shares:
        log.info("No NFS shares found in archive.")
        return

    log.info("Querying existing NFS shares on destination …")
    try:
        existing = await client.call("sharing.nfs.query") or []
    except RuntimeError as exc:
        msg = f"Could not query NFS shares: {exc}"
        log.error(msg)
        summary.errors.append(msg)
        return

    # Ignore non-string paths (e.g. JSON null) instead of crashing on .rstrip().
    raw_paths = (s.get("path", "") for s in existing)
    existing_paths = {p.rstrip("/") for p in raw_paths if isinstance(p, str)}
    log.info(" Destination has %d existing NFS share(s).", len(existing_paths))

    for share in shares:
        raw_path = share.get("path", "")
        path = raw_path.rstrip("/") if isinstance(raw_path, str) else ""
        log.info("── NFS export %r", path)

        if path in existing_paths:
            log.info(" SKIP – path already exported on destination.")
            summary.nfs_skipped += 1
            continue

        payload = _nfs_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))

        if dry_run:
            log.info(" [DRY RUN] would create NFS export for %r", path)
            summary.nfs_created += 1
            continue

        try:
            r = await client.call("sharing.nfs.create", [payload])
        except RuntimeError as exc:
            log.error(" FAILED: %s", exc)
            summary.nfs_failed += 1
            summary.errors.append(f"NFS share {path!r}: {exc}")
            continue

        # The export exists at this point; an unexpected result shape must not
        # turn a successful create into a crash.
        created_id = r.get("id") if isinstance(r, dict) else None
        log.info(" CREATED id=%s", created_id)
        summary.nfs_created += 1
|
||
|
||
|
||
async def migrate_smb_config(
    client: TrueNASClient,
    config: Optional[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Apply the archived SMB global config through ``smb.update``.

    Does nothing when *config* is empty or None. A dry run only logs the
    intended call but still sets ``summary.cfg_applied``. An API failure is
    logged and appended to ``summary.errors``.
    """
    if not config:
        log.info("No SMB global config found in archive – skipping.")
        return

    payload = _smb_config_payload(config)
    shown = [repr(payload.get(k)) for k in ("netbiosname", "workgroup", "encryption")]
    log.info("── SMB global config")
    log.info(" netbiosname=%-20s workgroup=%-15s encryption=%s", *shown)

    if dry_run:
        log.info(" [DRY RUN] would call smb.update")
        summary.cfg_applied = True
        return

    try:
        await client.call("smb.update", [payload])
    except RuntimeError as exc:
        log.error(" FAILED: %s", exc)
        summary.errors.append(f"SMB config: {exc}")
    else:
        log.info(" APPLIED")
        summary.cfg_applied = True
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# CLI
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
async def run(args: argparse.Namespace) -> None:
    """Parse the archive, run the selected migrations and print the summary.

    A failure to connect or authenticate, or a connection that breaks
    mid-run, is recorded as an error instead of escaping as a traceback, so
    the summary is always printed. Exits with status 2 when any error was
    recorded.

    Args:
        args: Parsed CLI arguments; reads ``debug_tar``, ``migrate``,
            ``dry_run``, ``dest``, ``port``, ``api_key`` and ``verify_ssl``.
    """
    archive = parse_archive(args.debug_tar)
    migrate_set = set(args.migrate)

    if args.dry_run:
        log.info("=" * 55)
        log.info("DRY RUN – no changes will be made on the destination")
        log.info("=" * 55)

    summary = Summary()

    try:
        async with TrueNASClient(
            host=args.dest,
            port=args.port,
            api_key=args.api_key,
            verify_ssl=args.verify_ssl,
        ) as client:

            if "smb" in migrate_set:
                await migrate_smb_shares(
                    client, archive["smb_shares"], args.dry_run, summary)

            if "nfs" in migrate_set:
                await migrate_nfs_shares(
                    client, archive["nfs_shares"], args.dry_run, summary)

            if "smb-config" in migrate_set:
                await migrate_smb_config(
                    client, archive["smb_config"], args.dry_run, summary)
    except (OSError, WebSocketException, RuntimeError, asyncio.TimeoutError) as exc:
        # PermissionError (rejected API key) is an OSError subclass.
        msg = f"Migration aborted: {exc}"
        log.error(msg)
        summary.errors.append(msg)

    print(summary.report())
    if summary.errors:
        sys.exit(2)
|
||
|
||
|
||
def _build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser; the module docstring is the epilog."""
    parser = argparse.ArgumentParser(
        prog="truenas_migrate.py",
        description=(
            "Migrate SMB shares, NFS shares, and SMB global config "
            "from a TrueNAS debug archive to a live destination system."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    add = parser.add_argument

    # ── Source ────────────────────────────────────────────────────────────────
    add(
        "--debug-tar", required=True, metavar="FILE",
        help="Path to the TrueNAS debug .tar / .tgz from the SOURCE system.",
    )
    add(
        "--list-archive", action="store_true",
        help=(
            "List all JSON files found in the archive and exit. "
            "Run this first to verify the archive contains share data."
        ),
    )

    # ── Destination ───────────────────────────────────────────────────────────
    add(
        "--dest", metavar="HOST",
        help="Hostname or IP of the DESTINATION TrueNAS system.",
    )
    add(
        "--port", type=int, default=443, metavar="PORT",
        help="WebSocket port on the destination (default: 443).",
    )
    add(
        "--verify-ssl", action="store_true",
        help=(
            "Verify the destination TLS certificate. "
            "Off by default because most TrueNAS systems use self-signed certs."
        ),
    )

    # ── Authentication ────────────────────────────────────────────────────────
    add(
        "--api-key", metavar="KEY",
        help=(
            "TrueNAS API key. Generate one in TrueNAS UI: "
            "top-right account menu → API Keys."
        ),
    )

    # ── Scope ─────────────────────────────────────────────────────────────────
    add(
        "--migrate",
        nargs="+",
        choices=["smb", "nfs", "smb-config"],
        default=["smb", "nfs", "smb-config"],
        metavar="TYPE",
        help=(
            "What to migrate. Choices: smb nfs smb-config "
            "(default: all three). Example: --migrate smb nfs"
        ),
    )
    add(
        "--dry-run", action="store_true",
        help="Parse archive and connect to destination, but make no changes.",
    )
    add(
        "--verbose", "-v", action="store_true",
        help="Enable DEBUG-level logging.",
    )
    return parser


def main() -> None:
    """CLI entry point: parse arguments, validate them, run the migration."""
    parser = _build_parser()
    args = parser.parse_args()

    if args.verbose:
        log.setLevel(logging.DEBUG)

    if not Path(args.debug_tar).is_file():
        parser.error(f"Archive not found: {args.debug_tar}")

    if args.list_archive:
        list_archive_and_exit(args.debug_tar)  # does not return

    # --dest and --api-key are only needed once we talk to the destination.
    if not args.dest:
        parser.error("--dest is required (or use --list-archive to inspect the archive).")
    if not args.api_key:
        parser.error("--api-key is required.")

    asyncio.run(run(args))


if __name__ == "__main__":
    main()
|