Add CSV import support for non-TrueNAS sources

- Add truenas_migrate/csv_source.py: parses SMB and NFS share definitions
  from customer-supplied CSV files; returns same dict shape as parse_archive()
  so migrate.py is untouched
- Add smb_shares_template.csv and nfs_shares_template.csv with annotated
  headers and example rows (# comment rows are skipped by the parser)
- Update cli.py: interactive wizard gains a source-type step (archive vs CSV);
  run() resolves CSV source via --smb-csv / --nfs-csv args; --debug-tar is now
  optional; argparse validates mutual exclusion of archive and CSV flags

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 11:27:16 -05:00
parent 880fa42f5b
commit 1f527476e6
4 changed files with 326 additions and 72 deletions

View File

@@ -0,0 +1,160 @@
"""CSV source parser reads SMB/NFS share definitions from customer-supplied CSV files."""
from __future__ import annotations
import csv
import sys
from pathlib import Path
from typing import Any
from .colors import log
# ─────────────────────────────────────────────────────────────────────────────
# Column type metadata
# ─────────────────────────────────────────────────────────────────────────────
# Any column not listed in a *_BOOL_COLS / *_LIST_COLS set is passed through
# to the share dict as a plain stripped string.
# Columns coerced to bool
_SMB_BOOL_COLS = frozenset({"ro", "browsable", "guestok", "abe", "timemachine", "enabled"})
# Columns coerced to list[str] (space-or-comma-separated in CSV)
_SMB_LIST_COLS = frozenset({"hostsallow", "hostsdeny"})
# Columns that must be present and non-empty; rows missing one are skipped
_SMB_REQUIRED = frozenset({"name", "path"})
_NFS_BOOL_COLS = frozenset({"ro", "enabled"})
_NFS_LIST_COLS = frozenset({"security", "hosts", "networks"})
_NFS_REQUIRED = frozenset({"path"})
# ─────────────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _parse_bool(value: str, col: str, row_num: int) -> bool:
v = value.strip().lower()
if v in ("true", "yes", "1"):
return True
if v in ("false", "no", "0", ""):
return False
log.warning(" row %d: unrecognised boolean %r for column %r treating as False",
row_num, value, col)
return False
def _parse_list(value: str) -> list[str]:
"""Split space-or-comma-separated value into a list, dropping blanks."""
return [p for p in value.replace(",", " ").split() if p]
def _coerce_row(
row: dict[str, str],
bool_cols: frozenset[str],
list_cols: frozenset[str],
required: frozenset[str],
row_num: int,
) -> dict[str, Any] | None:
"""Validate and type-coerce one CSV row. Returns None to skip the row."""
if not any((v or "").strip() for v in row.values()):
return None # blank row
first_val = next(iter(row.values()), "") or ""
if first_val.strip().startswith("#"):
return None # comment row
result: dict[str, Any] = {}
for col, raw in row.items():
if col is None:
continue
col = col.strip()
val = (raw or "").strip()
if not val:
continue # omit empty optional fields; API uses its defaults
if col in bool_cols:
result[col] = _parse_bool(val, col, row_num)
elif col in list_cols:
result[col] = _parse_list(val)
else:
result[col] = val
for req in required:
if req not in result:
log.warning(" row %d: missing required field %r skipping row", row_num, req)
return None
return result
def _parse_csv(
    csv_path: str,
    bool_cols: frozenset[str],
    list_cols: frozenset[str],
    required: frozenset[str],
    label: str,
) -> list[dict]:
    """Read one CSV file and return a list of coerced share dicts.

    Exits the process (sys.exit(1)) on a missing/unreadable file, a missing
    header row, or missing required columns — operator errors that should
    stop the migration before any API calls are made.
    """
    path = Path(csv_path)
    if not path.is_file():
        log.error("%s CSV file not found: %s", label, csv_path)
        sys.exit(1)
    shares: list[dict] = []
    try:
        # utf-8-sig transparently strips the BOM Excel prepends to exports.
        with path.open(newline="", encoding="utf-8-sig") as fh:
            reader = csv.DictReader(fh)
            if reader.fieldnames is None:
                log.error("%s CSV has no header row: %s", label, csv_path)
                sys.exit(1)
            header = {c.strip() for c in reader.fieldnames if c is not None}
            missing_req = required - header
            if missing_req:
                log.error(
                    "%s CSV is missing required column(s): %s",
                    label, ", ".join(sorted(missing_req)),
                )
                sys.exit(1)
            # start=2: row 1 is the header, so data rows are numbered from 2.
            for row_num, row in enumerate(reader, start=2):
                # BUGFIX: DictReader stores extra cells of a ragged row under
                # the key None as a *list*; mapping that key to "" (as before)
                # let the list through and crashed the string handling in
                # _coerce_row.  Drop the None key instead.
                normalised = {k.strip(): v for k, v in row.items() if k is not None}
                share = _coerce_row(normalised, bool_cols, list_cols, required, row_num)
                if share is not None:
                    shares.append(share)
    except OSError as exc:
        log.error("Cannot read %s CSV: %s", label, exc)
        sys.exit(1)
    log.info(" %-14s%s (%d share(s))", label.lower() + "_shares", csv_path, len(shares))
    return shares
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
def parse_smb_csv(csv_path: str) -> list[dict]:
    """Load SMB share definitions from *csv_path* in migrate.py-compatible form."""
    shares = _parse_csv(csv_path, _SMB_BOOL_COLS, _SMB_LIST_COLS, _SMB_REQUIRED, "SMB")
    return shares
def parse_nfs_csv(csv_path: str) -> list[dict]:
    """Load NFS share definitions from *csv_path* in migrate.py-compatible form."""
    shares = _parse_csv(csv_path, _NFS_BOOL_COLS, _NFS_LIST_COLS, _NFS_REQUIRED, "NFS")
    return shares
def parse_csv_sources(smb_csv: str | None, nfs_csv: str | None) -> dict[str, Any]:
    """
    Parse whichever of the two CSV paths were supplied.
    The result — {"smb_shares": list, "nfs_shares": list} — matches the
    shape returned by parse_archive(), so downstream code is source-agnostic.
    """
    log.info("Loading shares from CSV source(s).")
    smb_shares: list[dict] = parse_smb_csv(smb_csv) if smb_csv else []
    nfs_shares: list[dict] = parse_nfs_csv(nfs_csv) if nfs_csv else []
    log.info(
        "Loaded: %d SMB share(s), %d NFS share(s)",
        len(smb_shares),
        len(nfs_shares),
    )
    return {"smb_shares": smb_shares, "nfs_shares": nfs_shares}