Merge devel: restructure into package, remove SMB config migration

- Split single-file script into truenas_migrate/ package
- Removed SMB global config migration (not needed for deployment use)
- Added compatibility shim so both invocation styles still work
- Added __pycache__ to .gitignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-04 22:05:27 -05:00
10 changed files with 1395 additions and 1392 deletions

2
.gitignore vendored
View File

@@ -1 +1,3 @@
CLAUDE.md
__pycache__/
*.pyc

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
# truenas_migrate package

View File

@@ -0,0 +1,3 @@
from .cli import main
main()

352
truenas_migrate/archive.py Normal file
View File

@@ -0,0 +1,352 @@
"""TrueNAS debug archive parser (SCALE ixdiagnose and CORE fndebug layouts)."""
from __future__ import annotations
import contextlib
import json
import sys
import tarfile
from typing import Any, Optional
from .colors import log
# ─────────────────────────────────────────────────────────────────────────────
# Archive layout constants
# ─────────────────────────────────────────────────────────────────────────────
#
# TrueNAS SCALE generates debug archives with the "ixdiagnose" tool.
# The internal layout has changed across versions:
#
# SCALE 24.04+ (plugins layout, lowercase dirs, combined JSON files)
# ixdiagnose/plugins/smb/smb_info.json SMB shares + config combined
# ixdiagnose/plugins/nfs/nfs_config.json NFS shares + config combined
#
# Older SCALE (plugins layout, uppercase dirs, per-query JSON files)
# ixdiagnose/plugins/SMB/sharing.smb.query.json
# ixdiagnose/plugins/NFS/sharing.nfs.query.json
# ixdiagnose/plugins/Sharing/sharing.smb.query.json
# ixdiagnose/plugins/Sharing/sharing.nfs.query.json
#
# TrueNAS CORE uses the "freenas-debug" tool (stored as "fndebug" inside the
# archive). It produces plain-text dump files with embedded JSON blocks.
# Candidate archive paths for each dataset, tried in list order by
# _find_data(); the first path that parses successfully wins.
_CANDIDATES: dict[str, list[str]] = {
    "smb_shares": [
        "ixdiagnose/plugins/smb/smb_info.json",
        "ixdiagnose/plugins/SMB/sharing.smb.query.json",
        "ixdiagnose/plugins/Sharing/sharing.smb.query.json",
        "ixdiagnose/SMB/sharing.smb.query.json",
    ],
    "nfs_shares": [
        "ixdiagnose/plugins/nfs/nfs_config.json",
        "ixdiagnose/plugins/NFS/sharing.nfs.query.json",
        "ixdiagnose/plugins/Sharing/sharing.nfs.query.json",
        "ixdiagnose/NFS/sharing.nfs.query.json",
    ],
}
# When a candidate file bundles multiple datasets, pull out the right sub-key.
_KEY_WITHIN_FILE: dict[str, str] = {
    "smb_shares": "sharing_smb_query",
    "nfs_shares": "sharing_nfs_query",
}
# Keyword fragments for heuristic fallback scan (SCALE archives only);
# matched case-insensitively against .json member paths in _find_data().
_KEYWORDS: dict[str, list[str]] = {
    "smb_shares": ["sharing.smb", "smb_share", "sharing/smb", "smb_info"],
    "nfs_shares": ["sharing.nfs", "nfs_share", "sharing/nfs", "nfs_config"],
}
# Presence of this path prefix identifies a TrueNAS CORE archive.
_CORE_MARKER = "ixdiagnose/fndebug"
# ─────────────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _members_map(tf: tarfile.TarFile) -> dict[str, tarfile.TarInfo]:
"""Return {normalised_path: TarInfo} for every member."""
return {m.name.lstrip("./"): m for m in tf.getmembers()}
def _read_json(tf: tarfile.TarFile, info: tarfile.TarInfo) -> Optional[Any]:
"""Extract and JSON-parse one archive member. Returns None on any error."""
try:
fh = tf.extractfile(info)
if fh is None:
return None
raw = fh.read().decode("utf-8", errors="replace").strip()
return json.loads(raw) if raw else None
except Exception as exc:
log.debug("Could not parse %s: %s", info.name, exc)
return None
def _extract_subkey(raw: Any, data_type: str) -> Optional[Any]:
"""Pull out the relevant sub-key when a JSON file bundles multiple datasets."""
if not isinstance(raw, dict):
return raw
key = _KEY_WITHIN_FILE.get(data_type)
if key and key in raw:
return raw[key]
return raw
def _find_data(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    data_type: str,
) -> Optional[Any]:
    """
    Locate and parse the JSON payload for *data_type* inside a SCALE archive.

    Pass 1 tries the known candidate paths for this data type (exact match
    first, then suffix match to tolerate a date-stamped top-level directory).
    Pass 2 falls back to a keyword scan over every .json member.
    Returns the parsed (sub-key-unwrapped) JSON value, or None when nothing
    usable was found.
    """
    # Pass 1 exact / suffix match against known candidate paths
    for candidate in _CANDIDATES[data_type]:
        norm = candidate.lstrip("./")
        info = members.get(norm)
        if info is None:
            # Archive may have a date-stamped top-level directory
            for path, member in members.items():
                if path == norm or path.endswith("/" + norm):
                    info = member
                    break
        if info is not None:
            raw = _read_json(tf, info)
            result = _extract_subkey(raw, data_type)
            # None means the file was missing/unparseable; keep trying
            # further candidates rather than giving up.
            if result is not None:
                log.info(" %-12s%s", data_type, info.name)
                return result
    # Pass 2 keyword heuristic scan over all .json members
    log.debug(" %s: candidates missed, scanning archive …", data_type)
    keywords = _KEYWORDS[data_type]
    for path in sorted(members):
        if not path.lower().endswith(".json"):
            continue
        if any(kw in path.lower() for kw in keywords):
            raw = _read_json(tf, members[path])
            result = _extract_subkey(raw, data_type)
            if result is not None:
                log.info(" %-12s%s (heuristic)", data_type, path)
                return result
    return None
def _extract_core_dump_json(dump_text: str, title_fragment: str) -> list[Any]:
"""
Extract all top-level JSON values from a named section of a CORE dump.txt.
CORE dump sections look like:
+--------...--------+
+ SECTION TITLE +
+--------...--------+
<content>
debug finished in N seconds for SECTION TITLE
Returns a list of parsed JSON values found in the content block, in order.
"""
import re as _re
parts = _re.split(r'\+[-]{20,}\+', dump_text)
for i, part in enumerate(parts):
if title_fragment.lower() in part.lower() and i + 1 < len(parts):
content = parts[i + 1]
content = _re.sub(
r'debug finished.*', '', content,
flags=_re.IGNORECASE | _re.DOTALL,
).strip()
results: list[Any] = []
decoder = json.JSONDecoder()
pos = 0
while pos < len(content):
remaining = content[pos:].lstrip()
if not remaining or remaining[0] not in "{[":
break
pos += len(content[pos:]) - len(remaining)
try:
val, end = decoder.raw_decode(remaining)
results.append(val)
pos += end
except json.JSONDecodeError:
break
return results
return []
def _parse_core_into(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    result: dict[str, Any],
) -> None:
    """
    Populate *result* in place from TrueNAS CORE fndebug dump files.

    For each service the relevant dump.txt section is parsed with
    _extract_core_dump_json(); the second JSON value of the section is
    treated as the share list.
    """
    log.info("TrueNAS CORE archive detected; parsing fndebug dump files.")
    smb_key = "ixdiagnose/fndebug/SMB/dump.txt"
    if smb_key in members:
        fh = tf.extractfile(members[smb_key])
        dump = fh.read().decode("utf-8", errors="replace")  # type: ignore[union-attr]
        vals = _extract_core_dump_json(dump, "Database Dump")
        if len(vals) >= 2 and isinstance(vals[1], list):
            result["smb_shares"] = vals[1]
            log.info(" smb_shares → %s (CORE, %d share(s))", smb_key, len(vals[1]))
        else:
            # Fix: this was `elif vals:`, which silently skipped the warning
            # when the section yielded no JSON at all. Warn in every
            # not-found case, matching the NFS branch below.
            log.warning(" smb_shares → NOT FOUND in Database Dump")
    else:
        log.warning(" SMB dump not found: %s", smb_key)
    nfs_key = "ixdiagnose/fndebug/NFS/dump.txt"
    if nfs_key in members:
        fh = tf.extractfile(members[nfs_key])
        dump = fh.read().decode("utf-8", errors="replace")  # type: ignore[union-attr]
        vals = _extract_core_dump_json(dump, "Configuration")
        if len(vals) >= 2 and isinstance(vals[1], list):
            result["nfs_shares"] = vals[1]
            log.info(" nfs_shares → %s (CORE, %d share(s))", nfs_key, len(vals[1]))
        else:
            log.warning(" nfs_shares → NOT FOUND in Configuration")
    else:
        log.warning(" NFS dump not found: %s", nfs_key)
    if not result["smb_shares"] and not result["nfs_shares"]:
        log.warning(
            "No share data found in CORE archive. "
            "This is expected when SMB/NFS services were disabled on the source system."
        )
@contextlib.contextmanager
def _open_source_tar(tar_path: str):
    """
    Open the archive that actually contains the ixdiagnose data.
    TrueNAS HA debug bundles (25.04+) wrap each node's ixdiagnose snapshot
    in a separate .txz inside the outer .tgz. We prefer the member whose
    name includes '_active'; if none is labelled that way we fall back to the
    first .txz found. Single-node (non-HA) bundles are used directly.

    Yields an open tarfile.TarFile; it is closed when the context exits.
    """
    with tarfile.open(tar_path, "r:*") as outer:
        txz_members = [
            m for m in outer.getmembers()
            if m.name.lower().endswith(".txz") and m.isfile()
        ]
        if not txz_members:
            # Not an HA bundle: hand the outer archive to the caller as-is.
            yield outer
            return
        active = next(
            (m for m in txz_members if "_active" in m.name.lower()),
            txz_members[0],
        )
        log.info(" HA bundle detected; reading inner archive: %s", active.name)
        # fh must stay open for the inner TarFile's lifetime; closing the
        # outer archive on leaving this `with` releases it.
        fh = outer.extractfile(active)
        with tarfile.open(fileobj=fh, mode="r:*") as inner:
            yield inner
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
def parse_archive(tar_path: str) -> dict[str, Any]:
    """
    Extract SMB shares and NFS shares from the debug archive.

    Dispatches on archive flavour: CORE archives (identified by the fndebug
    marker path) are parsed from plain-text dumps, SCALE archives from JSON
    plugin files. Exits the process with status 1 when the archive cannot
    be opened.

    Returns: {"smb_shares": list, "nfs_shares": list}
    """
    log.info("Opening archive: %s", tar_path)
    result: dict[str, Any] = {
        "smb_shares": [],
        "nfs_shares": [],
    }
    try:
        with _open_source_tar(tar_path) as tf:
            members = _members_map(tf)
            log.info(" Archive contains %d total entries.", len(members))
            is_core = any(
                p == _CORE_MARKER or p.startswith(_CORE_MARKER + "/")
                for p in members
            )
            if is_core:
                _parse_core_into(tf, members, result)
            else:
                for key in ("smb_shares", "nfs_shares"):
                    data = _find_data(tf, members, key)
                    if data is None:
                        log.warning(" %-12s → NOT FOUND", key)
                        continue
                    if isinstance(data, list):
                        result[key] = data
                    elif isinstance(data, dict):
                        # Some versions wrap the list: {"result": [...]}
                        for v in data.values():
                            if isinstance(v, list):
                                result[key] = v
                                break
    except (tarfile.TarError, OSError) as exc:
        log.error("Failed to open archive: %s", exc)
        sys.exit(1)
    log.info(
        "Parsed: %d SMB share(s), %d NFS share(s)",
        len(result["smb_shares"]),
        len(result["nfs_shares"]),
    )
    return result
def list_archive_and_exit(tar_path: str) -> None:
    """
    Print a structured listing of the archive contents, then exit.
    For SCALE archives: lists all .json plugin files.
    For CORE archives: lists the fndebug dump files and the JSON sections
    that contain share data.

    Always terminates the process: exit(0) on success, exit(str) with an
    error message when the archive cannot be opened.
    """
    try:
        with _open_source_tar(tar_path) as tf:
            members_map = _members_map(tf)
            # Same CORE-vs-SCALE detection as parse_archive().
            is_core = any(
                p == _CORE_MARKER or p.startswith(_CORE_MARKER + "/")
                for p in members_map
            )
            if is_core:
                print(f"\nTrueNAS CORE archive: {tar_path}\n")
                print(" fndebug plain-text dump files (JSON is embedded inside):\n")
                dump_files = sorted(
                    p for p in members_map
                    if p.startswith(_CORE_MARKER + "/") and p.endswith(".txt")
                )
                for p in dump_files:
                    size = members_map[p].size / 1024
                    print(f" {p} ({size:.1f} KB)")
                print()
                print(" Data this tool will extract:")
                print(" SMB shares → fndebug/SMB/dump.txt (\"Database Dump\" section)")
                print(" NFS shares → fndebug/NFS/dump.txt (\"Configuration\" section)")
            else:
                print(f"\nJSON plugin files in archive: {tar_path}\n")
                json_members = sorted(
                    (m for m in tf.getmembers() if m.name.endswith(".json")),
                    key=lambda m: m.name,
                )
                if not json_members:
                    print(" (no .json files found)")
                else:
                    # Group files under their directory, printing each
                    # directory header once.
                    current_dir = ""
                    for m in json_members:
                        parts = m.name.lstrip("./").split("/")
                        top = "/".join(parts[:-1]) if len(parts) > 1 else ""
                        if top != current_dir:
                            print(f"\n {top or '(root)'}/")
                            current_dir = top
                        print(f" {parts[-1]} ({m.size / 1024:.1f} KB)")
    except (tarfile.TarError, OSError) as exc:
        sys.exit(f"ERROR: {exc}")
    print()
    sys.exit(0)

425
truenas_migrate/cli.py Normal file
View File

@@ -0,0 +1,425 @@
"""
truenas_migrate TrueNAS Share Migration Tool
=================================================
Reads SMB shares and NFS shares from a TrueNAS debug archive (.tar / .tgz)
produced by the built-in "Save Debug" feature, then re-creates them on a
destination TrueNAS system via the JSON-RPC 2.0 WebSocket API (TrueNAS 25.04+).
SAFE BY DEFAULT
• Existing shares are never overwritten or deleted.
• Always run with --dry-run first to preview what will happen.
REQUIREMENTS
Python 3.9+ (stdlib only no external packages needed)
QUICK START
# 1. Inspect your debug archive to confirm it contains the data you need:
python -m truenas_migrate --debug-tar debug.tgz --list-archive
# 2. Dry-run connect to destination but make zero changes:
python -m truenas_migrate \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx" \\
--dry-run
# 3. Live migration:
python -m truenas_migrate \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx"
# 4. Migrate only SMB shares (skip NFS):
python -m truenas_migrate \\
--debug-tar debug.tgz \\
--dest 192.168.1.50 \\
--api-key "1-xxxxxxxxxxxx" \\
--migrate smb
CONFLICT POLICY
Shares that already exist on the destination are silently skipped:
SMB matched by share name (case-insensitive)
NFS matched by export path (exact match)
"""
from __future__ import annotations
import argparse
import asyncio
import getpass
import logging
import sys
from pathlib import Path
from typing import Optional
from .archive import parse_archive, list_archive_and_exit
from .client import TrueNASClient, check_dataset_paths, create_missing_datasets
from .colors import log, _bold, _bold_cyan, _bold_yellow, _cyan, _dim, _green, _yellow
from .migrate import migrate_smb_shares, migrate_nfs_shares
from .summary import Summary
# ─────────────────────────────────────────────────────────────────────────────
# CLI orchestration
# ─────────────────────────────────────────────────────────────────────────────
async def run(
    args: argparse.Namespace,
    archive: Optional[dict] = None,
) -> Summary:
    """
    Execute one migration pass (dry or live) against the destination.

    *archive* lets callers reuse an already-parsed archive (the interactive
    wizard parses once and runs twice); when None the archive is parsed
    from args.debug_tar. Returns the populated Summary.
    """
    if archive is None:
        archive = parse_archive(args.debug_tar)
    migrate_set = set(args.migrate)
    if args.dry_run:
        # Banner goes to stderr so it stays visible when stdout is redirected.
        msg = " DRY RUN no changes will be made on the destination "
        bar = _bold_yellow("" * len(msg))
        print(f"\n{_bold_yellow('')}{bar}{_bold_yellow('')}", file=sys.stderr)
        print(f"{_bold_yellow('')}{_bold_yellow(msg)}{_bold_yellow('')}", file=sys.stderr)
        print(f"{_bold_yellow('')}{bar}{_bold_yellow('')}\n", file=sys.stderr)
    summary = Summary()
    async with TrueNASClient(
        host=args.dest,
        port=args.port,
        api_key=args.api_key,
        verify_ssl=args.verify_ssl,
    ) as client:
        if "smb" in migrate_set:
            await migrate_smb_shares(
                client, archive["smb_shares"], args.dry_run, summary)
        if "nfs" in migrate_set:
            await migrate_nfs_shares(
                client, archive["nfs_shares"], args.dry_run, summary)
        if args.dry_run and summary.paths_to_create:
            # Dry-run only: report which datasets would need creating
            # before a live pass could succeed.
            summary.missing_datasets = await check_dataset_paths(
                client, summary.paths_to_create,
            )
    return summary
# ─────────────────────────────────────────────────────────────────────────────
# Interactive wizard helpers
# ─────────────────────────────────────────────────────────────────────────────
def _find_debug_archives(directory: str = ".") -> list[Path]:
"""Return sorted list of TrueNAS debug archives found in *directory*."""
patterns = ("*.tgz", "*.tar.gz", "*.tar", "*.txz", "*.tar.xz")
found: set[Path] = set()
for pat in patterns:
found.update(Path(directory).glob(pat))
return sorted(found)
def _prompt(label: str, default: str = "") -> str:
    """Prompt the user; Enter returns *default*; Ctrl-C / EOF exits with status 0."""
    hint = f" [{default}]" if default else ""
    try:
        answer = input(f"{label}{hint}: ").strip()
    except (EOFError, KeyboardInterrupt):
        print()
        sys.exit(0)
    return answer or default
def _confirm(label: str) -> bool:
    """Ask a yes/no question; only 'y' / 'yes' is True, EOF / Ctrl-C is False."""
    try:
        answer = input(f"{label} [y/N]: ").strip().lower()
    except (EOFError, KeyboardInterrupt):
        print()
        return False
    return answer in ("y", "yes")
def _select_shares(shares: list[dict], share_type: str) -> list[dict]:
    """
    Display a numbered list of *shares* and return only those the user selects.
    Enter (or 'all') returns all shares unchanged. 'n' / 'none' returns [].
    Non-numeric tokens are ignored; duplicate numbers count once; selection
    order follows the order typed.
    """
    if not shares:
        return shares
    print(f"\n {_bold(f'{share_type} shares in archive ({len(shares)}):')} \n")
    for i, share in enumerate(shares, 1):
        if share_type == "SMB":
            name = share.get("name", "<unnamed>")
            path = share.get("path", "")
            print(f" {_cyan(str(i) + '.')} {name:<22} {_dim(path)}")
        else:  # NFS
            # NFS entries may use a single "path" or a "paths" list.
            pl = share.get("paths") or []
            path = share.get("path") or (pl[0] if pl else "")
            extra = f" {_dim('+ ' + str(len(pl) - 1) + ' more')}" if len(pl) > 1 else ""
            print(f" {_cyan(str(i) + '.')} {path}{extra}")
    print()
    raw = _prompt(
        f" Select {share_type} shares to migrate "
        "(e.g. '1 3', Enter = all, 'n' = none)",
        default="all",
    )
    low = raw.strip().lower()
    if low in ("", "all"):
        print(f" {_green('')} All {len(shares)} {share_type} share(s) selected.")
        return shares
    if low in ("n", "none", "0"):
        print(f" {_yellow('')} No {share_type} shares selected.")
        return []
    seen: set[int] = set()
    selected: list[dict] = []
    for tok in raw.split():
        if tok.isdigit():
            idx = int(tok) - 1
            if 0 <= idx < len(shares) and idx not in seen:
                seen.add(idx)
                selected.append(shares[idx])
    if selected:
        print(f" {_green('')} {len(selected)} of {len(shares)} {share_type} share(s) selected.")
    else:
        print(f" {_yellow('')} No valid selections; skipping {share_type} shares.")
    return selected
# ─────────────────────────────────────────────────────────────────────────────
# Interactive wizard
# ─────────────────────────────────────────────────────────────────────────────
def interactive_mode() -> None:
    """
    Interactive wizard: pick archive → configure → dry run → confirm → apply.

    Parses the archive once and reuses it for both the dry run and the live
    run. Exits 0 when the user aborts, 2 when the live run records errors.
    """
    print(
        f"\n{_bold_cyan(' TrueNAS Share Migration Tool')}\n"
        f" {_dim('Migrate SMB/NFS shares from a debug archive to a live system.')}\n"
    )
    # 1 ── Locate debug archive ────────────────────────────────────────────────
    archives = _find_debug_archives()
    if not archives:
        sys.exit(
            "No debug archives (.tgz / .tar.gz / .tar / .txz) found in the "
            "current directory.\n"
            "Copy your TrueNAS debug file here, or use --debug-tar to specify a path."
        )
    if len(archives) == 1:
        # Only one candidate: pick it without prompting.
        chosen = archives[0]
        print(f" {_dim('Archive:')} {_bold(chosen.name)} "
              f"{_dim('(' + f'{chosen.stat().st_size / 1_048_576:.1f} MB' + ')')}\n")
    else:
        print(f" {_bold('Debug archives found:')}\n")
        for i, p in enumerate(archives, 1):
            print(f" {_cyan(str(i) + '.')} {p.name} "
                  f"{_dim('(' + f'{p.stat().st_size / 1_048_576:.1f} MB' + ')')}")
        print()
        while True:
            raw = _prompt(f"Select archive [1-{len(archives)}]")
            if raw.isdigit() and 1 <= int(raw) <= len(archives):
                chosen = archives[int(raw) - 1]
                break
            print(f" Enter a number from 1 to {len(archives)}.")
    # 2 ── Destination ─────────────────────────────────────────────────────────
    print()
    host = ""
    while not host:
        host = _prompt("Destination TrueNAS host or IP")
        if not host:
            print(" Host is required.")
    port_raw = _prompt("WebSocket port", default="443")
    # Any non-numeric answer silently falls back to 443.
    port = int(port_raw) if port_raw.isdigit() else 443
    # 3 ── API key ─────────────────────────────────────────────────────────────
    api_key = ""
    while not api_key:
        try:
            api_key = getpass.getpass("API key (input hidden): ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)
        if not api_key:
            print(" API key is required.")
    # 4 ── Migration scope ─────────────────────────────────────────────────────
    print(f"\n {_bold('What to migrate?')}")
    print(f" {_cyan('1.')} SMB shares")
    print(f" {_cyan('2.')} NFS shares")
    sel_raw = _prompt(
        "Selection (space-separated numbers, Enter for all)", default="1 2"
    )
    _sel_map = {"1": "smb", "2": "nfs"}
    migrate: list[str] = []
    for tok in sel_raw.split():
        if tok in _sel_map and _sel_map[tok] not in migrate:
            migrate.append(_sel_map[tok])
    if not migrate:
        # Nothing valid typed: fall back to migrating both kinds.
        migrate = ["smb", "nfs"]
    # 5 ── Parse archive once (reused for dry + live runs) ────────────────────
    print()
    archive_data = parse_archive(str(chosen))
    # 5b ── Select individual shares ───────────────────────────────────────────
    if "smb" in migrate and archive_data["smb_shares"]:
        archive_data["smb_shares"] = _select_shares(archive_data["smb_shares"], "SMB")
    if "nfs" in migrate and archive_data["nfs_shares"]:
        archive_data["nfs_shares"] = _select_shares(archive_data["nfs_shares"], "NFS")
    print()
    # Shared argparse.Namespace fields for both the dry and the live run.
    base_ns = dict(
        debug_tar=str(chosen),
        dest=host,
        port=port,
        api_key=api_key,
        verify_ssl=False,
        migrate=migrate,
    )
    # 6 ── Dry run ─────────────────────────────────────────────────────────────
    dry_summary = asyncio.run(
        run(argparse.Namespace(**base_ns, dry_run=True), archive_data)
    )
    print(dry_summary.report())
    # Offer to create missing datasets before the live run
    if dry_summary.missing_datasets:
        non_mnt = [p for p in dry_summary.missing_datasets if not p.startswith("/mnt/")]
        creatable = [p for p in dry_summary.missing_datasets if p.startswith("/mnt/")]
        if non_mnt:
            print(f" NOTE: {len(non_mnt)} path(s) cannot be auto-created "
                  "(not under /mnt/):")
            for p in non_mnt:
                print(f"{p}")
            print()
        if creatable:
            print(f" {len(creatable)} dataset(s) can be created automatically:")
            for p in creatable:
                print(f"{p}")
            print()
            if _confirm(f"Create these {len(creatable)} dataset(s) on {host} now?"):
                asyncio.run(create_missing_datasets(
                    host=host,
                    port=port,
                    api_key=api_key,
                    paths=creatable,
                ))
                print()
    if not _confirm(f"Apply these changes to {host}?"):
        print("Aborted no changes made.")
        sys.exit(0)
    # 7 ── Live run ────────────────────────────────────────────────────────────
    print()
    live_summary = asyncio.run(
        run(argparse.Namespace(**base_ns, dry_run=False), archive_data)
    )
    print(live_summary.report())
    if live_summary.errors:
        sys.exit(2)
# ─────────────────────────────────────────────────────────────────────────────
# Argument parser + entry point
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """
    CLI entry point.

    With no command-line arguments, launches the interactive wizard;
    otherwise parses flags, validates them, and runs a single migration
    pass. Exits 2 when the run recorded errors.
    """
    if len(sys.argv) == 1:
        interactive_mode()
        return
    p = argparse.ArgumentParser(
        prog="truenas_migrate",
        description=(
            "Migrate SMB and NFS shares from a TrueNAS debug archive "
            "to a live destination system."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    # ── Source ────────────────────────────────────────────────────────────────
    p.add_argument(
        "--debug-tar", required=True, metavar="FILE",
        help="Path to the TrueNAS debug .tar / .tgz from the SOURCE system.",
    )
    p.add_argument(
        "--list-archive", action="store_true",
        help=(
            "List all JSON files found in the archive and exit. "
            "Run this first to verify the archive contains share data."
        ),
    )
    # ── Destination ───────────────────────────────────────────────────────────
    p.add_argument(
        "--dest", metavar="HOST",
        help="Hostname or IP of the DESTINATION TrueNAS system.",
    )
    p.add_argument(
        "--port", type=int, default=443, metavar="PORT",
        help="WebSocket port on the destination (default: 443).",
    )
    p.add_argument(
        "--verify-ssl", action="store_true",
        help=(
            "Verify the destination TLS certificate. "
            "Off by default because most TrueNAS systems use self-signed certs."
        ),
    )
    # ── Authentication ────────────────────────────────────────────────────────
    p.add_argument(
        "--api-key", metavar="KEY",
        help=(
            "TrueNAS API key. Generate one in TrueNAS UI: "
            "top-right account menu → API Keys."
        ),
    )
    # ── Scope ─────────────────────────────────────────────────────────────────
    p.add_argument(
        "--migrate",
        nargs="+",
        choices=["smb", "nfs"],
        default=["smb", "nfs"],
        metavar="TYPE",
        help=(
            "What to migrate. Choices: smb nfs "
            "(default: both). Example: --migrate smb"
        ),
    )
    p.add_argument(
        "--dry-run", action="store_true",
        help="Parse archive and connect to destination, but make no changes.",
    )
    p.add_argument(
        "--verbose", "-v", action="store_true",
        help="Enable DEBUG-level logging.",
    )
    args = p.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)
    # Validate flag combinations before touching the network.
    if not Path(args.debug_tar).is_file():
        p.error(f"Archive not found: {args.debug_tar}")
    if args.list_archive:
        list_archive_and_exit(args.debug_tar)  # does not return
    if not args.dest:
        p.error("--dest is required (or use --list-archive to inspect the archive).")
    if not args.api_key:
        p.error("--api-key is required.")
    summary = asyncio.run(run(args))
    print(summary.report())
    if summary.errors:
        sys.exit(2)

308
truenas_migrate/client.py Normal file
View File

@@ -0,0 +1,308 @@
"""TrueNAS WebSocket client and dataset utilities."""
from __future__ import annotations
import asyncio
import base64
import contextlib
import hashlib
import json
import os
import ssl
import struct
from typing import Any, Optional
from .colors import log
# ─────────────────────────────────────────────────────────────────────────────
# Raw WebSocket implementation (stdlib only, RFC 6455)
# ─────────────────────────────────────────────────────────────────────────────
def _ws_mask(data: bytes, mask: bytes) -> bytes:
"""XOR *data* with a 4-byte repeating mask key."""
out = bytearray(data)
for i in range(len(out)):
out[i] ^= mask[i & 3]
return bytes(out)
def _ws_encode_frame(payload: bytes, opcode: int = 0x1) -> bytes:
    """Encode one masked, FIN-flagged client→server WebSocket frame (RFC 6455 §5.2)."""
    n = len(payload)
    header = bytearray([0x80 | opcode])      # FIN bit + opcode
    if n < 126:
        header.append(0x80 | n)              # MASK bit + 7-bit length
    elif n < 65536:
        header.append(0x80 | 126)            # 16-bit extended length follows
        header += struct.pack("!H", n)
    else:
        header.append(0x80 | 127)            # 64-bit extended length follows
        header += struct.pack("!Q", n)
    key = os.urandom(4)
    return bytes(header) + key + _ws_mask(payload, key)
async def _ws_recv_message(reader: asyncio.StreamReader) -> str:
    """
    Read one complete WebSocket message, reassembling continuation frames.
    Skips ping/pong control frames. Raises OSError on close frame.

    NOTE(review): ping frames (0x9) are dropped without sending a pong —
    RFC 6455 expects a pong reply, but this coroutine only holds the reader.
    Confirm the server tolerates unanswered pings over long sessions.
    """
    fragments: list[bytes] = []
    while True:
        hdr = await reader.readexactly(2)
        fin = bool(hdr[0] & 0x80)        # final fragment of this message?
        opcode = hdr[0] & 0x0F
        masked = bool(hdr[1] & 0x80)     # tolerated even though servers normally send unmasked
        length = hdr[1] & 0x7F
        if length == 126:
            # 16-bit extended payload length
            length = struct.unpack("!H", await reader.readexactly(2))[0]
        elif length == 127:
            # 64-bit extended payload length
            length = struct.unpack("!Q", await reader.readexactly(8))[0]
        mask_key = await reader.readexactly(4) if masked else None
        payload = await reader.readexactly(length) if length else b""
        if mask_key:
            payload = _ws_mask(payload, mask_key)
        if opcode == 0x8:
            # Close frame ends the connection.
            raise OSError("WebSocket: server sent close frame")
        if opcode in (0x9, 0xA):
            # Ping/pong carry no message data for us; skip them.
            continue
        fragments.append(payload)
        if fin:
            return b"".join(fragments).decode("utf-8")
class _WebSocket:
    """asyncio StreamReader/Writer wrapped to a simple send/recv/close API."""
    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        # Streams produced by _ws_connect(); owned by this object.
        self._reader = reader
        self._writer = writer
    async def send(self, data: str) -> None:
        """Send *data* as one masked text frame (opcode 0x1)."""
        self._writer.write(_ws_encode_frame(data.encode("utf-8"), opcode=0x1))
        await self._writer.drain()
    async def recv(self) -> str:
        """Receive the next complete text message."""
        return await _ws_recv_message(self._reader)
    async def close(self) -> None:
        """Best-effort close: send a close frame (opcode 0x8), then tear down the transport."""
        with contextlib.suppress(Exception):
            self._writer.write(_ws_encode_frame(b"", opcode=0x8))
            await self._writer.drain()
        self._writer.close()
        with contextlib.suppress(Exception):
            await self._writer.wait_closed()
async def _ws_connect(host: str, port: int, path: str, ssl_ctx: ssl.SSLContext) -> _WebSocket:
    """
    Open a TLS connection, perform the HTTP→WebSocket upgrade, return a _WebSocket.

    Raises OSError when the handshake stalls, the server refuses the
    upgrade, or the Sec-WebSocket-Accept check fails.
    """
    reader, writer = await asyncio.open_connection(host, port, ssl=ssl_ctx)
    # Random 16-byte nonce required by the handshake.
    key = base64.b64encode(os.urandom(16)).decode()
    writer.write((
        f"GET {path} HTTP/1.1\r\n"
        f"Host: {host}:{port}\r\n"
        f"Upgrade: websocket\r\n"
        f"Connection: Upgrade\r\n"
        f"Sec-WebSocket-Key: {key}\r\n"
        f"Sec-WebSocket-Version: 13\r\n"
        f"\r\n"
    ).encode())
    await writer.drain()
    response_lines: list[bytes] = []
    while True:
        # 20 s cap per header line guards against a stalled handshake.
        line = await asyncio.wait_for(reader.readline(), timeout=20)
        if not line:
            raise OSError("Connection closed during WebSocket handshake")
        response_lines.append(line)
        if line in (b"\r\n", b"\n"):
            break
    status = response_lines[0].decode("latin-1").strip()
    if " 101 " not in status:
        raise OSError(f"WebSocket upgrade failed: {status}")
    # Expected Sec-WebSocket-Accept: SHA-1 of our key + the fixed RFC 6455 GUID.
    expected = base64.b64encode(
        hashlib.sha1(
            (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode()
        ).digest()
    ).decode().lower()
    # NOTE(review): both sides are lowercased and matched as a substring,
    # which also ignores base64 case — a deliberately lenient accept check.
    headers_text = b"".join(response_lines).decode("latin-1").lower()
    if expected not in headers_text:
        raise OSError("WebSocket upgrade: Sec-WebSocket-Accept mismatch")
    return _WebSocket(reader, writer)
# ─────────────────────────────────────────────────────────────────────────────
# TrueNAS JSON-RPC 2.0 client
# ─────────────────────────────────────────────────────────────────────────────
class TrueNASClient:
    """
    Minimal async JSON-RPC 2.0 client for the TrueNAS WebSocket API.
    TrueNAS 25.04+ endpoint: wss://<host>:<port>/api/current
    Authentication: auth.login_with_api_key

    Use as an async context manager: connects and authenticates on enter,
    closes the socket on exit.
    """
    def __init__(
        self,
        host: str,
        api_key: str,
        port: int = 443,
        verify_ssl: bool = False,
    ) -> None:
        self._host = host
        self._port = port
        self._api_key = api_key
        self._verify_ssl = verify_ssl
        # Established in _connect(); None while disconnected.
        self._ws = None
        # Monotonic JSON-RPC request id counter.
        self._call_id = 0
    @property
    def _url(self) -> str:
        """WebSocket endpoint URL for the configured host/port."""
        return f"wss://{self._host}:{self._port}/api/current"
    async def __aenter__(self) -> "TrueNASClient":
        await self._connect()
        return self
    async def __aexit__(self, *_: Any) -> None:
        if self._ws:
            await self._ws.close()
        self._ws = None
    async def _connect(self) -> None:
        """Open the TLS WebSocket and authenticate with the API key."""
        ctx = ssl.create_default_context()
        if not self._verify_ssl:
            # Certificate verification is opt-in (self-signed certs are common).
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        log.info("Connecting to %s", self._url)
        try:
            self._ws = await _ws_connect(
                host=self._host,
                port=self._port,
                path="/api/current",
                ssl_ctx=ctx,
            )
        except (OSError, asyncio.TimeoutError) as exc:
            log.error("Connection failed: %s", exc)
            raise
        log.info("Authenticating with API key …")
        result = await self.call("auth.login_with_api_key", [self._api_key])
        # Accept either True or the literal string "SUCCESS" as success.
        if result is not True and result != "SUCCESS":
            raise PermissionError(f"Authentication rejected: {result!r}")
        log.info("Connected and authenticated.")
    async def call(self, method: str, params: Optional[list] = None) -> Any:
        """Send one JSON-RPC request and return its result.
        Raises RuntimeError if the API returns an error.
        """
        self._call_id += 1
        req_id = self._call_id
        await self._ws.send(json.dumps({
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params or [],
        }))
        while True:
            # 60 s cap per incoming message; messages without an id and
            # replies to other requests are skipped until ours arrives.
            raw = await asyncio.wait_for(self._ws.recv(), timeout=60)
            msg = json.loads(raw)
            if "id" not in msg:
                continue
            if msg["id"] != req_id:
                continue
            if "error" in msg:
                err = msg["error"]
                # Prefer the detailed reason, fall back to message, then repr.
                reason = (
                    err.get("data", {}).get("reason")
                    or err.get("message")
                    or repr(err)
                )
                raise RuntimeError(f"API error [{method}]: {reason}")
            return msg.get("result")
# ─────────────────────────────────────────────────────────────────────────────
# Dataset utilities
# ─────────────────────────────────────────────────────────────────────────────
async def check_dataset_paths(
    client: TrueNASClient,
    paths: list[str],
) -> list[str]:
    """
    Return every entry of *paths* with no matching ZFS dataset mountpoint
    on the destination. Best-effort: returns an empty list when nothing
    was requested or when the dataset query itself fails.
    """
    if not paths:
        return []
    wanted = sorted({p.rstrip("/") for p in paths if p})
    log.info("Checking %d share path(s) against destination datasets …", len(wanted))
    try:
        datasets = await client.call("pool.dataset.query") or []
    except RuntimeError as exc:
        log.warning("Could not query datasets (skipping check): %s", exc)
        return []
    known = {
        ds["mountpoint"].rstrip("/")
        for ds in datasets
        if ds.get("mountpoint")
    }
    missing = [p for p in wanted if p not in known]
    if not missing:
        log.info(" All share paths exist as datasets.")
        return missing
    for p in missing:
        log.warning(" MISSING dataset for path: %s", p)
    return missing
async def create_dataset(client: TrueNASClient, path: str) -> bool:
    """
    Create the ZFS dataset whose mountpoint will be *path*.
    *path* must be an absolute /mnt/… path.
    Returns True on success, False on failure.
    """
    if not path.startswith("/mnt/"):
        log.error("Cannot auto-create dataset for non-/mnt/ path: %s", path)
        return False
    dataset_name = path[len("/mnt/"):].rstrip("/")
    log.info("Creating dataset %r", dataset_name)
    try:
        await client.call("pool.dataset.create", [{"name": dataset_name}])
    except RuntimeError as exc:
        log.error(" Failed to create dataset %r: %s", dataset_name, exc)
        return False
    log.info(" Created: %s", dataset_name)
    return True
async def create_missing_datasets(
    host: str,
    port: int,
    api_key: str,
    paths: list[str],
    verify_ssl: bool = False,
) -> None:
    """
    Open a fresh connection and create ZFS datasets for *paths*.

    Per-path failures are logged by create_dataset() and do not abort the
    loop, so the remaining datasets are still attempted.
    """
    async with TrueNASClient(
        host=host, port=port, api_key=api_key, verify_ssl=verify_ssl,
    ) as client:
        for path in paths:
            await create_dataset(client, path)

55
truenas_migrate/colors.py Normal file
View File

@@ -0,0 +1,55 @@
"""ANSI color helpers and shared logger."""
from __future__ import annotations
import logging
import re as _re
import sys
_USE_COLOR = sys.stderr.isatty()
def _c(code: str, text: str) -> str:
    """Wrap *text* in the ANSI SGR sequence *code* when color output is enabled."""
    if not _USE_COLOR:
        return text
    return f"\033[{code}m{text}\033[0m"


# Named shorthands for the SGR codes used throughout the package.
def _dim(t: str) -> str:
    return _c("2", t)


def _bold(t: str) -> str:
    return _c("1", t)


def _red(t: str) -> str:
    return _c("31", t)


def _green(t: str) -> str:
    return _c("32", t)


def _yellow(t: str) -> str:
    return _c("33", t)


def _cyan(t: str) -> str:
    return _c("36", t)


def _bold_red(t: str) -> str:
    return _c("1;31", t)


def _bold_green(t: str) -> str:
    return _c("1;32", t)


def _bold_yellow(t: str) -> str:
    return _c("1;33", t)


def _bold_cyan(t: str) -> str:
    return _c("1;36", t)
def _vis_len(s: str) -> int:
    """Visible width of *s*: its length once ANSI escape sequences are removed."""
    stripped = _re.sub(r'\033\[[0-9;]*m', '', s)
    return len(stripped)
class _ColorFormatter(logging.Formatter):
    """Single-line log formatter: "<dim time> <colored LEVEL> <message>".

    Colors are applied only when stderr is a terminal (see _USE_COLOR).
    """

    # logging level -> ANSI SGR code used for the level name
    _STYLES = {
        logging.DEBUG: "2",
        logging.INFO: "36",
        logging.WARNING: "1;33",
        logging.ERROR: "1;31",
        logging.CRITICAL: "1;31",
    }

    def format(self, record: logging.LogRecord) -> str:
        ts = self.formatTime(record, self.datefmt)
        msg = record.getMessage()
        if _USE_COLOR:
            code = self._STYLES.get(record.levelno, "0")
            level = f"\033[{code}m{record.levelname:<8}\033[0m"
            ts = f"\033[2m{ts}\033[0m"
        else:
            level = f"{record.levelname:<8}"
        line = f"{ts} {level} {msg}"
        # Bug fix: the previous implementation ignored exc_info/stack_info,
        # so log.exception(...) silently dropped tracebacks.  Append them
        # the same way the stock logging.Formatter does.
        if record.exc_info:
            line = f"{line}\n{self.formatException(record.exc_info)}"
        if record.stack_info:
            line = f"{line}\n{self.formatStack(record.stack_info)}"
        return line
# Install a single stderr handler carrying the colored formatter as the
# root logging configuration, then expose the package-wide logger.
_handler = logging.StreamHandler()
_handler.setFormatter(_ColorFormatter(datefmt="%H:%M:%S"))
logging.basicConfig(level=logging.INFO, handlers=[_handler])
log = logging.getLogger("truenas_migrate")

154
truenas_migrate/migrate.py Normal file
View File

@@ -0,0 +1,154 @@
"""Migration routines for SMB and NFS shares."""
from __future__ import annotations
import json
from typing import Any
from .colors import log, _bold, _bold_cyan, _bold_green, _bold_red, _cyan, _yellow
from .client import TrueNASClient
from .summary import Summary
# ─────────────────────────────────────────────────────────────────────────────
# Payload builders
# ─────────────────────────────────────────────────────────────────────────────
# Read-only / server-generated fields that must NOT be sent on create/update
_SMB_SHARE_READONLY = frozenset({"id", "locked"})

# CORE SMB share fields that do not exist in the SCALE API
_SMB_SHARE_CORE_EXTRAS = frozenset({
    # Time Machine UUID; server-generated, SCALE assigns it automatically
    "vuid",
})

# CORE NFS share fields that do not exist in the SCALE API
_NFS_SHARE_CORE_EXTRAS = frozenset({
    # CORE exports a list of paths; SCALE takes a single "path" string
    "paths",
    "alldirs",  # removed in SCALE
    "quiet",    # removed in SCALE
})


def _smb_share_payload(share: dict) -> dict:
    """Return *share* minus read-only and CORE-only fields, ready for create."""
    blocked = _SMB_SHARE_READONLY | _SMB_SHARE_CORE_EXTRAS
    payload: dict = {}
    for key, value in share.items():
        if key not in blocked:
            payload[key] = value
    return payload
def _nfs_share_payload(share: dict) -> dict:
    """Return *share* minus read-only and CORE-only fields, ready for create.

    CORE stores export paths as a list under "paths"; SCALE expects a single
    "path" string, so the first list entry is promoted to "path".
    """
    payload = {k: v for k, v in share.items()
               if k not in {"id", "locked"} | _NFS_SHARE_CORE_EXTRAS}
    core_paths = share.get("paths") or []
    if "path" not in payload and core_paths:
        payload["path"] = core_paths[0]
        if len(core_paths) > 1:
            # SCALE has no multi-path exports; surface the data loss instead
            # of silently dropping every path after the first.
            log.warning(
                "NFS share has %d CORE paths; only %r will be migrated",
                len(core_paths), core_paths[0],
            )
    return payload
# ─────────────────────────────────────────────────────────────────────────────
# Migration routines
# ─────────────────────────────────────────────────────────────────────────────
async def migrate_smb_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create the archive's SMB *shares* on the destination.

    Shares whose name already exists on the destination (case-insensitive)
    are skipped.  In dry-run mode nothing is created; intended share paths
    are recorded on *summary* for the dataset safety check.  Counters and
    error messages are accumulated on *summary*.
    """
    summary.smb_found = len(shares)
    if not shares:
        log.info("No SMB shares found in archive.")
        return
    log.info("Querying existing SMB shares on destination …")
    try:
        existing = await client.call("sharing.smb.query") or []
    except RuntimeError as exc:
        msg = f"Could not query SMB shares: {exc}"
        log.error(msg)
        summary.errors.append(msg)
        return
    existing_names = {s.get("name", "").lower() for s in existing}
    log.info(" Destination has %d existing SMB share(s).", len(existing_names))
    for share in shares:
        name = share.get("name", "<unnamed>")
        log.info("%s SMB share %s", _bold("──"), _bold_cyan(repr(name)))
        if name.lower() in existing_names:
            log.info(" %s already exists on destination.", _yellow("SKIP"))
            summary.smb_skipped += 1
            continue
        payload = _smb_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))
        if dry_run:
            # Bug fix: the old format string was "would create %s%s", which
            # ran the share name and its path together with no separator.
            log.info(" %s would create %s at %s",
                     _cyan("[DRY RUN]"), _bold_cyan(repr(name)), payload.get("path"))
            summary.smb_created += 1
            if payload.get("path"):
                summary.paths_to_create.append(payload["path"])
            continue
        try:
            r = await client.call("sharing.smb.create", [payload])
            log.info(" %s id=%s", _bold_green("CREATED"), r.get("id"))
            summary.smb_created += 1
        except RuntimeError as exc:
            log.error(" %s: %s", _bold_red("FAILED"), exc)
            summary.smb_failed += 1
            summary.errors.append(f"SMB share {name!r}: {exc}")
async def migrate_nfs_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create the archive's NFS *shares* on the destination, skipping paths
    already exported there; results are accumulated on *summary*."""
    summary.nfs_found = len(shares)
    if not shares:
        log.info("No NFS shares found in archive.")
        return
    log.info("Querying existing NFS shares on destination …")
    try:
        current = await client.call("sharing.nfs.query") or []
    except RuntimeError as exc:
        msg = f"Could not query NFS shares: {exc}"
        log.error(msg)
        summary.errors.append(msg)
        return
    exported = {s.get("path", "").rstrip("/") for s in current}
    log.info(" Destination has %d existing NFS share(s).", len(exported))
    for share in shares:
        # CORE archives carry a "paths" list; SCALE a single "path" string.
        legacy_paths = share.get("paths") or []
        if share.get("path"):
            path = share["path"].rstrip("/")
        elif legacy_paths:
            path = legacy_paths[0].rstrip("/")
        else:
            path = ""
        if legacy_paths:
            all_paths = [p.rstrip("/") for p in legacy_paths]
        elif path:
            all_paths = [path]
        else:
            all_paths = []
        log.info("%s NFS export %s", _bold("──"), _bold_cyan(repr(path)))
        if path in exported:
            log.info(" %s path already exported on destination.", _yellow("SKIP"))
            summary.nfs_skipped += 1
            continue
        payload = _nfs_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))
        if dry_run:
            log.info(" %s would create NFS export for %s",
                     _cyan("[DRY RUN]"), _bold_cyan(repr(path)))
            summary.nfs_created += 1
            summary.paths_to_create.extend(all_paths)
            continue
        try:
            created = await client.call("sharing.nfs.create", [payload])
            log.info(" %s id=%s", _bold_green("CREATED"), created.get("id"))
            summary.nfs_created += 1
        except RuntimeError as exc:
            log.error(" %s: %s", _bold_red("FAILED"), exc)
            summary.nfs_failed += 1
            summary.errors.append(f"NFS share {path!r}: {exc}")

View File

@@ -0,0 +1,93 @@
"""Migration summary dataclass and report renderer."""
from __future__ import annotations
from dataclasses import dataclass, field
from .colors import (
_dim, _bold, _red, _yellow, _cyan,
_bold_red, _bold_green, _bold_yellow, _vis_len,
)
@dataclass
class Summary:
    """Accumulated results of a migration run, rendered by :meth:`report`."""

    # SMB share counters
    smb_found: int = 0
    smb_created: int = 0
    smb_skipped: int = 0
    smb_failed: int = 0
    # NFS share counters
    nfs_found: int = 0
    nfs_created: int = 0
    nfs_skipped: int = 0
    nfs_failed: int = 0
    # Human-readable error messages gathered while migrating
    errors: list[str] = field(default_factory=list)
    # Populated during dry-run dataset safety checks
    paths_to_create: list[str] = field(default_factory=list)
    missing_datasets: list[str] = field(default_factory=list)

    def report(self) -> str:
        """Return a multi-line, ANSI-colored summary box for the terminal."""
        w = 60  # inner width of the box

        def _stat(label: str, n: int, color_fn) -> str:
            # Highlight non-zero counters; dim the zeros.
            s = f"{label}={n}"
            return color_fn(s) if n > 0 else _dim(s)

        smb_val = (
            f"{_dim('found=' + str(self.smb_found))} "
            f"{_stat('created', self.smb_created, _bold_green)} "
            f"{_stat('skipped', self.smb_skipped, _yellow)} "
            f"{_stat('failed', self.smb_failed, _bold_red)}"
        )
        nfs_val = (
            f"{_dim('found=' + str(self.nfs_found))} "
            f"{_stat('created', self.nfs_created, _bold_green)} "
            f"{_stat('skipped', self.nfs_skipped, _yellow)} "
            f"{_stat('failed', self.nfs_failed, _bold_red)}"
        )
        # Box-drawing frame.  Bug fix: these literals were empty strings
        # (e.g. '"" * w'), which drew an invisible frame and blank bullets;
        # restored the intended Unicode box-drawing characters.
        hr = _cyan("─" * w)
        tl = _cyan("┌"); tr = _cyan("┐")
        ml = _cyan("├"); mr = _cyan("┤")
        bl = _cyan("└"); br = _cyan("┘")
        side = _cyan("│")
        title_text = "MIGRATION SUMMARY"
        lpad = (w - len(title_text)) // 2
        rpad = w - len(title_text) - lpad
        title_row = f"{side}{' ' * lpad}{_bold(title_text)}{' ' * rpad}{side}"

        def row(label: str, val: str) -> str:
            # Pad so the right border lines up despite invisible ANSI codes.
            right = max(0, w - 2 - len(label) - _vis_len(val))
            return f"{side} {_dim(label)}{val}{' ' * right} {side}"

        lines = [
            "",
            f"{tl}{hr}{tr}",
            title_row,
            f"{ml}{hr}{mr}",
            row("SMB shares : ", smb_val),
            row("NFS shares : ", nfs_val),
            f"{bl}{hr}{br}",
        ]
        if self.errors:
            lines.append(f"\n {_bold_red(str(len(self.errors)) + ' error(s):')} ")
            for e in self.errors:
                lines.append(f" {_red('•')} {e}")
        if self.missing_datasets:
            lines.append(
                f"\n {_bold_yellow('WARNING:')} "
                f"{len(self.missing_datasets)} share path(s) have no "
                "matching dataset on the destination:"
            )
            for p in self.missing_datasets:
                lines.append(f" {_yellow('•')} {p}")
            lines.append(
                " These paths must exist before shares can be created.\n"
                " Use interactive mode or answer 'y' at the dataset prompt to create them."
            )
        lines.append("")
        return "\n".join(lines)