Files
TrueMigration/truenas_migrate/csv_source.py
scott 40daf20809 Redesign CSV templates with human-readable column headers
- Replace API field names (guestok, abe, ro, maproot_user, etc.) with
  plain-English headers (Guest Access, Access-Based Enumeration, Read Only,
  Map Root User, etc.) for customer clarity
- Drop comment rows that rendered poorly in spreadsheet apps
- Use two realistic example rows instead to teach by example
- Update csv_source.py to map friendly header names to API field names
  before validation and coercion (raw API names still accepted)
- Update README column reference to match new header names

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 11:32:25 -05:00

210 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""CSV source parser reads SMB/NFS share definitions from customer-supplied CSV files."""
from __future__ import annotations
import csv
import sys
from pathlib import Path
from typing import Any
from .colors import log
# ─────────────────────────────────────────────────────────────────────────────
# Column name mappings (human-readable header → API field name)
# Both the friendly names and the raw API names are accepted.
# ─────────────────────────────────────────────────────────────────────────────
# SMB header lookup: lowercase friendly header text -> API field name.
_SMB_COL_MAP: dict[str, str] = {
    # Identity and location
    "share name": "name",
    "path": "path",
    "description": "comment",
    "purpose": "purpose",
    # On/off switches
    "read only": "ro",
    "browsable": "browsable",
    "guest access": "guestok",
    "access-based enumeration": "abe",
    # Host filters
    "hosts allow": "hostsallow",
    "hosts deny": "hostsdeny",
    # Remaining switches
    "time machine": "timemachine",
    "enabled": "enabled",
}
# NFS header lookup: lowercase friendly header text -> API field name.
_NFS_COL_MAP: dict[str, str] = {
    # Location and basics
    "path": "path",
    "description": "comment",
    "read only": "ro",
    # User / group mapping
    "map root user": "maproot_user",
    "map root group": "maproot_group",
    "map all user": "mapall_user",
    "map all group": "mapall_group",
    # Multi-valued columns
    "security": "security",
    "allowed hosts": "hosts",
    "allowed networks": "networks",
    # Switch
    "enabled": "enabled",
}
# ─────────────────────────────────────────────────────────────────────────────
# Column type metadata (keyed by API field name)
# ─────────────────────────────────────────────────────────────────────────────
# Per-protocol type rules, all expressed in API field names:
#   *_BOOL_COLS - cells converted to bool
#   *_LIST_COLS - cells converted to list[str] (space-or-comma-separated in CSV)
#   *_REQUIRED  - fields a row must supply

# SMB
_SMB_BOOL_COLS = frozenset(
    ("ro", "browsable", "guestok", "abe", "timemachine", "enabled")
)
_SMB_LIST_COLS = frozenset(("hostsallow", "hostsdeny"))
_SMB_REQUIRED = frozenset(("name", "path"))

# NFS
_NFS_BOOL_COLS = frozenset(("ro", "enabled"))
_NFS_LIST_COLS = frozenset(("security", "hosts", "networks"))
_NFS_REQUIRED = frozenset(("path",))
# ─────────────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _parse_bool(value: str, col: str, row_num: int) -> bool:
    """Interpret a CSV cell as a boolean.

    Matching ignores case and surrounding whitespace. "true", "yes" and "1"
    give True; "false", "no", "0" and the empty string give False. Any other
    text logs a warning naming the row and column and is treated as False.
    """
    token = value.strip().lower()
    is_true = token in ("true", "yes", "1")
    if not is_true and token not in ("false", "no", "0", ""):
        # Neither a known "on" nor a known "off" spelling: warn, fall back to False.
        log.warning(" row %d: unrecognised boolean %r for column %r treating as False",
                    row_num, value, col)
    return is_true
def _parse_list(value: str) -> list[str]:
    """Break a cell into items separated by commas and/or whitespace.

    Empty pieces (repeated separators, leading/trailing separators) are dropped.
    """
    items: list[str] = []
    for chunk in value.split(","):
        # split() with no argument discards empty strings on its own.
        items.extend(chunk.split())
    return items
def _coerce_row(
row: dict[str, str],
bool_cols: frozenset[str],
list_cols: frozenset[str],
required: frozenset[str],
row_num: int,
) -> dict[str, Any] | None:
"""Validate and type-coerce one CSV row. Returns None to skip the row."""
if not any((v or "").strip() for v in row.values()):
return None # blank row
first_val = next(iter(row.values()), "") or ""
if first_val.strip().startswith("#"):
return None # comment row
result: dict[str, Any] = {}
for col, raw in row.items():
if col is None:
continue
col = col.strip()
val = (raw or "").strip()
if not val:
continue # omit empty optional fields; API uses its defaults
if col in bool_cols:
result[col] = _parse_bool(val, col, row_num)
elif col in list_cols:
result[col] = _parse_list(val)
else:
result[col] = val
for req in required:
if req not in result:
log.warning(" row %d: missing required field %r skipping row", row_num, req)
return None
return result
def _normalize_col(col: str, col_map: dict[str, str]) -> str:
    """Map a CSV header to its API field name.

    The header is lowercased, stripped, and every run of internal whitespace
    (double spaces, tabs, non-breaking spaces, line breaks from a wrapped
    header cell) is collapsed to a single space before the lookup, so
    "Read  Only" still matches the "read only" entry.

    Args:
        col: Header text as it appears in the CSV.
        col_map: Lowercase friendly header -> API field name.

    Returns:
        The mapped API field name, or the normalised header itself when the
        map has no entry for it (this is how raw API names pass through).
    """
    # split() with no argument splits on any whitespace and drops empty pieces,
    # which trims the ends and collapses internal runs in one step.
    key = " ".join(col.split()).lower()
    return col_map.get(key, key)
def _parse_csv(
    csv_path: str,
    bool_cols: frozenset[str],
    list_cols: frozenset[str],
    required: frozenset[str],
    col_map: dict[str, str],
    label: str,
) -> list[dict]:
    """Read one share CSV and return its usable rows as coerced dicts.

    Headers are translated to API field names with ``_normalize_col`` and each
    data row is validated and converted by ``_coerce_row``; rows for which
    that returns None are left out.

    Args:
        csv_path: Path of the CSV file to read.
        bool_cols: API field names to coerce to bool.
        list_cols: API field names to coerce to list[str].
        required: API field names that must be present as columns.
        col_map: Lowercase friendly header -> API field name.
        label: Short tag (e.g. "SMB") used in log messages.

    Returns:
        The list of share dicts, in file order.

    Exits the process with status 1, after logging an error, when the file is
    missing, cannot be read, is not valid UTF-8, is malformed CSV, has no
    header row, or lacks a required column.
    """
    path = Path(csv_path)
    if not path.is_file():
        log.error("%s CSV file not found: %s", label, csv_path)
        sys.exit(1)

    shares: list[dict] = []
    try:
        # "utf-8-sig" accepts UTF-8 with or without a leading BOM;
        # newline="" is what the csv module asks for on file objects.
        with path.open(newline="", encoding="utf-8-sig") as fh:
            reader = csv.DictReader(fh)
            if reader.fieldnames is None:
                log.error("%s CSV has no header row: %s", label, csv_path)
                sys.exit(1)

            # Normalise header names using the column map
            normalised_header = {
                _normalize_col(c, col_map)
                for c in reader.fieldnames
                if c is not None
            }
            missing_req = required - normalised_header
            if missing_req:
                log.error(
                    "%s CSV is missing required column(s): %s",
                    label, ", ".join(sorted(missing_req)),
                )
                sys.exit(1)

            # Data rows are numbered from 2 because row 1 is the header.
            for row_num, row in enumerate(reader, start=2):
                # Cells beyond the header width are stored under the None key;
                # they have no column name, so they are dropped here.
                normalised = {
                    _normalize_col(k, col_map): v
                    for k, v in row.items()
                    if k is not None
                }
                share = _coerce_row(normalised, bool_cols, list_cols, required, row_num)
                if share is not None:
                    shares.append(share)
    except UnicodeDecodeError as exc:
        # Not an OSError: without this handler a file exported in a legacy
        # code page would surface as an unhandled traceback.
        log.error("%s CSV is not valid UTF-8 (re-save the file as UTF-8): %s", label, exc)
        sys.exit(1)
    except (OSError, csv.Error) as exc:
        # csv.Error covers malformed CSV detected by the reader.
        log.error("Cannot read %s CSV: %s", label, exc)
        sys.exit(1)

    log.info(" %-14s%s (%d share(s))", label.lower() + "_shares", csv_path, len(shares))
    return shares
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
def parse_smb_csv(csv_path: str) -> list[dict]:
    """Load SMB share definitions from the CSV at *csv_path*.

    Delegates to ``_parse_csv`` with the SMB column map, type rules and
    required fields, and returns its list of share dicts unchanged.
    """
    return _parse_csv(
        csv_path,
        bool_cols=_SMB_BOOL_COLS,
        list_cols=_SMB_LIST_COLS,
        required=_SMB_REQUIRED,
        col_map=_SMB_COL_MAP,
        label="SMB",
    )
def parse_nfs_csv(csv_path: str) -> list[dict]:
    """Load NFS share definitions from the CSV at *csv_path*.

    Delegates to ``_parse_csv`` with the NFS column map, type rules and
    required fields, and returns its list of share dicts unchanged.
    """
    return _parse_csv(
        csv_path,
        bool_cols=_NFS_BOOL_COLS,
        list_cols=_NFS_LIST_COLS,
        required=_NFS_REQUIRED,
        col_map=_NFS_COL_MAP,
        label="NFS",
    )
def parse_csv_sources(smb_csv: str | None, nfs_csv: str | None) -> dict[str, Any]:
    """Parse whichever of the two share CSVs were supplied.

    A falsy path (None or "") means that CSV is skipped and its entry in the
    result is an empty list.

    Returns:
        {"smb_shares": list, "nfs_shares": list}. The original author notes
        this is the same shape as parse_archive(), which is defined elsewhere.
    """
    log.info("Loading shares from CSV source(s).")

    # SMB is parsed before NFS, so log output keeps that order.
    smb_shares = parse_smb_csv(smb_csv) if smb_csv else []
    nfs_shares = parse_nfs_csv(nfs_csv) if nfs_csv else []

    log.info(
        "Loaded: %d SMB share(s), %d NFS share(s)",
        len(smb_shares),
        len(nfs_shares),
    )
    return {"smb_shares": smb_shares, "nfs_shares": nfs_shares}