"""TrueNAS debug archive parser (SCALE ixdiagnose and CORE fndebug layouts).""" from __future__ import annotations import contextlib import json import sys import tarfile from typing import Any, Optional from .colors import log # ───────────────────────────────────────────────────────────────────────────── # Archive layout constants # ───────────────────────────────────────────────────────────────────────────── # # TrueNAS SCALE generates debug archives with the "ixdiagnose" tool. # The internal layout has changed across versions: # # SCALE 24.04+ (plugins layout, lowercase dirs, combined JSON files) # ixdiagnose/plugins/smb/smb_info.json – SMB shares + config combined # ixdiagnose/plugins/nfs/nfs_config.json – NFS shares + config combined # # Older SCALE (plugins layout, uppercase dirs, per-query JSON files) # ixdiagnose/plugins/SMB/sharing.smb.query.json # ixdiagnose/plugins/NFS/sharing.nfs.query.json # ixdiagnose/plugins/Sharing/sharing.smb.query.json # ixdiagnose/plugins/Sharing/sharing.nfs.query.json # # TrueNAS CORE uses the "freenas-debug" tool (stored as "fndebug" inside the # archive). It produces plain-text dump files with embedded JSON blocks. _CANDIDATES: dict[str, list[str]] = { "smb_shares": [ "ixdiagnose/plugins/smb/smb_info.json", "ixdiagnose/plugins/SMB/sharing.smb.query.json", "ixdiagnose/plugins/Sharing/sharing.smb.query.json", "ixdiagnose/SMB/sharing.smb.query.json", ], "nfs_shares": [ "ixdiagnose/plugins/nfs/nfs_config.json", "ixdiagnose/plugins/NFS/sharing.nfs.query.json", "ixdiagnose/plugins/Sharing/sharing.nfs.query.json", "ixdiagnose/NFS/sharing.nfs.query.json", ], "iscsi": [ "ixdiagnose/plugins/iscsi/iscsi_config.json", "ixdiagnose/plugins/ISCSI/iscsi_config.json", ], } # When a candidate file bundles multiple datasets, pull out the right sub-key. _KEY_WITHIN_FILE: dict[str, str] = { "smb_shares": "sharing_smb_query", "nfs_shares": "sharing_nfs_query", # "iscsi" intentionally omitted — iscsi_config.json is used as-is } # Keyword fragments for heuristic fallback scan (SCALE archives only) _KEYWORDS: dict[str, list[str]] = { "smb_shares": ["sharing.smb", "smb_share", "sharing/smb", "smb_info"], "nfs_shares": ["sharing.nfs", "nfs_share", "sharing/nfs", "nfs_config"], "iscsi": ["iscsi_config", "iscsi/iscsi"], } # Presence of this path prefix identifies a TrueNAS CORE archive. _CORE_MARKER = "ixdiagnose/fndebug" # ───────────────────────────────────────────────────────────────────────────── # Internal helpers # ───────────────────────────────────────────────────────────────────────────── def _members_map(tf: tarfile.TarFile) -> dict[str, tarfile.TarInfo]: """Return {normalised_path: TarInfo} for every member.""" return {m.name.lstrip("./"): m for m in tf.getmembers()} def _read_json(tf: tarfile.TarFile, info: tarfile.TarInfo) -> Optional[Any]: """Extract and JSON-parse one archive member. Returns None on any error.""" try: fh = tf.extractfile(info) if fh is None: return None raw = fh.read().decode("utf-8", errors="replace").strip() return json.loads(raw) if raw else None except Exception as exc: log.debug("Could not parse %s: %s", info.name, exc) return None def _extract_subkey(raw: Any, data_type: str) -> Optional[Any]: """Pull out the relevant sub-key when a JSON file bundles multiple datasets.""" if not isinstance(raw, dict): return raw key = _KEY_WITHIN_FILE.get(data_type) if key and key in raw: return raw[key] return raw def _find_data( tf: tarfile.TarFile, members: dict[str, tarfile.TarInfo], data_type: str, ) -> Optional[Any]: """Try candidate paths, then keyword heuristics. Return parsed JSON or None.""" # Pass 1 – exact / suffix match against known candidate paths for candidate in _CANDIDATES[data_type]: norm = candidate.lstrip("./") info = members.get(norm) if info is None: # Archive may have a date-stamped top-level directory for path, member in members.items(): if path == norm or path.endswith("/" + norm): info = member break if info is not None: raw = _read_json(tf, info) result = _extract_subkey(raw, data_type) if result is not None: log.info(" %-12s → %s", data_type, info.name) return result # Pass 2 – keyword heuristic scan over all .json members log.debug(" %s: candidates missed, scanning archive …", data_type) keywords = _KEYWORDS[data_type] for path in sorted(members): if not path.lower().endswith(".json"): continue if any(kw in path.lower() for kw in keywords): raw = _read_json(tf, members[path]) result = _extract_subkey(raw, data_type) if result is not None: log.info(" %-12s → %s (heuristic)", data_type, path) return result return None def _extract_core_dump_json(dump_text: str, title_fragment: str) -> list[Any]: """ Extract all top-level JSON values from a named section of a CORE dump.txt. CORE dump sections look like: +--------...--------+ + SECTION TITLE + +--------...--------+ debug finished in N seconds for SECTION TITLE Returns a list of parsed JSON values found in the content block, in order. """ import re as _re parts = _re.split(r'\+[-]{20,}\+', dump_text) for i, part in enumerate(parts): if title_fragment.lower() in part.lower() and i + 1 < len(parts): content = parts[i + 1] content = _re.sub( r'debug finished.*', '', content, flags=_re.IGNORECASE | _re.DOTALL, ).strip() results: list[Any] = [] decoder = json.JSONDecoder() pos = 0 while pos < len(content): remaining = content[pos:].lstrip() if not remaining or remaining[0] not in "{[": break pos += len(content[pos:]) - len(remaining) try: val, end = decoder.raw_decode(remaining) results.append(val) pos += end except json.JSONDecodeError: break return results return [] def _parse_core_into( tf: tarfile.TarFile, members: dict[str, tarfile.TarInfo], result: dict[str, Any], ) -> None: """Populate *result* from TrueNAS CORE fndebug dump files.""" log.info("TrueNAS CORE archive detected; parsing fndebug dump files.") smb_key = "ixdiagnose/fndebug/SMB/dump.txt" if smb_key in members: fh = tf.extractfile(members[smb_key]) dump = fh.read().decode("utf-8", errors="replace") # type: ignore[union-attr] vals = _extract_core_dump_json(dump, "Database Dump") if len(vals) >= 2 and isinstance(vals[1], list): result["smb_shares"] = vals[1] log.info(" smb_shares → %s (CORE, %d share(s))", smb_key, len(vals[1])) elif vals: log.warning(" smb_shares → NOT FOUND in Database Dump") else: log.warning(" SMB dump not found: %s", smb_key) nfs_key = "ixdiagnose/fndebug/NFS/dump.txt" if nfs_key in members: fh = tf.extractfile(members[nfs_key]) dump = fh.read().decode("utf-8", errors="replace") # type: ignore[union-attr] vals = _extract_core_dump_json(dump, "Configuration") if len(vals) >= 2 and isinstance(vals[1], list): result["nfs_shares"] = vals[1] log.info(" nfs_shares → %s (CORE, %d share(s))", nfs_key, len(vals[1])) else: log.warning(" nfs_shares → NOT FOUND in Configuration") else: log.warning(" NFS dump not found: %s", nfs_key) if not result["smb_shares"] and not result["nfs_shares"]: log.warning( "No share data found in CORE archive. " "This is expected when SMB/NFS services were disabled on the source system." ) @contextlib.contextmanager def _open_source_tar(tar_path: str): """ Open the archive that actually contains the ixdiagnose data. TrueNAS HA debug bundles (25.04+) wrap each node's ixdiagnose snapshot in a separate .txz inside the outer .tgz. We prefer the member whose name includes '_active'; if none is labelled that way we fall back to the first .txz found. Single-node (non-HA) bundles are used directly. """ with tarfile.open(tar_path, "r:*") as outer: txz_members = [ m for m in outer.getmembers() if m.name.lower().endswith(".txz") and m.isfile() ] if not txz_members: yield outer return active = next( (m for m in txz_members if "_active" in m.name.lower()), txz_members[0], ) log.info(" HA bundle detected; reading inner archive: %s", active.name) fh = outer.extractfile(active) with tarfile.open(fileobj=fh, mode="r:*") as inner: yield inner # ───────────────────────────────────────────────────────────────────────────── # Public API # ───────────────────────────────────────────────────────────────────────────── def parse_archive(tar_path: str) -> dict[str, Any]: """ Extract SMB shares, NFS shares, and iSCSI configuration from the debug archive. Returns: {"smb_shares": list, "nfs_shares": list, "iscsi": dict} """ log.info("Opening archive: %s", tar_path) result: dict[str, Any] = { "smb_shares": [], "nfs_shares": [], "iscsi": {}, } try: with _open_source_tar(tar_path) as tf: members = _members_map(tf) log.info(" Archive contains %d total entries.", len(members)) is_core = any( p == _CORE_MARKER or p.startswith(_CORE_MARKER + "/") for p in members ) if is_core: _parse_core_into(tf, members, result) else: for key in ("smb_shares", "nfs_shares"): data = _find_data(tf, members, key) if data is None: log.warning(" %-12s → NOT FOUND", key) continue if isinstance(data, list): result[key] = data elif isinstance(data, dict): # Some versions wrap the list: {"result": [...]} for v in data.values(): if isinstance(v, list): result[key] = v break # iSCSI — combined dict file, not a bare list iscsi_raw = _find_data(tf, members, "iscsi") if iscsi_raw and isinstance(iscsi_raw, dict): result["iscsi"] = { "global_config": iscsi_raw.get("global_config", {}), "portals": iscsi_raw.get("portals", []), "initiators": iscsi_raw.get("initiators", []), "targets": iscsi_raw.get("targets", []), "extents": iscsi_raw.get("extents", []), "targetextents": iscsi_raw.get("targetextents", []), } elif iscsi_raw is not None: log.warning(" iscsi → unexpected format (expected dict)") except (tarfile.TarError, OSError) as exc: log.error("Failed to open archive: %s", exc) sys.exit(1) iscsi = result["iscsi"] log.info( "Parsed: %d SMB share(s), %d NFS share(s), " "iSCSI: %d target(s) / %d extent(s) / %d portal(s)", len(result["smb_shares"]), len(result["nfs_shares"]), len(iscsi.get("targets", [])), len(iscsi.get("extents", [])), len(iscsi.get("portals", [])), ) return result def list_archive_and_exit(tar_path: str) -> None: """ Print a structured listing of the archive contents, then exit. For SCALE archives: lists all .json plugin files. For CORE archives: lists the fndebug dump files and the JSON sections that contain share data. """ try: with _open_source_tar(tar_path) as tf: members_map = _members_map(tf) is_core = any( p == _CORE_MARKER or p.startswith(_CORE_MARKER + "/") for p in members_map ) if is_core: print(f"\nTrueNAS CORE archive: {tar_path}\n") print(" fndebug plain-text dump files (JSON is embedded inside):\n") dump_files = sorted( p for p in members_map if p.startswith(_CORE_MARKER + "/") and p.endswith(".txt") ) for p in dump_files: size = members_map[p].size / 1024 print(f" {p} ({size:.1f} KB)") print() print(" Data this tool will extract:") print(" SMB shares → fndebug/SMB/dump.txt (\"Database Dump\" section)") print(" NFS shares → fndebug/NFS/dump.txt (\"Configuration\" section)") else: print(f"\nJSON plugin files in archive: {tar_path}\n") json_members = sorted( (m for m in tf.getmembers() if m.name.endswith(".json")), key=lambda m: m.name, ) if not json_members: print(" (no .json files found)") else: current_dir = "" for m in json_members: parts = m.name.lstrip("./").split("/") top = "/".join(parts[:-1]) if len(parts) > 1 else "" if top != current_dir: print(f"\n {top or '(root)'}/") current_dir = top print(f" {parts[-1]} ({m.size / 1024:.1f} KB)") except (tarfile.TarError, OSError) as exc: sys.exit(f"ERROR: {exc}") print() sys.exit(0)