"""CSV source parser – reads SMB/NFS share definitions from customer-supplied CSV files.""" from __future__ import annotations import csv import sys from pathlib import Path from typing import Any from .colors import log # ───────────────────────────────────────────────────────────────────────────── # Column type metadata # ───────────────────────────────────────────────────────────────────────────── # Columns coerced to bool _SMB_BOOL_COLS = frozenset({"ro", "browsable", "guestok", "abe", "timemachine", "enabled"}) # Columns coerced to list[str] (space-or-comma-separated in CSV) _SMB_LIST_COLS = frozenset({"hostsallow", "hostsdeny"}) _SMB_REQUIRED = frozenset({"name", "path"}) _NFS_BOOL_COLS = frozenset({"ro", "enabled"}) _NFS_LIST_COLS = frozenset({"security", "hosts", "networks"}) _NFS_REQUIRED = frozenset({"path"}) # ───────────────────────────────────────────────────────────────────────────── # Internal helpers # ───────────────────────────────────────────────────────────────────────────── def _parse_bool(value: str, col: str, row_num: int) -> bool: v = value.strip().lower() if v in ("true", "yes", "1"): return True if v in ("false", "no", "0", ""): return False log.warning(" row %d: unrecognised boolean %r for column %r – treating as False", row_num, value, col) return False def _parse_list(value: str) -> list[str]: """Split space-or-comma-separated value into a list, dropping blanks.""" return [p for p in value.replace(",", " ").split() if p] def _coerce_row( row: dict[str, str], bool_cols: frozenset[str], list_cols: frozenset[str], required: frozenset[str], row_num: int, ) -> dict[str, Any] | None: """Validate and type-coerce one CSV row. Returns None to skip the row.""" if not any((v or "").strip() for v in row.values()): return None # blank row first_val = next(iter(row.values()), "") or "" if first_val.strip().startswith("#"): return None # comment row result: dict[str, Any] = {} for col, raw in row.items(): if col is None: continue col = col.strip() val = (raw or "").strip() if not val: continue # omit empty optional fields; API uses its defaults if col in bool_cols: result[col] = _parse_bool(val, col, row_num) elif col in list_cols: result[col] = _parse_list(val) else: result[col] = val for req in required: if req not in result: log.warning(" row %d: missing required field %r – skipping row", row_num, req) return None return result def _parse_csv( csv_path: str, bool_cols: frozenset[str], list_cols: frozenset[str], required: frozenset[str], label: str, ) -> list[dict]: path = Path(csv_path) if not path.is_file(): log.error("%s CSV file not found: %s", label, csv_path) sys.exit(1) shares: list[dict] = [] try: with path.open(newline="", encoding="utf-8-sig") as fh: reader = csv.DictReader(fh) if reader.fieldnames is None: log.error("%s CSV has no header row: %s", label, csv_path) sys.exit(1) header = {c.strip() for c in reader.fieldnames if c is not None} missing_req = required - header if missing_req: log.error( "%s CSV is missing required column(s): %s", label, ", ".join(sorted(missing_req)), ) sys.exit(1) for row_num, row in enumerate(reader, start=2): normalised = {(k or "").strip(): v for k, v in row.items()} share = _coerce_row(normalised, bool_cols, list_cols, required, row_num) if share is not None: shares.append(share) except OSError as exc: log.error("Cannot read %s CSV: %s", label, exc) sys.exit(1) log.info(" %-14s → %s (%d share(s))", label.lower() + "_shares", csv_path, len(shares)) return shares # ───────────────────────────────────────────────────────────────────────────── # Public API # ───────────────────────────────────────────────────────────────────────────── def parse_smb_csv(csv_path: str) -> list[dict]: """Parse an SMB shares CSV. Returns share dicts compatible with migrate.py.""" return _parse_csv(csv_path, _SMB_BOOL_COLS, _SMB_LIST_COLS, _SMB_REQUIRED, "SMB") def parse_nfs_csv(csv_path: str) -> list[dict]: """Parse an NFS shares CSV. Returns share dicts compatible with migrate.py.""" return _parse_csv(csv_path, _NFS_BOOL_COLS, _NFS_LIST_COLS, _NFS_REQUIRED, "NFS") def parse_csv_sources(smb_csv: str | None, nfs_csv: str | None) -> dict[str, Any]: """ Parse one or both CSV files. Returns {"smb_shares": list, "nfs_shares": list} — same shape as parse_archive(). """ log.info("Loading shares from CSV source(s).") result: dict[str, Any] = {"smb_shares": [], "nfs_shares": []} if smb_csv: result["smb_shares"] = parse_smb_csv(smb_csv) if nfs_csv: result["nfs_shares"] = parse_nfs_csv(nfs_csv) log.info( "Loaded: %d SMB share(s), %d NFS share(s)", len(result["smb_shares"]), len(result["nfs_shares"]), ) return result