Files
TrueMigration/truenas_migrate/migrate.py
scott c157e14fa9 Restructure into package: truenas_migrate/
Split single-file script into focused modules:
  colors.py   – ANSI helpers and shared logger
  summary.py  – Summary dataclass and report renderer
  archive.py  – Debug archive parser (SCALE + CORE layouts)
  client.py   – WebSocket engine, TrueNASClient, dataset utilities
  migrate.py  – Payload builders, migrate_smb_shares, migrate_nfs_shares
  cli.py      – Interactive wizard, argparse, run(), main()
  __main__.py – python -m truenas_migrate entry point

truenas_migrate.py retained as a one-line compatibility shim.
Both 'python truenas_migrate.py' and 'python -m truenas_migrate' work.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-04 21:50:00 -05:00

155 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Migration routines for SMB and NFS shares."""
from __future__ import annotations
import json
from typing import Any
from .colors import log, _bold, _bold_cyan, _bold_green, _bold_red, _cyan, _yellow
from .client import TrueNASClient
from .summary import Summary
# ─────────────────────────────────────────────────────────────────────────────
# Payload builders
# ─────────────────────────────────────────────────────────────────────────────
# Server-generated / read-only SMB share fields; these must NOT be sent on
# create/update.
_SMB_SHARE_READONLY = frozenset(("id", "locked"))

# SMB share fields that CORE has but the SCALE API does not.
_SMB_SHARE_CORE_EXTRAS = frozenset((
    "vuid",  # server-generated Time Machine UUID; SCALE sets this automatically
))

# NFS share fields that CORE has but the SCALE API does not.
_NFS_SHARE_CORE_EXTRAS = frozenset((
    "paths",    # CORE: list of paths; SCALE: single "path" string (converted below)
    "alldirs",  # removed in SCALE
    "quiet",    # removed in SCALE
))
def _smb_share_payload(share: dict) -> dict:
    """Return a filtered copy of *share* for use as an SMB create payload.

    Keys listed in ``_SMB_SHARE_READONLY`` or ``_SMB_SHARE_CORE_EXTRAS`` are
    dropped; every other key/value pair is carried over unchanged. The input
    dict is not modified.
    """
    dropped = _SMB_SHARE_CORE_EXTRAS | _SMB_SHARE_READONLY
    payload = {}
    for key, value in share.items():
        if key in dropped:
            continue
        payload[key] = value
    return payload
def _nfs_share_payload(share: dict) -> dict:
    """Return a filtered copy of *share* for use as an NFS create payload.

    The keys ``"id"`` and ``"locked"`` and every key in
    ``_NFS_SHARE_CORE_EXTRAS`` are dropped. If the result has no ``"path"``
    key and *share* carries a non-empty ``"paths"`` value, the first entry of
    ``"paths"`` is used as ``"path"``; any further entries are not carried
    over. The input dict is not modified.
    """
    # Build the exclusion set once rather than re-evaluating the union for
    # every key inside the comprehension.
    exclude = {"id", "locked"} | _NFS_SHARE_CORE_EXTRAS
    payload = {k: v for k, v in share.items() if k not in exclude}
    # CORE stores export paths as a list under "paths"; SCALE expects a single
    # "path" string.
    if "path" not in payload and share.get("paths"):
        payload["path"] = share["paths"][0]
    return payload
# ─────────────────────────────────────────────────────────────────────────────
# Migration routines
# ─────────────────────────────────────────────────────────────────────────────
async def migrate_smb_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create on the destination each SMB share from *shares* that is missing.

    Args:
        client: Connection used for the ``sharing.smb.query`` and
            ``sharing.smb.create`` calls.
        shares: SMB share dicts read from the source archive.
        dry_run: When true, nothing is created; the share is logged, counted
            in ``summary.smb_created`` and its ``path`` (if any) is appended
            to ``summary.paths_to_create``.
        summary: Accumulator updated in place (``smb_found``, ``smb_skipped``,
            ``smb_created``, ``smb_failed``, ``errors``, ``paths_to_create``).

    A ``RuntimeError`` from the query aborts the routine after recording the
    message in ``summary.errors``; a ``RuntimeError`` from a create call is
    recorded and the loop moves on to the next share.
    """
    summary.smb_found = len(shares)
    if not shares:
        log.info("No SMB shares found in archive.")
        return

    log.info("Querying existing SMB shares on destination …")
    try:
        existing = await client.call("sharing.smb.query") or []
    except RuntimeError as exc:
        msg = f"Could not query SMB shares: {exc}"
        log.error(msg)
        summary.errors.append(msg)
        return

    # ``or ""`` (instead of a .get default) also covers a key that is present
    # but set to None, which would otherwise raise on .lower().
    existing_names = {(s.get("name") or "").lower() for s in existing}
    # Report the number of shares returned, not the size of the de-duplicated
    # name set.
    log.info(" Destination has %d existing SMB share(s).", len(existing))

    for share in shares:
        name = share.get("name") or "<unnamed>"
        log.info("%s SMB share %s", _bold("──"), _bold_cyan(repr(name)))

        # Share names are compared case-insensitively.
        if name.lower() in existing_names:
            log.info(" %s already exists on destination.", _yellow("SKIP"))
            summary.smb_skipped += 1
            continue

        payload = _smb_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))

        if dry_run:
            # Separator added between the share name and its path; the two
            # values were previously printed run together.
            log.info(" %s would create %s → %s",
                     _cyan("[DRY RUN]"), _bold_cyan(repr(name)), payload.get("path"))
            summary.smb_created += 1
            if payload.get("path"):
                summary.paths_to_create.append(payload["path"])
            continue

        try:
            r = await client.call("sharing.smb.create", [payload])
        except RuntimeError as exc:
            log.error(" %s: %s", _bold_red("FAILED"), exc)
            summary.smb_failed += 1
            summary.errors.append(f"SMB share {name!r}: {exc}")
        else:
            # The share exists at this point; do not let an unexpected
            # (non-dict) reply turn a success into an unhandled exception.
            created_id = r.get("id") if isinstance(r, dict) else r
            log.info(" %s id=%s", _bold_green("CREATED"), created_id)
            summary.smb_created += 1
async def migrate_nfs_shares(
    client: TrueNASClient,
    shares: list[dict],
    dry_run: bool,
    summary: Summary,
) -> None:
    """Create on the destination each NFS export from *shares* that is missing.

    Args:
        client: Connection used for the ``sharing.nfs.query`` and
            ``sharing.nfs.create`` calls.
        shares: NFS share dicts read from the source archive; a share may
            carry a single ``"path"`` or a ``"paths"`` list.
        dry_run: When true, nothing is created; the export is logged, counted
            in ``summary.nfs_created`` and all of its paths are appended to
            ``summary.paths_to_create``.
        summary: Accumulator updated in place (``nfs_found``, ``nfs_skipped``,
            ``nfs_created``, ``nfs_failed``, ``errors``, ``paths_to_create``).

    A ``RuntimeError`` from the query aborts the routine after recording the
    message in ``summary.errors``; a ``RuntimeError`` from a create call is
    recorded and the loop moves on to the next share.
    """
    summary.nfs_found = len(shares)
    if not shares:
        log.info("No NFS shares found in archive.")
        return

    log.info("Querying existing NFS shares on destination …")
    try:
        existing = await client.call("sharing.nfs.query") or []
    except RuntimeError as exc:
        msg = f"Could not query NFS shares: {exc}"
        log.error(msg)
        summary.errors.append(msg)
        return

    # ``or ""`` (instead of a .get default) also covers a key that is present
    # but set to None, which would otherwise raise on .rstrip().
    existing_paths = {(s.get("path") or "").rstrip("/") for s in existing}
    # Several exports may share one path, so count the query result rather
    # than the de-duplicated path set.
    log.info(" Destination has %d existing NFS share(s).", len(existing))

    for share in shares:
        core_paths = share.get("paths") or []
        # Prefer an explicit "path"; otherwise fall back to the first entry
        # of the "paths" list. Trailing slashes are ignored when comparing.
        path = (share.get("path") or (core_paths[0] if core_paths else "")).rstrip("/")
        all_paths = [p.rstrip("/") for p in (core_paths or ([path] if path else []))]
        log.info("%s NFS export %s", _bold("──"), _bold_cyan(repr(path)))

        if path in existing_paths:
            log.info(" %s path already exported on destination.", _yellow("SKIP"))
            summary.nfs_skipped += 1
            continue

        if len(all_paths) > 1:
            # The payload built below carries a single "path", so make the
            # loss of the remaining paths visible instead of silent.
            log.warning(" source share lists %d paths; only %r will be exported.",
                        len(all_paths), path)

        payload = _nfs_share_payload(share)
        log.debug(" payload: %s", json.dumps(payload))

        if dry_run:
            log.info(" %s would create NFS export for %s",
                     _cyan("[DRY RUN]"), _bold_cyan(repr(path)))
            summary.nfs_created += 1
            summary.paths_to_create.extend(all_paths)
            continue

        try:
            r = await client.call("sharing.nfs.create", [payload])
        except RuntimeError as exc:
            log.error(" %s: %s", _bold_red("FAILED"), exc)
            summary.nfs_failed += 1
            summary.errors.append(f"NFS share {path!r}: {exc}")
        else:
            # The export exists at this point; do not let an unexpected
            # (non-dict) reply turn a success into an unhandled exception.
            created_id = r.get("id") if isinstance(r, dict) else r
            log.info(" %s id=%s", _bold_green("CREATED"), created_id)
            summary.nfs_created += 1