diff --git a/utils/sas_profiler.py b/utils/sas_profiler.py new file mode 100644 index 0000000..7cbab2c --- /dev/null +++ b/utils/sas_profiler.py @@ -0,0 +1,1125 @@ +"""Standalone utility that profiles a single local SAS file and writes an +Excel report with drop, partition, and index candidates plus type-inference +warnings. + +Configure the constants below and run:: + + python3 utils/sas_profiler.py + +Or override any of them from the command line:: + + python3 utils/sas_profiler.py \ + --file ./data/mystate.sas7bdat \ + --out ./reports/mystate_profile.xlsx + +The report is a paste-ready companion to +``generic_loader/load_sas.py`` and ``generic_loader/load_folder.py``: the +"inferred Postgres type" column uses the loader's own ``infer_schema`` so the +drop / partition / index suggestions map one-to-one onto valid YAML config +entries for those scripts. + +Supported inputs: ``.sas7bdat`` / ``.xpt`` / ``.xport`` (whatever the loader +can read). + +Python 3.10+ compatible. +""" + +from __future__ import annotations + +import argparse +import collections +import datetime as dt +import math +import os +import re +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +# The loader lives in a sibling directory that is *not* a proper package +# (no __init__.py). Its own modules import each other by bare name, so we +# add the directory to sys.path before importing it here. 
+_REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(_REPO_ROOT / "generic_loader")) + +import pandas as pd # noqa: E402 +from openpyxl import Workbook # noqa: E402 +from openpyxl.styles import Alignment, Font, PatternFill # noqa: E402 +from openpyxl.utils import get_column_letter # noqa: E402 + +from load_sas import ( # noqa: E402 + NUMERIC_INT_RANGE, + ColumnSpec, + _char_missing_mask, + infer_schema, + iter_sas_chunks, + read_sas_preview, +) + + +# --------------------------------------------------------------------------- +# Configuration - edit these before running, or override via CLI flags +# --------------------------------------------------------------------------- + +SAS_PATH: str = "./generic_loader/samples/sample_kitchensink.xpt" +"""Local path to the .sas7bdat / .xpt / .xport file to profile.""" + +OUTPUT_XLSX: str = "./sas_profile.xlsx" +"""Where to write the Excel report.""" + +HIGH_NULL_PCT: float = 95.0 +"""Columns whose null percentage meets or exceeds this threshold are flagged +as drop candidates.""" + +INDEX_UNIQUENESS_PCT: float = 95.0 +"""Columns whose distinct/non-null ratio meets or exceeds this threshold are +flagged as index candidates.""" + +PARTITION_MIN_DISTINCT: int = 2 +"""A partition candidate must have at least this many distinct values.""" + +PARTITION_MAX_DISTINCT: int = 500 +"""A partition candidate must have at most this many distinct values. Kept +deliberately tighter than the loader's max_partitions default (10,000) so +the default suggestions stay conservative.""" + +PARTITION_MIN_FILL_PCT: float = 95.0 +"""Partition candidates must be non-null in at least this fraction of rows.""" + +PRE_SHARDED_MAX_DISTINCT: int = 3 +"""A name-matched column with <= this many distinct values is treated as +"the file is probably pre-sharded on this column" rather than being +silently dumped into the drop list.""" + +DISTINCT_CAP: int = 10_000 +"""Max size of the per-column distinct-value set. 
DISTINCT_CAP: int = 10_000
"""Max size of the per-column distinct-value set. Exceeding this marks the
column as ``distinct_overflow`` and we report ">= CAP" in the xlsx."""

TOP_N_VALUES: int = 5
"""Number of most-frequent values tracked per column."""

PREVIEW_ROWS_FOR_INFERENCE: int = 10_000
"""Rows pulled from the file for the loader's schema inference. Matches
``load_sas.TYPE_INFERENCE_SAMPLE_ROWS`` so suggestions track the loader."""


PARTITION_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
    re.compile(r"^state$", re.IGNORECASE),
    re.compile(r"^state_?code$", re.IGNORECASE),
)
"""Column names that are "probably partition columns" regardless of how
many distinct values happen to be present in this one file. Kept tiny on
purpose - add more patterns here later if you want to recognise
region/year/etc."""


INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
    re.compile(r"^id$", re.IGNORECASE),
    re.compile(r"_id$", re.IGNORECASE),
    re.compile(r"_key$", re.IGNORECASE),
    re.compile(r"^pk_", re.IGNORECASE),
)
"""Name-bonus patterns for index-candidate ranking."""


_PARTITION_FRIENDLY_TYPES: frozenset = frozenset(
    {"TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER",
     "INTEGER", "BIGINT", "SMALLINT", "BOOLEAN", "DATE"}
)


# ---------------------------------------------------------------------------
# Per-column streaming aggregator
# ---------------------------------------------------------------------------


@dataclass
class _ColumnStats:
    """Accumulators updated chunk-by-chunk while streaming the file.

    One instance tracks one column. Call :meth:`update` with each chunk's
    Series; the derived properties (``null_pct``, ``mean``, ``std``, ...)
    can be read at any point and reflect everything folded in so far.
    """

    name: str
    n_total: int = 0
    n_null: int = 0
    # Empty strings seen among values that were NOT classified as missing.
    n_empty_str: int = 0

    # Distinct hashable values, capped at DISTINCT_CAP entries.
    distinct: set = field(default_factory=set)
    # True once the cap was hit or an unhashable value was encountered.
    distinct_overflow: bool = False

    top_counts: "collections.Counter[Any]" = field(default_factory=collections.Counter)

    min_val: Any = None
    max_val: Any = None

    # Numeric running stats (Welford would be nicer but sum/sum-sq is plenty
    # here for a "help me pick columns" report).
    numeric_sum: float = 0.0
    numeric_sumsq: float = 0.0
    numeric_count: int = 0

    # String byte-length stats (helps flag oversized TEXT columns).
    str_max_bytes: int = 0
    str_sum_bytes: int = 0
    str_count: int = 0

    # First few hashable non-null values, kept for display only.
    samples: List[Any] = field(default_factory=list)

    def update(self, series: pd.Series) -> None:
        """Fold one chunk's worth of this column into the accumulator.

        Args:
            series: The column's values for one chunk. Object-dtype series
                use the loader's ``_char_missing_mask`` to decide what is
                missing; every other dtype uses ``Series.isna()``.
        """
        self.n_total += len(series)
        if len(series) == 0:
            return

        is_object = pd.api.types.is_object_dtype(series)
        if is_object:
            miss_mask = _char_missing_mask(series)
        else:
            miss_mask = series.isna()

        self.n_null += int(miss_mask.sum())
        non_null = series[~miss_mask]
        has_values = not non_null.empty

        if is_object and has_values:
            # Count empty strings only among values that were not already
            # classified as missing. Counting over the whole chunk would
            # double-count any "" the missing mask flags, and the
            # ``n_non_null == n_empty_str`` drop test downstream could then
            # match by coincidence on a column that holds real data.
            # NOTE(review): whether _char_missing_mask flags "" is not
            # visible from this file; this form is correct either way.
            empty_mask = non_null.map(lambda v: isinstance(v, str) and v == "")
            self.n_empty_str += int(empty_mask.sum())

        if pd.api.types.is_numeric_dtype(series) and has_values:
            as_float = non_null.astype("float64")
            self.numeric_sum += float(as_float.sum())
            self.numeric_sumsq += float((as_float * as_float).sum())
            self.numeric_count += int(len(as_float))
            self._widen_range(as_float.min(), as_float.max())
        elif pd.api.types.is_datetime64_any_dtype(series) and has_values:
            self._widen_range(non_null.min(), non_null.max())

        if is_object and has_values:
            str_like = non_null.map(lambda v: v if isinstance(v, str) else str(v))
            byte_lens = str_like.map(
                lambda s: len(s.encode("utf-8", errors="replace"))
            )
            if len(byte_lens):
                self.str_max_bytes = max(self.str_max_bytes, int(byte_lens.max()))
                self.str_sum_bytes += int(byte_lens.sum())
                self.str_count += int(len(byte_lens))

        for val in non_null.tolist():
            hashable = _hashable(val)
            if hashable is _UNHASHABLE:
                # Give up on distinct tracking for this column; it's some
                # exotic (e.g. list) value we can't hash, and the drop/index
                # suggestions wouldn't be meaningful anyway.
                self.distinct_overflow = True
                continue
            if not self.distinct_overflow and hashable not in self.distinct:
                if len(self.distinct) >= DISTINCT_CAP:
                    self.distinct_overflow = True
                else:
                    self.distinct.add(hashable)
            # The frequency table has its own cap: known keys keep counting,
            # new keys are only admitted while there is room.
            if len(self.top_counts) < DISTINCT_CAP or hashable in self.top_counts:
                self.top_counts[hashable] += 1

            if len(self.samples) < 3:
                self.samples.append(val)

    def _widen_range(self, chunk_min: Any, chunk_max: Any) -> None:
        """Extend the running ``min_val`` / ``max_val`` with one chunk's extremes."""
        if self.min_val is None or chunk_min < self.min_val:
            self.min_val = chunk_min
        if self.max_val is None or chunk_max > self.max_val:
            self.max_val = chunk_max

    # -- Derived properties ------------------------------------------------

    @property
    def n_non_null(self) -> int:
        """Number of values seen that were not classified as missing."""
        return self.n_total - self.n_null

    @property
    def null_pct(self) -> float:
        """Missing values as a percentage of all rows (0.0 for no rows)."""
        if self.n_total == 0:
            return 0.0
        return 100.0 * self.n_null / self.n_total

    @property
    def fill_pct(self) -> float:
        """Complement of :attr:`null_pct`."""
        return 100.0 - self.null_pct

    @property
    def distinct_count(self) -> int:
        """Size of the tracked distinct set (a lower bound after overflow)."""
        return len(self.distinct)

    @property
    def distinct_display(self) -> str:
        """Distinct count formatted for the report; ">= CAP" after overflow."""
        if self.distinct_overflow:
            return f">= {DISTINCT_CAP:,}"
        return f"{self.distinct_count:,}"

    @property
    def mean(self) -> Optional[float]:
        """Mean of the numeric values seen, or None when there were none."""
        if self.numeric_count == 0:
            return None
        return self.numeric_sum / self.numeric_count

    @property
    def std(self) -> Optional[float]:
        """Population standard deviation, or None with fewer than two values."""
        if self.numeric_count < 2:
            return None
        mean = self.mean
        var = self.numeric_sumsq / self.numeric_count - (mean * mean)
        # Guard against tiny negative from floating point noise.
        return math.sqrt(max(var, 0.0))

    @property
    def top_value(self) -> Tuple[Any, int]:
        """Most frequent value and its count, or ``(None, 0)`` when empty."""
        if not self.top_counts:
            return (None, 0)
        return self.top_counts.most_common(1)[0]

    def top_values(self, n: int = TOP_N_VALUES) -> List[Tuple[Any, int]]:
        """Return the ``n`` most frequent ``(value, count)`` pairs."""
        return self.top_counts.most_common(n)


class _UnhashableSentinel:
    """Type of the :data:`_UNHASHABLE` marker returned by :func:`_hashable`."""


_UNHASHABLE = _UnhashableSentinel()


def _hashable(val: Any) -> Any:
    """Return ``val`` if it is hashable, else :data:`_UNHASHABLE`.

    pandas occasionally hands us objects (lists, dicts) from object columns
    that aren't hashable. Rather than crashing the whole report, the caller
    lets the column fall back to "distinct_overflow" mode for those rows.
    """
    try:
        hash(val)
    except TypeError:
        return _UNHASHABLE
    return val


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _matches_any(patterns: Tuple[re.Pattern, ...], name: str) -> bool:
    """Return True when any pattern in ``patterns`` is found in ``name``."""
    return any(p.search(name) for p in patterns)


def _format_size(n_bytes: int) -> str:
    """Render a byte count with a binary-scaled unit (B through TB).

    Values of 1024 TB and above stay in TB rather than rolling over.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    size = float(n_bytes)
    for unit in units[:-1]:
        if size < 1024.0:
            return f"{size:,.1f} {unit}"
        size /= 1024.0
    return f"{size:,.1f} {units[-1]}"


def _format_value(val: Any) -> str:
    """Render a single Python value for display in the spreadsheet.

    ``None`` and float NaN become an empty string, strings are shown via
    ``repr`` so whitespace and quotes stay visible, everything else via
    ``str``.
    """
    if val is None:
        return ""
    if isinstance(val, float) and pd.isna(val):
        return ""
    if isinstance(val, (pd.Timestamp, dt.date, dt.datetime)):
        return str(val)
    return repr(val) if isinstance(val, str) else str(val)


def _format_top_values(pairs: List[Tuple[Any, int]]) -> str:
    """Render ``(value, count)`` pairs as ``"value (count), ..."``."""
    if not pairs:
        return ""
    return ", ".join(f"{_format_value(v)} ({c:,})" for v, c in pairs)


def _format_samples(samples: List[Any]) -> str:
    """Render sample values, or ``"(all null)"`` when none were captured."""
    if not samples:
        return "(all null)"
    return ", ".join(_format_value(v) for v in samples)

# ---------------------------------------------------------------------------
# Streaming profile
# ---------------------------------------------------------------------------


def profile_file(
    path: Path,
    *,
    chunksize: Optional[int] = None,
) -> Tuple[Dict[str, _ColumnStats], Dict[str, ColumnSpec], Any, int]:
    """Stream ``path`` once, returning (stats, columns, meta, total_rows).

    ``columns`` is the schema the loader's ``infer_schema`` derives from the
    first ``PREVIEW_ROWS_FOR_INFERENCE`` rows. ``stats`` holds one
    accumulator per column of that schema, fed with every chunk of the file.
    ``meta`` is the metadata object returned alongside the preview, and
    ``total_rows`` is the number of rows actually streamed.

    ``chunksize`` is forwarded to ``iter_sas_chunks`` only when given, so the
    loader's own default applies otherwise. Progress is reported on stderr
    after each chunk.
    """
    preview_df, meta = read_sas_preview(path, rows=PREVIEW_ROWS_FOR_INFERENCE)
    columns = infer_schema(
        preview_df,
        meta,
        total_rows=getattr(meta, "number_rows", None),
    )

    stats: Dict[str, _ColumnStats] = {}
    for col_name in columns:
        stats[col_name] = _ColumnStats(name=col_name)

    chunk_kwargs = {} if chunksize is None else {"chunksize": chunksize}

    rows_seen = 0
    for frame, _frame_meta in iter_sas_chunks(path, **chunk_kwargs):
        rows_seen += len(frame)
        print(f" profiling... {rows_seen:,} rows", file=sys.stderr)
        for col_name, accumulator in stats.items():
            # Columns of the preview schema that are absent from this chunk
            # are skipped.
            if col_name in frame.columns:
                accumulator.update(frame[col_name])

    return stats, columns, meta, rows_seen

# ---------------------------------------------------------------------------
# Classifiers
# ---------------------------------------------------------------------------


@dataclass
class _DropCandidate:
    """A column suggested for the loader's ``exclude`` list, with the reason."""

    name: str
    reason: str


@dataclass
class _PartitionCandidate:
    """A column suggested for ``partition_by``; a higher ``score`` ranks first."""

    name: str
    kind: str  # "observed" or "pre_sharded"
    distinct_count: int
    fill_pct: float
    top_values: str
    observed_values_in_file: str
    note: str
    score: float


@dataclass
class _IndexCandidate:
    """A column suggested for an index; a higher ``score`` ranks first."""

    name: str
    uniqueness_pct: float
    distinct_count: int
    fill_pct: float
    name_bonus: bool
    note: str
    score: float


@dataclass
class _TypeWarning:
    """A note about the loader's inferred type for one column."""

    name: str
    severity: str  # "info" | "warn" | "error"
    message: str


def _is_constant_like(cs: _ColumnStats) -> bool:
    """True when the column is effectively a single value (possibly with
    a handful of nulls / empties mixed in)."""
    if cs.n_non_null == 0:
        return False
    return cs.distinct_count == 1 and not cs.distinct_overflow


def classify(
    stats: Dict[str, _ColumnStats],
    columns: Dict[str, ColumnSpec],
    *,
    high_null_pct: float,
    index_uniqueness_pct: float,
    partition_min_distinct: int,
    partition_max_distinct: int,
    partition_min_fill_pct: float,
    pre_sharded_max_distinct: int,
) -> Tuple[
    List[_DropCandidate],
    List[_PartitionCandidate],
    List[_IndexCandidate],
    List[_TypeWarning],
]:
    """Turn per-column stats + the loader's schema into four ranked lists.

    Args:
        stats: Full-file accumulators keyed by column name.
        columns: The loader's inferred schema keyed by column name.
        high_null_pct: Null percentage at/above which a column is dropped.
        index_uniqueness_pct: distinct/non-null percentage at/above which a
            column becomes an index candidate.
        partition_min_distinct: Lower bound on distinct values for an
            observed partition candidate.
        partition_max_distinct: Upper bound on distinct values for an
            observed partition candidate.
        partition_min_fill_pct: Minimum fill percentage for any partition
            candidate.
        pre_sharded_max_distinct: A name-matched column with at most this
            many distinct values is reported as "pre_sharded".

    Returns:
        ``(drops, partitions, indexes, warnings)``. Partitions and indexes
        are sorted by descending score; drops and warnings follow the
        iteration order of ``stats``. A column lands in at most one of the
        drop / partition / index lanes.
    """

    drops: List[_DropCandidate] = []
    partitions: List[_PartitionCandidate] = []
    indexes: List[_IndexCandidate] = []
    warnings: List[_TypeWarning] = []

    # Names we've already routed into the partition lane - exclude them from
    # the drop / index lanes downstream.
    claimed_by_partition: set = set()

    # -- First pass: partition-name-matched columns ------------------------
    # Run this before the drop check so pre-sharded STATE columns don't get
    # silently dropped.
    for name, cs in stats.items():
        if not _matches_any(PARTITION_NAME_PATTERNS, name):
            continue
        if cs.n_total == 0:
            continue

        looks_pre_sharded = (
            cs.n_non_null > 0
            and not cs.distinct_overflow
            and cs.distinct_count <= pre_sharded_max_distinct
            and cs.fill_pct >= partition_min_fill_pct
        )
        if not looks_pre_sharded:
            # Name-matched but not pre-sharded: handled by the regular
            # partition candidate pass below, which scores it up due to the
            # name match.
            continue

        observed = ", ".join(
            _format_value(v) for v, _ in cs.top_values(pre_sharded_max_distinct)
        )
        note_parts = [
            f"pre-sharded: this file only contains {cs.distinct_count} distinct "
            f"value(s) ({observed})",
            "keep the column and set partition_by at the load_folder level so "
            "sibling files merge into separate partitions of one table",
        ]
        partitions.append(
            _PartitionCandidate(
                name=name,
                kind="pre_sharded",
                distinct_count=cs.distinct_count,
                fill_pct=cs.fill_pct,
                top_values=_format_top_values(cs.top_values()),
                observed_values_in_file=observed,
                note="; ".join(note_parts),
                # Pre-sharded STATE always wins the ranking.
                score=1_000_000.0,
            )
        )
        claimed_by_partition.add(name)

    # -- Drop candidates ---------------------------------------------------
    for name, cs in stats.items():
        if name in claimed_by_partition:
            continue
        if cs.n_total == 0:
            continue

        reason: Optional[str] = None
        if cs.n_null == cs.n_total:
            reason = "all-null"
        elif cs.n_non_null > 0 and cs.distinct_count == 0 and not cs.top_counts:
            # Non-null but nothing hashable captured - treat as opaque.
            # Unhashable values set distinct_overflow, so this test must not
            # require the flag to be clear; that made the branch unreachable
            # and let such columns through as "100% unique" index candidates.
            reason = "all-empty / unhashable"
        elif cs.n_non_null == cs.n_empty_str and cs.n_empty_str > 0:
            reason = "all-empty"
        elif _is_constant_like(cs):
            only_val = next(iter(cs.distinct))
            reason = f"constant={_format_value(only_val)}"
        elif cs.null_pct >= high_null_pct:
            reason = f"null_pct={cs.null_pct:.1f}%"

        if reason is not None:
            drops.append(_DropCandidate(name=name, reason=reason))

    dropped_names = {d.name for d in drops}

    # -- Partition candidates (observed) ----------------------------------
    for name, cs in stats.items():
        if name in claimed_by_partition or name in dropped_names:
            continue
        spec = columns.get(name)
        if spec is None:
            continue
        pg_type = spec.postgres_type.upper()
        if pg_type not in _PARTITION_FRIENDLY_TYPES:
            continue
        if cs.distinct_overflow:
            continue
        if not (
            partition_min_distinct <= cs.distinct_count <= partition_max_distinct
        ):
            continue
        if cs.fill_pct < partition_min_fill_pct:
            continue

        name_match = _matches_any(PARTITION_NAME_PATTERNS, name)
        # Score: name-match dominates, then prefer fewer partitions (safer
        # DDL), then prefer more-filled columns as a tiebreaker.
        score = (
            (500_000.0 if name_match else 0.0)
            + (partition_max_distinct - cs.distinct_count)
            + cs.fill_pct
        )

        partition_notes: List[str] = []
        if name_match:
            partition_notes.append("name matches PARTITION_NAME_PATTERNS")
        if cs.distinct_count > 10_000:
            partition_notes.append(
                f"distinct_count={cs.distinct_count:,} exceeds loader "
                "max_partitions default (10,000); expect DDL warnings"
            )
        partition_notes.append(
            "LIST partitioning creates one child table per distinct value "
            "(see load_sas.render_partition_ddl)"
        )

        top_values_text = _format_top_values(cs.top_values())
        partitions.append(
            _PartitionCandidate(
                name=name,
                kind="observed",
                distinct_count=cs.distinct_count,
                fill_pct=cs.fill_pct,
                top_values=top_values_text,
                observed_values_in_file=top_values_text,
                note="; ".join(partition_notes),
                score=score,
            )
        )

    partitions.sort(key=lambda p: p.score, reverse=True)
    partition_names = {p.name for p in partitions}

    # -- Index candidates --------------------------------------------------
    for name, cs in stats.items():
        if name in dropped_names or name in partition_names:
            continue
        spec = columns.get(name)
        if spec is None:
            continue
        if cs.n_non_null == 0:
            continue
        if cs.distinct_overflow:
            # Super-high-cardinality -> perfect candidate for an index.
            uniqueness = 100.0
            distinct_count = DISTINCT_CAP  # display sentinel
        else:
            uniqueness = 100.0 * cs.distinct_count / cs.n_non_null
            distinct_count = cs.distinct_count
        if uniqueness < index_uniqueness_pct:
            continue

        name_bonus = _matches_any(INDEX_NAME_PATTERNS, name)
        index_notes: List[str] = []
        if name_bonus:
            index_notes.append("name matches INDEX_NAME_PATTERNS (ID/KEY-ish)")
        if cs.distinct_overflow:
            index_notes.append(
                f"distinct tracking capped at {DISTINCT_CAP:,}; "
                "treating as high-cardinality"
            )

        # Rank: name match dominates, then raw uniqueness, then fill.
        score = (500_000.0 if name_bonus else 0.0) + uniqueness + cs.fill_pct / 100.0

        indexes.append(
            _IndexCandidate(
                name=name,
                uniqueness_pct=uniqueness,
                distinct_count=distinct_count,
                fill_pct=cs.fill_pct,
                name_bonus=name_bonus,
                note="; ".join(index_notes),
                score=score,
            )
        )

    indexes.sort(key=lambda i: i.score, reverse=True)

    # -- Type warnings -----------------------------------------------------
    for name, cs in stats.items():
        spec = columns.get(name)
        if spec is None:
            continue

        # Re-surface whatever the loader's own inference already flagged in
        # notes - these are genuinely useful for the user to see without
        # having to dry-run the loader.
        for note in spec.notes:
            warnings.append(
                _TypeWarning(name=name, severity="info", message=note)
            )
        if spec.sampled:
            warnings.append(
                _TypeWarning(
                    name=name,
                    severity="info",
                    message=(
                        "loader inferred type from a bounded preview; "
                        "sampled=True"
                    ),
                )
            )

        pg_type = spec.postgres_type.upper()

        # Preview said NOT NULL but the full file has nulls - loader would
        # have emitted NOT NULL and then choked on COPY.
        if not spec.nullable and cs.n_null > 0:
            warnings.append(
                _TypeWarning(
                    name=name,
                    severity="error",
                    message=(
                        f"preview saw zero nulls (NOT NULL) but full file has "
                        f"{cs.n_null:,} null(s); COPY would fail under the "
                        "loader's inferred NOT NULL"
                    ),
                )
            )

        # INTEGER range check against the full-file observed min/max.
        if pg_type == "INTEGER" and cs.numeric_count > 0:
            lo, hi = NUMERIC_INT_RANGE
            vmin = cs.min_val if cs.min_val is not None else 0
            vmax = cs.max_val if cs.max_val is not None else 0
            try:
                if vmin < lo or vmax > hi:
                    warnings.append(
                        _TypeWarning(
                            name=name,
                            severity="error",
                            message=(
                                f"loader inferred INTEGER from the preview but "
                                f"full-file range [{vmin}, {vmax}] overflows "
                                f"int4 {NUMERIC_INT_RANGE}; BIGINT required"
                            ),
                        )
                    )
            except TypeError:
                # Best effort: min/max of a type that cannot be compared to
                # the integer bounds simply produces no range warning.
                pass

        # Preview said all-null (loader defaults to TEXT) but data exists.
        was_all_null_preview = any(
            "all-null column" in n for n in spec.notes
        )
        if was_all_null_preview and cs.n_non_null > 0:
            warnings.append(
                _TypeWarning(
                    name=name,
                    severity="warn",
                    message=(
                        "preview was all-null so loader defaulted to TEXT, "
                        f"but full file has {cs.n_non_null:,} non-null "
                        "value(s); consider a tighter include/exclude or "
                        "re-inferring with TYPE_INFERENCE_SAMPLE_ROWS=None"
                    ),
                )
            )

    return drops, partitions, indexes, warnings

# ---------------------------------------------------------------------------
# YAML snippet
# ---------------------------------------------------------------------------


# Words that YAML 1.1 loaders resolve to booleans / null when left unquoted.
_YAML_RESERVED_WORDS = frozenset(
    {"y", "n", "yes", "no", "true", "false", "on", "off", "null"}
)


def _yaml_name(name: Any) -> str:
    """Return ``name`` as a YAML scalar that loads back as the same string.

    Plain identifiers are emitted untouched. Anything else - and identifiers
    such as ``NO`` / ``ON`` / ``NULL`` that a YAML 1.1 loader would turn into
    a boolean or null - is emitted as a double-quoted scalar.
    """
    import json  # local import: only needed on the rare quoting path
    import re

    text = str(name)
    is_plain = re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", text) is not None
    if is_plain and text.lower() not in _YAML_RESERVED_WORDS:
        return text
    return json.dumps(text, ensure_ascii=False)


def render_yaml_snippet(
    drops: List[_DropCandidate],
    partitions: List[_PartitionCandidate],
    indexes: List[_IndexCandidate],
) -> str:
    """Produce a paste-ready YAML snippet for the loader config.

    Emits an ``exclude:`` list for ``drops``, a ``partition_by:`` entry for
    the first (top-ranked) partition with the rest listed as commented
    runners-up, and an ``indexes:`` list. Each empty section is replaced by
    an explanatory comment. Column names that a YAML loader would not read
    back as the same string are double-quoted.

    Returns:
        The snippet as a single newline-joined string (no trailing newline).
    """
    lines: List[str] = ["# Suggested additions to your load_sas.py / load_folder.py config"]

    if drops:
        lines.append("exclude:")
        lines.extend(f" - {_yaml_name(d.name)} # {d.reason}" for d in drops)
    else:
        lines.append("# (no drop candidates found)")

    lines.append("")

    if partitions:
        top, runners_up = partitions[0], partitions[1:]
        if top.kind == "pre_sharded":
            lines.append(
                f"# !! PRE-SHARDED: this file only contains "
                f"{top.name} = {top.observed_values_in_file}."
            )
            lines.append(
                "# !! Keep the column in the schema and set partition_by at the "
                "load_folder level"
            )
            lines.append(
                "# !! so sibling files merge into one table under separate "
                "partitions."
            )
        lines.append("partition_by:")
        lines.append(f" - {_yaml_name(top.name)}")
        if runners_up:
            lines.append(
                "# Runners-up (append to partition_by for multi-level "
                "LIST partitioning; see load_sas.render_partition_ddl):"
            )
            lines.extend(
                f"# - {_yaml_name(p.name)} # kind={p.kind} distinct={p.distinct_count}"
                for p in runners_up
            )
    else:
        lines.append("# (no partition candidates found)")

    lines.append("")

    if indexes:
        lines.append("indexes:")
        for i in indexes:
            bonus = " (name match)" if i.name_bonus else ""
            lines.append(
                f" - {_yaml_name(i.name)} # uniqueness={i.uniqueness_pct:.1f}%{bonus}"
            )
    else:
        lines.append("# (no index candidates found)")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# XLSX writer
# ---------------------------------------------------------------------------


_HEADER_FONT = Font(bold=True, color="FFFFFF")
_HEADER_FILL = PatternFill("solid", fgColor="305496")
_WARN_FILL = PatternFill("solid", fgColor="FFE699")
_ERROR_FILL = PatternFill("solid", fgColor="F4B183")


def _write_header(ws, headers: List[str]) -> None:
    """Write ``headers`` into row 1 with the header styling and freeze it."""
    for position, text in enumerate(headers, start=1):
        header_cell = ws.cell(row=1, column=position, value=text)
        header_cell.font = _HEADER_FONT
        header_cell.fill = _HEADER_FILL
        header_cell.alignment = Alignment(vertical="center")
    ws.freeze_panes = "A2"


def _autosize(ws, *, max_width: int = 60) -> None:
    """Set each column's width from its longest cell text.

    Only the first line of a cell counts, so a long multi-line cell doesn't
    push everything else ultra-wide. Widths are clamped to the range
    ``[10, max_width]``.
    """
    for column_cells in ws.columns:
        first_line_lengths = (
            min(len(str(c.value).split("\n", 1)[0]), max_width)
            for c in column_cells
            if c.value is not None
        )
        widest = max(first_line_lengths, default=0)
        column_letter = get_column_letter(column_cells[0].column)
        ws.column_dimensions[column_letter].width = min(
            max(widest + 2, 10), max_width
        )

def _write_overview(
    ws,
    *,
    path: Path,
    size_bytes: int,
    total_rows: int,
    total_cols: int,
    thresholds: Dict[str, Any],
) -> None:
    """Fill the overview sheet with file facts and the thresholds in effect.

    Row 1 is a styled, frozen "Field" / "Value" header. The fixed facts come
    next, followed by one ``threshold: <name>`` row per ``thresholds`` entry.
    """
    for position, title in enumerate(("Field", "Value"), start=1):
        ws.cell(row=1, column=position, value=title).font = _HEADER_FONT
        ws.cell(row=1, column=position).fill = _HEADER_FILL
    ws.freeze_panes = "A2"

    facts = [
        ("File path", str(path)),
        ("File size", _format_size(size_bytes)),
        ("Extension", path.suffix.lower()),
        ("Total rows", f"{total_rows:,}"),
        ("Total columns", f"{total_cols:,}"),
        ("Generated at", dt.datetime.now().isoformat(timespec="seconds")),
    ]
    facts.extend(
        (f"threshold: {key}", str(value)) for key, value in thresholds.items()
    )

    for row_no, (label, text) in enumerate(facts, start=2):
        ws.cell(row=row_no, column=1, value=label)
        ws.cell(row=row_no, column=2, value=text)

    _autosize(ws)


def _write_columns(
    ws,
    stats: Dict[str, _ColumnStats],
    columns: Dict[str, ColumnSpec],
) -> None:
    """Write one row per profiled column with its schema info and stats.

    Schema-derived cells are left empty when ``columns`` has no entry for a
    name (the ``nullable`` cell shows "NO" in that case). Zero counts and
    missing numeric stats are rendered as empty cells.
    """
    _write_header(
        ws,
        [
            "column", "sas_format", "source_dtype", "inferred_postgres_type",
            "nullable", "n_total", "n_null", "null_pct", "distinct_count",
            "min", "max", "mean", "std",
            "top_value", "top_count",
            "max_str_bytes", "mean_str_bytes",
            "sample_values", "notes",
        ],
    )

    for row_no, (name, cs) in enumerate(stats.items(), start=2):
        spec = columns.get(name)
        most_common, most_common_count = cs.top_value

        if cs.str_count:
            avg_bytes = cs.str_sum_bytes / cs.str_count
        else:
            avg_bytes = None

        if spec:
            sas_format = spec.sas_format
            source_dtype = spec.source_dtype
            pg_type = spec.postgres_type
        else:
            sas_format = source_dtype = pg_type = ""

        record = (
            name,
            sas_format,
            source_dtype,
            pg_type,
            "YES" if (spec and spec.nullable) else "NO",
            cs.n_total,
            cs.n_null,
            round(cs.null_pct, 3),
            cs.distinct_display,
            _format_value(cs.min_val),
            _format_value(cs.max_val),
            "" if cs.mean is None else round(cs.mean, 6),
            "" if cs.std is None else round(cs.std, 6),
            _format_value(most_common),
            most_common_count or "",
            cs.str_max_bytes or "",
            "" if avg_bytes is None else round(avg_bytes, 2),
            _format_samples(cs.samples),
            "; ".join(spec.notes) if spec and spec.notes else "",
        )
        for col_no, cell_value in enumerate(record, start=1):
            ws.cell(row=row_no, column=col_no, value=cell_value)

    _autosize(ws)

def _write_drop(ws, drops: List[_DropCandidate]) -> None:
    """Write the drop-candidate sheet, or a placeholder row when empty."""
    _write_header(ws, ["column", "reason"])
    if drops:
        for row_no, candidate in enumerate(drops, start=2):
            ws.cell(row=row_no, column=1, value=candidate.name)
            ws.cell(row=row_no, column=2, value=candidate.reason)
    else:
        ws.cell(row=2, column=1, value="(no drop candidates)")
    _autosize(ws)


def _write_partition(ws, partitions: List[_PartitionCandidate]) -> None:
    """Write the ranked partition-candidate sheet.

    Rank follows list order. Rows of kind "pre_sharded" are highlighted
    across every header column.
    """
    headers = [
        "rank", "column", "kind", "distinct_count", "fill_pct",
        "observed_values_in_file", "top_values", "score", "note",
    ]
    _write_header(ws, headers)
    if not partitions:
        ws.cell(row=2, column=1, value="(no partition candidates)")

    for rank, candidate in enumerate(partitions, start=1):
        row_no = rank + 1
        record = (
            rank,
            candidate.name,
            candidate.kind,
            candidate.distinct_count,
            round(candidate.fill_pct, 3),
            candidate.observed_values_in_file,
            candidate.top_values,
            round(candidate.score, 3),
            candidate.note,
        )
        for col_no, cell_value in enumerate(record, start=1):
            ws.cell(row=row_no, column=col_no, value=cell_value)
        if candidate.kind == "pre_sharded":
            for col_no in range(1, len(headers) + 1):
                ws.cell(row=row_no, column=col_no).fill = _WARN_FILL

    _autosize(ws)


def _write_index(ws, indexes: List[_IndexCandidate]) -> None:
    """Write the ranked index-candidate sheet (rank follows list order)."""
    _write_header(
        ws,
        [
            "rank", "column", "uniqueness_pct", "distinct_count", "fill_pct",
            "name_bonus", "score", "note",
        ],
    )
    if not indexes:
        ws.cell(row=2, column=1, value="(no index candidates)")

    for rank, candidate in enumerate(indexes, start=1):
        record = (
            rank,
            candidate.name,
            round(candidate.uniqueness_pct, 3),
            candidate.distinct_count,
            round(candidate.fill_pct, 3),
            "YES" if candidate.name_bonus else "NO",
            round(candidate.score, 3),
            candidate.note,
        )
        for col_no, cell_value in enumerate(record, start=1):
            ws.cell(row=rank + 1, column=col_no, value=cell_value)

    _autosize(ws)

def _write_warnings(ws, warnings: List[_TypeWarning]) -> None:
    """Write the type-warning sheet, colouring "error" and "warn" rows.

    Rows of any other severity are left unfilled. An empty list yields a
    single placeholder row.
    """
    headers = ["column", "severity", "message"]
    _write_header(ws, headers)
    if not warnings:
        ws.cell(row=2, column=1, value="(no type warnings)")

    severity_fill = {"error": _ERROR_FILL, "warn": _WARN_FILL}
    for row_no, warning in enumerate(warnings, start=2):
        record = (warning.name, warning.severity, warning.message)
        for col_no, cell_value in enumerate(record, start=1):
            ws.cell(row=row_no, column=col_no, value=cell_value)

        row_fill = severity_fill.get(warning.severity)
        if row_fill is None:
            continue
        for col_no in range(1, len(headers) + 1):
            ws.cell(row=row_no, column=col_no).fill = row_fill

    _autosize(ws)


def _write_yaml_sheet(ws, snippet: str) -> None:
    """Put the YAML snippet into a single wrapped cell under a styled title."""
    title_cell = ws.cell(
        row=1,
        column=1,
        value="YAML suggestion (paste into your loader config)",
    )
    title_cell.font = _HEADER_FONT
    ws.cell(row=1, column=1).fill = _HEADER_FILL

    body_cell = ws.cell(row=2, column=1, value=snippet)
    body_cell.alignment = Alignment(wrap_text=True, vertical="top")
    # Fixed, comfortable width for YAML; the snippet cell wraps inside it.
    ws.column_dimensions["A"].width = 100

def write_report(
    out_path: Path,
    *,
    path: Path,
    size_bytes: int,
    total_rows: int,
    stats: Dict[str, _ColumnStats],
    columns: Dict[str, ColumnSpec],
    drops: List[_DropCandidate],
    partitions: List[_PartitionCandidate],
    indexes: List[_IndexCandidate],
    warnings: List[_TypeWarning],
    yaml_snippet: str,
    thresholds: Dict[str, Any],
) -> None:
    """Build the seven-sheet workbook and save it to ``out_path``.

    Sheets, in order: Overview, Columns, Drop candidates, Partition
    candidates, Index candidates, Type warnings, YAML suggestion.
    """
    workbook = Workbook()

    # A new Workbook starts with one active sheet; reuse it for the overview.
    overview_sheet = workbook.active
    overview_sheet.title = "Overview"
    _write_overview(
        overview_sheet,
        path=path,
        size_bytes=size_bytes,
        total_rows=total_rows,
        total_cols=len(columns),
        thresholds=thresholds,
    )

    _write_columns(workbook.create_sheet("Columns"), stats, columns)
    _write_drop(workbook.create_sheet("Drop candidates"), drops)
    _write_partition(workbook.create_sheet("Partition candidates"), partitions)
    _write_index(workbook.create_sheet("Index candidates"), indexes)
    _write_warnings(workbook.create_sheet("Type warnings"), warnings)
    _write_yaml_sheet(workbook.create_sheet("YAML suggestion"), yaml_snippet)

    workbook.save(out_path)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def _build_argparser() -> argparse.ArgumentParser:
    """Create the CLI parser; every flag defaults to its module constant."""
    parser = argparse.ArgumentParser(
        description=(
            "Profile a local SAS file (.sas7bdat / .xpt / .xport) and write "
            "an Excel report with drop, partition_by, and index suggestions "
            "for generic_loader/load_sas.py and load_folder.py."
        ),
    )

    # Input / output locations.
    parser.add_argument(
        "--file",
        type=Path,
        default=Path(SAS_PATH),
        help=f"Path to the SAS file to profile (default: {SAS_PATH!r}).",
    )
    parser.add_argument(
        "--out",
        type=Path,
        default=Path(OUTPUT_XLSX),
        help=f"Where to write the .xlsx report (default: {OUTPUT_XLSX!r}).",
    )

    # Classification thresholds.
    parser.add_argument(
        "--high-null-pct",
        type=float,
        default=HIGH_NULL_PCT,
        help="Null percentage at/above which a column is a drop candidate.",
    )
    parser.add_argument(
        "--index-uniqueness-pct",
        type=float,
        default=INDEX_UNIQUENESS_PCT,
        help="Uniqueness (distinct/non-null) at/above which a column is an index candidate.",
    )
    parser.add_argument(
        "--partition-min-distinct", type=int, default=PARTITION_MIN_DISTINCT
    )
    parser.add_argument(
        "--partition-max-distinct", type=int, default=PARTITION_MAX_DISTINCT
    )
    parser.add_argument(
        "--partition-min-fill-pct", type=float, default=PARTITION_MIN_FILL_PCT
    )
    parser.add_argument(
        "--pre-sharded-max-distinct", type=int, default=PRE_SHARDED_MAX_DISTINCT
    )
    return parser

def main(argv: Optional[List[str]] = None) -> int:
    """Profile the configured SAS file and write the Excel report.

    Args:
        argv: Command-line arguments without the program name; ``None`` lets
            argparse read ``sys.argv``.

    Returns:
        ``0`` on success, ``2`` when the input path is missing or is not a
        regular file.
    """
    args = _build_argparser().parse_args(argv)

    path: Path = args.file
    out_path: Path = args.out

    if not path.exists():
        print(f"error: SAS file not found: {path}", file=sys.stderr)
        return 2
    if not path.is_file():
        # A directory (or other non-file) passes exists() and would only
        # fail later, deep inside the loader, with a less helpful error.
        print(f"error: not a regular file: {path}", file=sys.stderr)
        return 2

    print(f"profiling {path} -> {out_path}", file=sys.stderr)
    # The metadata object is part of profile_file's return value but is not
    # needed for the report.
    stats, columns, _meta, total_rows = profile_file(path)

    drops, partitions, indexes, warnings = classify(
        stats, columns,
        high_null_pct=args.high_null_pct,
        index_uniqueness_pct=args.index_uniqueness_pct,
        partition_min_distinct=args.partition_min_distinct,
        partition_max_distinct=args.partition_max_distinct,
        partition_min_fill_pct=args.partition_min_fill_pct,
        pre_sharded_max_distinct=args.pre_sharded_max_distinct,
    )

    yaml_snippet = render_yaml_snippet(drops, partitions, indexes)

    # Recorded on the overview sheet so a report documents how it was made.
    thresholds = {
        "HIGH_NULL_PCT": args.high_null_pct,
        "INDEX_UNIQUENESS_PCT": args.index_uniqueness_pct,
        "PARTITION_MIN_DISTINCT": args.partition_min_distinct,
        "PARTITION_MAX_DISTINCT": args.partition_max_distinct,
        "PARTITION_MIN_FILL_PCT": args.partition_min_fill_pct,
        "PRE_SHARDED_MAX_DISTINCT": args.pre_sharded_max_distinct,
        "DISTINCT_CAP": DISTINCT_CAP,
        "TOP_N_VALUES": TOP_N_VALUES,
        "PREVIEW_ROWS_FOR_INFERENCE": PREVIEW_ROWS_FOR_INFERENCE,
    }

    out_path.parent.mkdir(parents=True, exist_ok=True)
    write_report(
        out_path,
        path=path,
        size_bytes=os.path.getsize(path),
        total_rows=total_rows,
        stats=stats,
        columns=columns,
        drops=drops,
        partitions=partitions,
        indexes=indexes,
        warnings=warnings,
        yaml_snippet=yaml_snippet,
        thresholds=thresholds,
    )

    print(
        f"wrote {out_path} ({len(stats)} columns, {total_rows:,} rows scanned)\n"
        f" drops: {len(drops)}\n"
        f" partitions: {len(partitions)}\n"
        f" indexes: {len(indexes)}\n"
        f" warnings: {len(warnings)}",
        file=sys.stderr,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())