Restructure into package: truenas_migrate/

Split single-file script into focused modules:
  colors.py   – ANSI helpers and shared logger
  summary.py  – Summary dataclass and report renderer
  archive.py  – Debug archive parser (SCALE + CORE layouts)
  client.py   – WebSocket engine, TrueNASClient, dataset utilities
  migrate.py  – Payload builders, migrate_smb_shares, migrate_nfs_shares
  cli.py      – Interactive wizard, argparse, run(), main()
  __main__.py – python -m truenas_migrate entry point

truenas_migrate.py retained as a one-line compatibility shim.
Both 'python truenas_migrate.py' and 'python -m truenas_migrate' work.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-04 21:50:00 -05:00
parent 543ca6b471
commit c157e14fa9
17 changed files with 1393 additions and 1392 deletions

352
truenas_migrate/archive.py Normal file
View File

@@ -0,0 +1,352 @@
"""TrueNAS debug archive parser (SCALE ixdiagnose and CORE fndebug layouts)."""
from __future__ import annotations

import contextlib
import json
import re
import sys
import tarfile
from typing import Any, Optional

from .colors import log
# ─────────────────────────────────────────────────────────────────────────────
# Archive layout constants
# ─────────────────────────────────────────────────────────────────────────────
#
# TrueNAS SCALE generates debug archives with the "ixdiagnose" tool.
# The internal layout has changed across versions:
#
# SCALE 24.04+ (plugins layout, lowercase dirs, combined JSON files)
# ixdiagnose/plugins/smb/smb_info.json SMB shares + config combined
# ixdiagnose/plugins/nfs/nfs_config.json NFS shares + config combined
#
# Older SCALE (plugins layout, uppercase dirs, per-query JSON files)
# ixdiagnose/plugins/SMB/sharing.smb.query.json
# ixdiagnose/plugins/NFS/sharing.nfs.query.json
# ixdiagnose/plugins/Sharing/sharing.smb.query.json
# ixdiagnose/plugins/Sharing/sharing.nfs.query.json
#
# TrueNAS CORE uses the "freenas-debug" tool (stored as "fndebug" inside the
# archive). It produces plain-text dump files with embedded JSON blocks.
# Known archive-relative paths for each data type, tried in order (newest
# SCALE layout first).  _find_data matches each candidate exactly or as a
# "/"-suffix, so a date-stamped top-level directory is tolerated.
_CANDIDATES: dict[str, list[str]] = {
    "smb_shares": [
        "ixdiagnose/plugins/smb/smb_info.json",
        "ixdiagnose/plugins/SMB/sharing.smb.query.json",
        "ixdiagnose/plugins/Sharing/sharing.smb.query.json",
        "ixdiagnose/SMB/sharing.smb.query.json",
    ],
    "nfs_shares": [
        "ixdiagnose/plugins/nfs/nfs_config.json",
        "ixdiagnose/plugins/NFS/sharing.nfs.query.json",
        "ixdiagnose/plugins/Sharing/sharing.nfs.query.json",
        "ixdiagnose/NFS/sharing.nfs.query.json",
    ],
}
# When a candidate file bundles multiple datasets, pull out the right sub-key.
_KEY_WITHIN_FILE: dict[str, str] = {
    "smb_shares": "sharing_smb_query",
    "nfs_shares": "sharing_nfs_query",
}
# Keyword fragments for heuristic fallback scan (SCALE archives only)
_KEYWORDS: dict[str, list[str]] = {
    "smb_shares": ["sharing.smb", "smb_share", "sharing/smb", "smb_info"],
    "nfs_shares": ["sharing.nfs", "nfs_share", "sharing/nfs", "nfs_config"],
}
# Presence of this path prefix identifies a TrueNAS CORE archive.
_CORE_MARKER: str = "ixdiagnose/fndebug"
# ─────────────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _members_map(tf: tarfile.TarFile) -> dict[str, tarfile.TarInfo]:
"""Return {normalised_path: TarInfo} for every member."""
return {m.name.lstrip("./"): m for m in tf.getmembers()}
def _read_json(tf: tarfile.TarFile, info: tarfile.TarInfo) -> Optional[Any]:
"""Extract and JSON-parse one archive member. Returns None on any error."""
try:
fh = tf.extractfile(info)
if fh is None:
return None
raw = fh.read().decode("utf-8", errors="replace").strip()
return json.loads(raw) if raw else None
except Exception as exc:
log.debug("Could not parse %s: %s", info.name, exc)
return None
def _extract_subkey(raw: Any, data_type: str) -> Optional[Any]:
    """Pull out the relevant sub-key when a JSON file bundles multiple datasets."""
    if isinstance(raw, dict):
        wanted = _KEY_WITHIN_FILE.get(data_type)
        if wanted is not None and wanted in raw:
            return raw[wanted]
    # Non-dict payloads (or dicts without the known sub-key) pass through.
    return raw
def _find_data(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    data_type: str,
) -> Optional[Any]:
    """Try candidate paths, then keyword heuristics. Return parsed JSON or None."""
    # Pass 1 exact / suffix match against known candidate paths
    for candidate in _CANDIDATES[data_type]:
        wanted = candidate.lstrip("./")
        info = members.get(wanted)
        if info is None:
            # Archive may have a date-stamped top-level directory
            info = next(
                (
                    member
                    for path, member in members.items()
                    if path == wanted or path.endswith("/" + wanted)
                ),
                None,
            )
        if info is None:
            continue
        parsed = _extract_subkey(_read_json(tf, info), data_type)
        if parsed is not None:
            log.info(" %-12s%s", data_type, info.name)
            return parsed
    # Pass 2 keyword heuristic scan over all .json members
    log.debug(" %s: candidates missed, scanning archive …", data_type)
    keywords = _KEYWORDS[data_type]
    for path in sorted(members):
        lowered = path.lower()
        if not lowered.endswith(".json"):
            continue
        if not any(kw in lowered for kw in keywords):
            continue
        parsed = _extract_subkey(_read_json(tf, members[path]), data_type)
        if parsed is not None:
            log.info(" %-12s%s (heuristic)", data_type, path)
            return parsed
    return None
def _extract_core_dump_json(dump_text: str, title_fragment: str) -> list[Any]:
"""
Extract all top-level JSON values from a named section of a CORE dump.txt.
CORE dump sections look like:
+--------...--------+
+ SECTION TITLE +
+--------...--------+
<content>
debug finished in N seconds for SECTION TITLE
Returns a list of parsed JSON values found in the content block, in order.
"""
import re as _re
parts = _re.split(r'\+[-]{20,}\+', dump_text)
for i, part in enumerate(parts):
if title_fragment.lower() in part.lower() and i + 1 < len(parts):
content = parts[i + 1]
content = _re.sub(
r'debug finished.*', '', content,
flags=_re.IGNORECASE | _re.DOTALL,
).strip()
results: list[Any] = []
decoder = json.JSONDecoder()
pos = 0
while pos < len(content):
remaining = content[pos:].lstrip()
if not remaining or remaining[0] not in "{[":
break
pos += len(content[pos:]) - len(remaining)
try:
val, end = decoder.raw_decode(remaining)
results.append(val)
pos += end
except json.JSONDecodeError:
break
return results
return []
def _parse_core_into(
    tf: tarfile.TarFile,
    members: dict[str, tarfile.TarInfo],
    result: dict[str, Any],
) -> None:
    """
    Populate *result* in place from TrueNAS CORE fndebug dump files.

    SMB shares come from the "Database Dump" section of fndebug/SMB/dump.txt,
    NFS shares from the "Configuration" section of fndebug/NFS/dump.txt.  In
    both cases the share list is expected as the second JSON value of the
    section (hence the vals[1] check); if that shape isn't present the data
    type is reported as NOT FOUND.
    """
    log.info("TrueNAS CORE archive detected; parsing fndebug dump files.")
    smb_key = "ixdiagnose/fndebug/SMB/dump.txt"
    if smb_key in members:
        fh = tf.extractfile(members[smb_key])
        dump = fh.read().decode("utf-8", errors="replace")  # type: ignore[union-attr]
        vals = _extract_core_dump_json(dump, "Database Dump")
        if len(vals) >= 2 and isinstance(vals[1], list):
            result["smb_shares"] = vals[1]
            log.info(" smb_shares → %s (CORE, %d share(s))", smb_key, len(vals[1]))
        else:
            # BUG FIX: this branch was `elif vals:`, which stayed completely
            # silent when the section yielded no JSON values at all.  Warn in
            # every not-found case, matching the NFS branch below.
            log.warning(" smb_shares → NOT FOUND in Database Dump")
    else:
        log.warning(" SMB dump not found: %s", smb_key)
    nfs_key = "ixdiagnose/fndebug/NFS/dump.txt"
    if nfs_key in members:
        fh = tf.extractfile(members[nfs_key])
        dump = fh.read().decode("utf-8", errors="replace")  # type: ignore[union-attr]
        vals = _extract_core_dump_json(dump, "Configuration")
        if len(vals) >= 2 and isinstance(vals[1], list):
            result["nfs_shares"] = vals[1]
            log.info(" nfs_shares → %s (CORE, %d share(s))", nfs_key, len(vals[1]))
        else:
            log.warning(" nfs_shares → NOT FOUND in Configuration")
    else:
        log.warning(" NFS dump not found: %s", nfs_key)
    if not result["smb_shares"] and not result["nfs_shares"]:
        log.warning(
            "No share data found in CORE archive. "
            "This is expected when SMB/NFS services were disabled on the source system."
        )
@contextlib.contextmanager
def _open_source_tar(tar_path: str):
"""
Open the archive that actually contains the ixdiagnose data.
TrueNAS HA debug bundles (25.04+) wrap each node's ixdiagnose snapshot
in a separate .txz inside the outer .tgz. We prefer the member whose
name includes '_active'; if none is labelled that way we fall back to the
first .txz found. Single-node (non-HA) bundles are used directly.
"""
with tarfile.open(tar_path, "r:*") as outer:
txz_members = [
m for m in outer.getmembers()
if m.name.lower().endswith(".txz") and m.isfile()
]
if not txz_members:
yield outer
return
active = next(
(m for m in txz_members if "_active" in m.name.lower()),
txz_members[0],
)
log.info(" HA bundle detected; reading inner archive: %s", active.name)
fh = outer.extractfile(active)
with tarfile.open(fileobj=fh, mode="r:*") as inner:
yield inner
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
def parse_archive(tar_path: str) -> dict[str, Any]:
    """
    Extract SMB shares and NFS shares from the debug archive.
    Returns: {"smb_shares": list, "nfs_shares": list}
    """
    log.info("Opening archive: %s", tar_path)
    result: dict[str, Any] = {"smb_shares": [], "nfs_shares": []}
    try:
        with _open_source_tar(tar_path) as tf:
            members = _members_map(tf)
            log.info(" Archive contains %d total entries.", len(members))
            core_prefix = _CORE_MARKER + "/"
            if any(p == _CORE_MARKER or p.startswith(core_prefix) for p in members):
                _parse_core_into(tf, members, result)
            else:
                for key in ("smb_shares", "nfs_shares"):
                    data = _find_data(tf, members, key)
                    if data is None:
                        log.warning(" %-12s → NOT FOUND", key)
                    elif isinstance(data, list):
                        result[key] = data
                    elif isinstance(data, dict):
                        # Some versions wrap the list: {"result": [...]}
                        inner = next(
                            (v for v in data.values() if isinstance(v, list)),
                            None,
                        )
                        if inner is not None:
                            result[key] = inner
    except (tarfile.TarError, OSError) as exc:
        log.error("Failed to open archive: %s", exc)
        sys.exit(1)
    log.info(
        "Parsed: %d SMB share(s), %d NFS share(s)",
        len(result["smb_shares"]),
        len(result["nfs_shares"]),
    )
    return result
def list_archive_and_exit(tar_path: str) -> None:
    """
    Print a structured listing of the archive contents, then exit.
    For SCALE archives: lists all .json plugin files.
    For CORE archives: lists the fndebug dump files and the JSON sections
    that contain share data.
    """
    try:
        with _open_source_tar(tar_path) as tf:
            members = _members_map(tf)
            core_prefix = _CORE_MARKER + "/"
            is_core = any(
                name == _CORE_MARKER or name.startswith(core_prefix)
                for name in members
            )
            if is_core:
                print(f"\nTrueNAS CORE archive: {tar_path}\n")
                print(" fndebug plain-text dump files (JSON is embedded inside):\n")
                for name in sorted(members):
                    if not (name.startswith(core_prefix) and name.endswith(".txt")):
                        continue
                    size = members[name].size / 1024
                    print(f" {name} ({size:.1f} KB)")
                print()
                print(" Data this tool will extract:")
                print(" SMB shares → fndebug/SMB/dump.txt (\"Database Dump\" section)")
                print(" NFS shares → fndebug/NFS/dump.txt (\"Configuration\" section)")
            else:
                print(f"\nJSON plugin files in archive: {tar_path}\n")
                json_members = sorted(
                    (m for m in tf.getmembers() if m.name.endswith(".json")),
                    key=lambda m: m.name,
                )
                if not json_members:
                    print(" (no .json files found)")
                else:
                    # Group output by containing directory; a header is
                    # emitted each time the directory changes.
                    previous_dir = ""
                    for member in json_members:
                        pieces = member.name.lstrip("./").split("/")
                        directory = "/".join(pieces[:-1]) if len(pieces) > 1 else ""
                        if directory != previous_dir:
                            print(f"\n {directory or '(root)'}/")
                            previous_dir = directory
                        print(f" {pieces[-1]} ({member.size / 1024:.1f} KB)")
    except (tarfile.TarError, OSError) as exc:
        sys.exit(f"ERROR: {exc}")
    print()
    sys.exit(0)