Add color and improved formatting to CLI output

- Auto-detected ANSI colors (disabled when stderr is not a TTY)
- Colored log formatter: dim timestamps, level names styled by severity
- Migration status keywords colored: SKIP=yellow, CREATED=bold-green,
  FAILED=bold-red, [DRY RUN]=cyan
- Share/export header lines bold with share name in bold-cyan
- Dry-run banner rendered as a bold-yellow framed box
- Summary report: cyan border, colored per-stat counts (green/yellow/red),
  errors and warnings highlighted; fixed box width (w=60) with ANSI-aware
  padding so columns stay aligned with color codes present
- Interactive wizard: styled header, cyan numbered list items

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-04 11:08:12 -05:00
parent e094c05cae
commit 684a21e28f

View File

@@ -56,6 +56,7 @@ import hashlib
import json
import logging
import os
import re as _re
import ssl
import struct
import sys
@@ -64,15 +65,59 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
# ─────────────────────────────────────────────────────────────────────────────
# Color helpers (ANSI; auto-disabled when stderr is not a TTY)
# ─────────────────────────────────────────────────────────────────────────────
_USE_COLOR = sys.stderr.isatty()
def _c(code: str, text: str) -> str:
return f"\033[{code}m{text}\033[0m" if _USE_COLOR else text
def _dim(t: str) -> str: return _c("2", t)
def _bold(t: str) -> str: return _c("1", t)
def _red(t: str) -> str: return _c("31", t)
def _green(t: str) -> str: return _c("32", t)
def _yellow(t: str) -> str: return _c("33", t)
def _cyan(t: str) -> str: return _c("36", t)
def _bold_red(t: str) -> str: return _c("1;31", t)
def _bold_green(t: str) -> str: return _c("1;32", t)
def _bold_yellow(t: str) -> str: return _c("1;33", t)
def _bold_cyan(t: str) -> str: return _c("1;36", t)
def _vis_len(s: str) -> int:
"""Visible character width of a string, ignoring ANSI escape sequences."""
return len(_re.sub(r'\033\[[0-9;]*m', '', s))
# ─────────────────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(message)s",
datefmt="%H:%M:%S",
)
class _ColorFormatter(logging.Formatter):
_STYLES = {
logging.DEBUG: "2", # dim
logging.INFO: "36", # cyan
logging.WARNING: "1;33", # bold yellow
logging.ERROR: "1;31", # bold red
logging.CRITICAL: "1;31",
}
def format(self, record: logging.LogRecord) -> str:
ts = self.formatTime(record, self.datefmt)
msg = record.getMessage()
if _USE_COLOR:
code = self._STYLES.get(record.levelno, "0")
level = f"\033[{code}m{record.levelname:<8}\033[0m"
ts = f"\033[2m{ts}\033[0m"
else:
level = f"{record.levelname:<8}"
return f"{ts} {level} {msg}"
_handler = logging.StreamHandler()
_handler.setFormatter(_ColorFormatter(datefmt="%H:%M:%S"))
logging.basicConfig(level=logging.INFO, handlers=[_handler])
log = logging.getLogger("truenas_migrate")
@@ -100,39 +145,65 @@ class Summary:
missing_datasets: list[str] = field(default_factory=list)
def report(self) -> str:
w = 52
hr = "" * w
def row(label: str, val: str) -> str:
right = w - 2 - len(label) - len(val)
return f"{label}{val}{' ' * right}"
w = 60
smb_val = (f"found={self.smb_found} created={self.smb_created}"
f" skipped={self.smb_skipped} failed={self.smb_failed}")
nfs_val = (f"found={self.nfs_found} created={self.nfs_created}"
f" skipped={self.nfs_skipped} failed={self.nfs_failed}")
cfg_val = "applied" if self.cfg_applied else "not applied"
def _stat(label: str, n: int, color_fn) -> str:
s = f"{label}={n}"
return color_fn(s) if n > 0 else _dim(s)
smb_val = (
f"{_dim('found=' + str(self.smb_found))} "
f"{_stat('created', self.smb_created, _bold_green)} "
f"{_stat('skipped', self.smb_skipped, _yellow)} "
f"{_stat('failed', self.smb_failed, _bold_red)}"
)
nfs_val = (
f"{_dim('found=' + str(self.nfs_found))} "
f"{_stat('created', self.nfs_created, _bold_green)} "
f"{_stat('skipped', self.nfs_skipped, _yellow)} "
f"{_stat('failed', self.nfs_failed, _bold_red)}"
)
cfg_val = _bold_green("applied") if self.cfg_applied else _dim("not applied")
hr = _cyan("" * w)
tl = _cyan(""); tr = _cyan("")
ml = _cyan(""); mr = _cyan("")
bl = _cyan(""); br = _cyan("")
side = _cyan("")
title_text = "MIGRATION SUMMARY"
lpad = (w - len(title_text)) // 2
rpad = w - len(title_text) - lpad
title_row = f"{side}{' ' * lpad}{_bold(title_text)}{' ' * rpad}{side}"
def row(label: str, val: str) -> str:
right = max(0, w - 2 - len(label) - _vis_len(val))
return f"{side} {_dim(label)}{val}{' ' * right} {side}"
lines = [
"",
f"{hr}",
f"{'MIGRATION SUMMARY':^{w}}",
f"{hr}",
row(" SMB shares : ", smb_val),
row(" NFS shares : ", nfs_val),
row(" SMB config : ", cfg_val),
f"{hr}",
f"{tl}{hr}{tr}",
title_row,
f"{ml}{hr}{mr}",
row("SMB shares : ", smb_val),
row("NFS shares : ", nfs_val),
row("SMB config : ", cfg_val),
f"{bl}{hr}{br}",
]
if self.errors:
lines.append(f"\n {len(self.errors)} error(s):")
lines.append(f"\n {_bold_red(str(len(self.errors)) + ' error(s):')} ")
for e in self.errors:
lines.append(f" {e}")
lines.append(f" {_red('')} {e}")
if self.missing_datasets:
lines.append(
f"\n WARNING: {len(self.missing_datasets)} share path(s) have no "
f"\n {_bold_yellow('WARNING:')} "
f"{len(self.missing_datasets)} share path(s) have no "
"matching dataset on the destination:"
)
for p in self.missing_datasets:
lines.append(f" {p}")
lines.append(f" {_yellow('')} {p}")
lines.append(
" These paths must exist before shares can be created.\n"
" Use interactive mode or answer 'y' at the dataset prompt to create them."
@@ -917,10 +988,10 @@ async def migrate_smb_shares(
for share in shares:
name = share.get("name", "<unnamed>")
log.info("── SMB share %r", name)
log.info("%s SMB share %s", _bold("──"), _bold_cyan(repr(name)))
if name.lower() in existing_names:
log.info(" SKIP already exists on destination.")
log.info(" %s already exists on destination.", _yellow("SKIP"))
summary.smb_skipped += 1
continue
@@ -928,8 +999,8 @@ async def migrate_smb_shares(
log.debug(" payload: %s", json.dumps(payload))
if dry_run:
log.info(" [DRY RUN] would create SMB share %r%s",
name, payload.get("path"))
log.info(" %s would create %s%s",
_cyan("[DRY RUN]"), _bold_cyan(repr(name)), payload.get("path"))
summary.smb_created += 1
if payload.get("path"):
summary.paths_to_create.append(payload["path"])
@@ -937,10 +1008,10 @@ async def migrate_smb_shares(
try:
r = await client.call("sharing.smb.create", [payload])
log.info(" CREATED id=%s", r.get("id"))
log.info(" %s id=%s", _bold_green("CREATED"), r.get("id"))
summary.smb_created += 1
except RuntimeError as exc:
log.error(" FAILED: %s", exc)
log.error(" %s: %s", _bold_red("FAILED"), exc)
summary.smb_failed += 1
summary.errors.append(f"SMB share {name!r}: {exc}")
@@ -973,10 +1044,10 @@ async def migrate_nfs_shares(
core_paths = share.get("paths") or []
path = (share.get("path") or (core_paths[0] if core_paths else "")).rstrip("/")
all_paths = [p.rstrip("/") for p in (core_paths if core_paths else ([path] if path else []))]
log.info("── NFS export %r", path)
log.info("%s NFS export %s", _bold("──"), _bold_cyan(repr(path)))
if path in existing_paths:
log.info(" SKIP path already exported on destination.")
log.info(" %s path already exported on destination.", _yellow("SKIP"))
summary.nfs_skipped += 1
continue
@@ -984,17 +1055,18 @@ async def migrate_nfs_shares(
log.debug(" payload: %s", json.dumps(payload))
if dry_run:
log.info(" [DRY RUN] would create NFS export for %r", path)
log.info(" %s would create NFS export for %s",
_cyan("[DRY RUN]"), _bold_cyan(repr(path)))
summary.nfs_created += 1
summary.paths_to_create.extend(all_paths)
continue
try:
r = await client.call("sharing.nfs.create", [payload])
log.info(" CREATED id=%s", r.get("id"))
log.info(" %s id=%s", _bold_green("CREATED"), r.get("id"))
summary.nfs_created += 1
except RuntimeError as exc:
log.error(" FAILED: %s", exc)
log.error(" %s: %s", _bold_red("FAILED"), exc)
summary.nfs_failed += 1
summary.errors.append(f"NFS share {path!r}: {exc}")
@@ -1010,7 +1082,7 @@ async def migrate_smb_config(
return
payload = _smb_config_payload(config)
log.info("── SMB global config")
log.info("%s SMB global config", _bold("──"))
log.info(
" netbiosname=%-20s workgroup=%-15s encryption=%s",
repr(payload.get("netbiosname")),
@@ -1019,16 +1091,16 @@ async def migrate_smb_config(
)
if dry_run:
log.info(" [DRY RUN] would call smb.update")
log.info(" %s would call smb.update", _cyan("[DRY RUN]"))
summary.cfg_applied = True
return
try:
await client.call("smb.update", [payload])
log.info(" APPLIED")
log.info(" %s", _bold_green("APPLIED"))
summary.cfg_applied = True
except RuntimeError as exc:
log.error(" FAILED: %s", exc)
log.error(" %s: %s", _bold_red("FAILED"), exc)
summary.errors.append(f"SMB config: {exc}")
@@ -1045,9 +1117,11 @@ async def run(
migrate_set = set(args.migrate)
if args.dry_run:
log.info("=" * 55)
log.info("DRY RUN no changes will be made on the destination")
log.info("=" * 55)
msg = " DRY RUN no changes will be made on the destination "
bar = _bold_yellow("" * len(msg))
print(f"\n{_bold_yellow('')}{bar}{_bold_yellow('')}", file=sys.stderr)
print(f"{_bold_yellow('')}{_bold_yellow(msg)}{_bold_yellow('')}", file=sys.stderr)
print(f"{_bold_yellow('')}{bar}{_bold_yellow('')}\n", file=sys.stderr)
summary = Summary()
@@ -1113,7 +1187,10 @@ def _confirm(label: str) -> bool:
def interactive_mode() -> None:
"""Interactive wizard: pick archive → configure → dry run → confirm → apply."""
print("\n=== TrueNAS Share Migration Tool ===\n")
print(
f"\n{_bold_cyan(' TrueNAS Share Migration Tool')}\n"
f" {_dim('Migrate SMB/NFS shares from a debug archive to a live system.')}\n"
)
# 1 ── Locate debug archive ────────────────────────────────────────────────
archives = _find_debug_archives()
@@ -1126,11 +1203,11 @@ def interactive_mode() -> None:
if len(archives) == 1:
chosen = archives[0]
print(f"Archive: {chosen.name} ({chosen.stat().st_size / 1_048_576:.1f} MB)\n")
print(f" {_dim('Archive:')} {_bold(chosen.name)} {_dim('(' + f'{chosen.stat().st_size / 1_048_576:.1f} MB' + ')')}\n")
else:
print("Debug archives found:\n")
print(f" {_bold('Debug archives found:')}\n")
for i, p in enumerate(archives, 1):
print(f" {i}. {p.name} ({p.stat().st_size / 1_048_576:.1f} MB)")
print(f" {_cyan(str(i) + '.')} {p.name} {_dim('(' + f'{p.stat().st_size / 1_048_576:.1f} MB' + ')')}")
print()
while True:
raw = _prompt(f"Select archive [1-{len(archives)}]")
@@ -1162,10 +1239,10 @@ def interactive_mode() -> None:
print(" API key is required.")
# 4 ── Migration scope ─────────────────────────────────────────────────────
print("\nWhat to migrate?")
print(" 1. SMB shares")
print(" 2. NFS shares")
print(" 3. SMB global config")
print(f"\n {_bold('What to migrate?')}")
print(f" {_cyan('1.')} SMB shares")
print(f" {_cyan('2.')} NFS shares")
print(f" {_cyan('3.')} SMB global config")
sel_raw = _prompt(
"Selection (space-separated numbers, Enter for all)", default="1 2 3"
)