foxtrot/utils/sas_profiler.py
David Peterson f1af1136dc Add standalone SAS profiling utility
Introduced a new script `sas_profiler.py` that profiles local SAS files and generates an Excel report with recommendations for drops, partitions, and indexes, along with type-inference warnings. The utility supports command-line overrides for configuration and is compatible with Python 3.10+. This addition enhances the existing tools for SAS file management.
2026-04-20 18:38:01 -05:00

1126 lines
38 KiB
Python

"""Standalone utility that profiles a single local SAS file and writes an
Excel report with drop, partition, and index candidates plus type-inference
warnings.
Configure the constants below and run::
python3 utils/sas_profiler.py
Or override any of them from the command line::
python3 utils/sas_profiler.py \
--file ./data/mystate.sas7bdat \
--out ./reports/mystate_profile.xlsx
The report is a paste-ready companion to
``generic_loader/load_sas.py`` and ``generic_loader/load_folder.py``: the
"inferred Postgres type" column uses the loader's own ``infer_schema`` so the
drop / partition / index suggestions map one-to-one onto valid YAML config
entries for those scripts.
Supported inputs: ``.sas7bdat`` / ``.xpt`` / ``.xport`` (whatever the loader
can read).
Python 3.10+ compatible.
"""
from __future__ import annotations
import argparse
import collections
import datetime as dt
import math
import os
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# The loader lives in a sibling directory that is *not* a proper package
# (no __init__.py). Its own modules import each other by bare name, so we
# add the directory to sys.path before importing it here.
_REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_REPO_ROOT / "generic_loader"))
import pandas as pd # noqa: E402
from openpyxl import Workbook # noqa: E402
from openpyxl.styles import Alignment, Font, PatternFill # noqa: E402
from openpyxl.utils import get_column_letter # noqa: E402
from load_sas import ( # noqa: E402
NUMERIC_INT_RANGE,
ColumnSpec,
_char_missing_mask,
infer_schema,
iter_sas_chunks,
read_sas_preview,
)
# ---------------------------------------------------------------------------
# Configuration - edit these before running, or override via CLI flags
# ---------------------------------------------------------------------------
SAS_PATH: str = "./generic_loader/samples/sample_kitchensink.xpt"
"""Local path to the .sas7bdat / .xpt / .xport file to profile."""
OUTPUT_XLSX: str = "./sas_profile.xlsx"
"""Where to write the Excel report."""
HIGH_NULL_PCT: float = 95.0
"""Columns whose null percentage meets or exceeds this threshold are flagged
as drop candidates."""
INDEX_UNIQUENESS_PCT: float = 95.0
"""Columns whose distinct/non-null ratio meets or exceeds this threshold are
flagged as index candidates."""
PARTITION_MIN_DISTINCT: int = 2
"""A partition candidate must have at least this many distinct values."""
PARTITION_MAX_DISTINCT: int = 500
"""A partition candidate must have at most this many distinct values. Kept
deliberately tighter than the loader's max_partitions default (10,000) so
the default suggestions stay conservative."""
PARTITION_MIN_FILL_PCT: float = 95.0
"""Partition candidates must be non-null in at least this fraction of rows."""
PRE_SHARDED_MAX_DISTINCT: int = 3
"""A name-matched column with <= this many distinct values is treated as
"the file is probably pre-sharded on this column" rather than being
silently dumped into the drop list."""
DISTINCT_CAP: int = 10_000
"""Max size of the per-column distinct-value set. Exceeding this marks the
column as ``distinct_overflow`` and we report ">= CAP" in the xlsx."""
TOP_N_VALUES: int = 5
"""Number of most-frequent values tracked per column."""
PREVIEW_ROWS_FOR_INFERENCE: int = 10_000
"""Rows pulled from the file for the loader's schema inference. Matches
``load_sas.TYPE_INFERENCE_SAMPLE_ROWS`` so suggestions track the loader."""
PARTITION_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
re.compile(r"^state$", re.IGNORECASE),
re.compile(r"^state_?code$", re.IGNORECASE),
)
"""Column names that are "probably partition columns" regardless of how
many distinct values happen to be present in this one file. Kept tiny on
purpose - add more patterns here later if you want to recognise
region/year/etc."""
INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
re.compile(r"^id$", re.IGNORECASE),
re.compile(r"_id$", re.IGNORECASE),
re.compile(r"_key$", re.IGNORECASE),
re.compile(r"^pk_", re.IGNORECASE),
)
"""Name-bonus patterns for index-candidate ranking."""
_PARTITION_FRIENDLY_TYPES: frozenset = frozenset(
{"TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER",
"INTEGER", "BIGINT", "SMALLINT", "BOOLEAN", "DATE"}
)
# ---------------------------------------------------------------------------
# Per-column streaming aggregator
# ---------------------------------------------------------------------------
@dataclass
class _ColumnStats:
"""Accumulators updated chunk-by-chunk while streaming the file."""
name: str
n_total: int = 0
n_null: int = 0
n_empty_str: int = 0
distinct: set = field(default_factory=set)
distinct_overflow: bool = False
top_counts: "collections.Counter[Any]" = field(default_factory=collections.Counter)
min_val: Any = None
max_val: Any = None
# Numeric running stats (Welford would be nicer but sum/sum-sq is plenty
# here for a "help me pick columns" report).
numeric_sum: float = 0.0
numeric_sumsq: float = 0.0
numeric_count: int = 0
# String byte-length stats (helps flag oversized TEXT columns).
str_max_bytes: int = 0
str_sum_bytes: int = 0
str_count: int = 0
samples: List[Any] = field(default_factory=list)
def update(self, series: pd.Series) -> None:
"""Fold one chunk's worth of this column into the accumulator."""
self.n_total += len(series)
if len(series) == 0:
return
if pd.api.types.is_object_dtype(series):
miss_mask = _char_missing_mask(series)
else:
miss_mask = series.isna()
miss_count = int(miss_mask.sum())
self.n_null += miss_count
non_null = series[~miss_mask]
if pd.api.types.is_object_dtype(series):
# Empty-string tracking is useful for TEXT columns where the loader
# later translates "" -> NULL in the COPY step. A column dominated
# by empty strings is still effectively null even if it isn't NaN.
empty_mask = series.map(lambda v: isinstance(v, str) and v == "")
self.n_empty_str += int(empty_mask.sum())
if pd.api.types.is_numeric_dtype(series) and not non_null.empty:
as_float = non_null.astype("float64")
self.numeric_sum += float(as_float.sum())
self.numeric_sumsq += float((as_float * as_float).sum())
self.numeric_count += int(len(as_float))
cmin = as_float.min()
cmax = as_float.max()
if self.min_val is None or cmin < self.min_val:
self.min_val = cmin
if self.max_val is None or cmax > self.max_val:
self.max_val = cmax
elif pd.api.types.is_datetime64_any_dtype(series) and not non_null.empty:
cmin = non_null.min()
cmax = non_null.max()
if self.min_val is None or cmin < self.min_val:
self.min_val = cmin
if self.max_val is None or cmax > self.max_val:
self.max_val = cmax
if pd.api.types.is_object_dtype(series) and not non_null.empty:
str_like = non_null.map(lambda v: v if isinstance(v, str) else str(v))
byte_lens = str_like.map(lambda s: len(s.encode("utf-8", errors="replace")))
if len(byte_lens):
bmax = int(byte_lens.max())
if bmax > self.str_max_bytes:
self.str_max_bytes = bmax
self.str_sum_bytes += int(byte_lens.sum())
self.str_count += int(len(byte_lens))
for val in non_null.tolist():
hashable = _hashable(val)
if hashable is _UNHASHABLE:
# Give up on distinct/top-counts for this column; it's some
# exotic (e.g. list) value we can't hash, and the drop/index
# suggestions wouldn't be meaningful anyway.
self.distinct_overflow = True
continue
if not self.distinct_overflow:
if hashable in self.distinct:
pass
elif len(self.distinct) >= DISTINCT_CAP:
self.distinct_overflow = True
else:
self.distinct.add(hashable)
if len(self.top_counts) < DISTINCT_CAP or hashable in self.top_counts:
self.top_counts[hashable] += 1
if len(self.samples) < 3:
self.samples.append(val)
# -- Derived properties ------------------------------------------------
@property
def n_non_null(self) -> int:
return self.n_total - self.n_null
@property
def null_pct(self) -> float:
if self.n_total == 0:
return 0.0
return 100.0 * self.n_null / self.n_total
@property
def fill_pct(self) -> float:
return 100.0 - self.null_pct
@property
def distinct_count(self) -> int:
return len(self.distinct)
@property
def distinct_display(self) -> str:
if self.distinct_overflow:
return f">= {DISTINCT_CAP:,}"
return f"{self.distinct_count:,}"
@property
def mean(self) -> Optional[float]:
if self.numeric_count == 0:
return None
return self.numeric_sum / self.numeric_count
@property
def std(self) -> Optional[float]:
if self.numeric_count < 2:
return None
mean = self.mean
var = self.numeric_sumsq / self.numeric_count - (mean * mean)
# Guard against tiny negative from floating point noise.
if var < 0:
var = 0.0
return math.sqrt(var)
@property
def top_value(self) -> Tuple[Any, int]:
if not self.top_counts:
return (None, 0)
return self.top_counts.most_common(1)[0]
def top_values(self, n: int = TOP_N_VALUES) -> List[Tuple[Any, int]]:
return self.top_counts.most_common(n)
class _UnhashableSentinel:
pass
_UNHASHABLE = _UnhashableSentinel()
def _hashable(val: Any) -> Any:
"""Return a hashable form of ``val``, or :data:`_UNHASHABLE` if we can't.
pandas occasionally hands us objects (lists, dicts) from object columns
that aren't hashable. Rather than crashing the whole report, we let the
column fall back to "distinct_overflow" mode for those rows.
"""
try:
hash(val)
return val
except TypeError:
return _UNHASHABLE
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _matches_any(patterns: Tuple[re.Pattern, ...], name: str) -> bool:
return any(p.search(name) for p in patterns)
def _format_size(n_bytes: int) -> str:
size = float(n_bytes)
for unit in ("B", "KB", "MB", "GB", "TB"):
if size < 1024.0 or unit == "TB":
return f"{size:,.1f} {unit}"
size /= 1024.0
return f"{size:,.1f} TB"
def _format_value(val: Any) -> str:
"""Render a single Python value for display in the spreadsheet."""
if val is None:
return ""
if isinstance(val, float) and pd.isna(val):
return ""
if isinstance(val, (pd.Timestamp, dt.date, dt.datetime)):
return str(val)
return repr(val) if isinstance(val, str) else str(val)
def _format_top_values(pairs: List[Tuple[Any, int]]) -> str:
if not pairs:
return ""
return ", ".join(f"{_format_value(v)} ({c:,})" for v, c in pairs)
def _format_samples(samples: List[Any]) -> str:
if not samples:
return "(all null)"
return ", ".join(_format_value(v) for v in samples)
# ---------------------------------------------------------------------------
# Streaming profile
# ---------------------------------------------------------------------------
def profile_file(
path: Path,
*,
chunksize: Optional[int] = None,
) -> Tuple[Dict[str, _ColumnStats], Dict[str, ColumnSpec], Any, int]:
"""Stream ``path`` once, returning (stats, columns, meta, total_rows).
``columns`` is the loader's inferred schema from the first
``PREVIEW_ROWS_FOR_INFERENCE`` rows - identical to what ``load_sas``
would use. ``stats`` are the full-file observations we add on top.
"""
preview_df, meta = read_sas_preview(path, rows=PREVIEW_ROWS_FOR_INFERENCE)
total_rows_hint = getattr(meta, "number_rows", None)
columns = infer_schema(preview_df, meta, total_rows=total_rows_hint)
stats: Dict[str, _ColumnStats] = {
name: _ColumnStats(name=name) for name in columns
}
total_rows = 0
kwargs = {}
if chunksize is not None:
kwargs["chunksize"] = chunksize
for chunk_df, _chunk_meta in iter_sas_chunks(path, **kwargs):
total_rows += len(chunk_df)
print(f" profiling... {total_rows:,} rows", file=sys.stderr)
for name, cs in stats.items():
if name not in chunk_df.columns:
continue
cs.update(chunk_df[name])
return stats, columns, meta, total_rows
# ---------------------------------------------------------------------------
# Classifiers
# ---------------------------------------------------------------------------
@dataclass
class _DropCandidate:
name: str
reason: str
@dataclass
class _PartitionCandidate:
name: str
kind: str # "observed" or "pre_sharded"
distinct_count: int
fill_pct: float
top_values: str
observed_values_in_file: str
note: str
score: float
@dataclass
class _IndexCandidate:
name: str
uniqueness_pct: float
distinct_count: int
fill_pct: float
name_bonus: bool
note: str
score: float
@dataclass
class _TypeWarning:
name: str
severity: str # "info" | "warn" | "error"
message: str
def _is_constant_like(cs: _ColumnStats) -> bool:
"""True when the column is effectively a single value (possibly with
a handful of nulls / empties mixed in)."""
if cs.n_non_null == 0:
return False
return cs.distinct_count == 1 and not cs.distinct_overflow
def classify(
stats: Dict[str, _ColumnStats],
columns: Dict[str, ColumnSpec],
*,
high_null_pct: float,
index_uniqueness_pct: float,
partition_min_distinct: int,
partition_max_distinct: int,
partition_min_fill_pct: float,
pre_sharded_max_distinct: int,
) -> Tuple[
List[_DropCandidate],
List[_PartitionCandidate],
List[_IndexCandidate],
List[_TypeWarning],
]:
"""Turn per-column stats + the loader's schema into four ranked lists."""
drops: List[_DropCandidate] = []
partitions: List[_PartitionCandidate] = []
indexes: List[_IndexCandidate] = []
warnings: List[_TypeWarning] = []
# Names we've already routed into the partition lane - exclude them from
# the drop / index lanes downstream.
claimed_by_partition: set = set()
# -- First pass: partition-name-matched columns ------------------------
# Run this before the drop check so pre-sharded STATE columns don't get
# silently dropped.
for name, cs in stats.items():
spec = columns.get(name)
if not _matches_any(PARTITION_NAME_PATTERNS, name):
continue
if cs.n_total == 0:
continue
looks_pre_sharded = (
cs.n_non_null > 0
and not cs.distinct_overflow
and cs.distinct_count <= pre_sharded_max_distinct
and cs.fill_pct >= partition_min_fill_pct
)
if looks_pre_sharded:
observed = ", ".join(_format_value(v) for v, _ in cs.top_values(pre_sharded_max_distinct))
note_parts = [
f"pre-sharded: this file only contains {cs.distinct_count} distinct "
f"value(s) ({observed})",
"keep the column and set partition_by at the load_folder level so "
"sibling files merge into separate partitions of one table",
]
partitions.append(
_PartitionCandidate(
name=name,
kind="pre_sharded",
distinct_count=cs.distinct_count,
fill_pct=cs.fill_pct,
top_values=_format_top_values(cs.top_values()),
observed_values_in_file=observed,
note="; ".join(note_parts),
# Pre-sharded STATE always wins the ranking.
score=1_000_000.0,
)
)
claimed_by_partition.add(name)
continue
# Name-matched but not pre-sharded: fall through into the regular
# partition candidate pass below, which will score it up due to the
# name match.
# -- Drop candidates ---------------------------------------------------
for name, cs in stats.items():
if name in claimed_by_partition:
continue
if cs.n_total == 0:
continue
reason: Optional[str] = None
if cs.n_null == cs.n_total:
reason = "all-null"
elif (
cs.n_non_null > 0
and cs.distinct_count == 0
and not cs.distinct_overflow
):
# Non-null but nothing hashable captured - treat as opaque.
reason = "all-empty / unhashable"
elif cs.n_non_null == cs.n_empty_str and cs.n_empty_str > 0:
reason = "all-empty"
elif _is_constant_like(cs):
only_val = next(iter(cs.distinct))
reason = f"constant={_format_value(only_val)}"
elif cs.null_pct >= high_null_pct:
reason = f"null_pct={cs.null_pct:.1f}%"
if reason is not None:
drops.append(_DropCandidate(name=name, reason=reason))
dropped_names = {d.name for d in drops}
# -- Partition candidates (observed) ----------------------------------
for name, cs in stats.items():
if name in claimed_by_partition or name in dropped_names:
continue
spec = columns.get(name)
if spec is None:
continue
pg_type = spec.postgres_type.upper()
if pg_type not in _PARTITION_FRIENDLY_TYPES:
continue
if cs.distinct_overflow:
continue
if not (
partition_min_distinct <= cs.distinct_count <= partition_max_distinct
):
continue
if cs.fill_pct < partition_min_fill_pct:
continue
name_match = _matches_any(PARTITION_NAME_PATTERNS, name)
# Score: name-match dominates, then prefer fewer partitions (safer
# DDL), then prefer more-filled columns as a tiebreaker.
score = (
(500_000.0 if name_match else 0.0)
+ (partition_max_distinct - cs.distinct_count)
+ cs.fill_pct
)
notes: List[str] = []
if name_match:
notes.append("name matches PARTITION_NAME_PATTERNS")
if cs.distinct_count > 10_000:
notes.append(
f"distinct_count={cs.distinct_count:,} exceeds loader "
"max_partitions default (10,000); expect DDL warnings"
)
notes.append(
"LIST partitioning creates one child table per distinct value "
"(see load_sas.render_partition_ddl)"
)
partitions.append(
_PartitionCandidate(
name=name,
kind="observed",
distinct_count=cs.distinct_count,
fill_pct=cs.fill_pct,
top_values=_format_top_values(cs.top_values()),
observed_values_in_file=_format_top_values(cs.top_values()),
note="; ".join(notes),
score=score,
)
)
partitions.sort(key=lambda p: p.score, reverse=True)
partition_names = {p.name for p in partitions}
# -- Index candidates --------------------------------------------------
for name, cs in stats.items():
if name in dropped_names or name in partition_names:
continue
spec = columns.get(name)
if spec is None:
continue
if cs.n_non_null == 0:
continue
if cs.distinct_overflow:
# Super-high-cardinality → perfect candidate for an index.
uniqueness = 100.0
distinct_count = DISTINCT_CAP # display sentinel
else:
uniqueness = 100.0 * cs.distinct_count / cs.n_non_null
distinct_count = cs.distinct_count
if uniqueness < index_uniqueness_pct:
continue
name_bonus = _matches_any(INDEX_NAME_PATTERNS, name)
notes: List[str] = []
if name_bonus:
notes.append("name matches INDEX_NAME_PATTERNS (ID/KEY-ish)")
if cs.distinct_overflow:
notes.append(
f"distinct tracking capped at {DISTINCT_CAP:,}; "
"treating as high-cardinality"
)
# Rank: name match dominates, then raw uniqueness, then fill.
score = (500_000.0 if name_bonus else 0.0) + uniqueness + cs.fill_pct / 100.0
indexes.append(
_IndexCandidate(
name=name,
uniqueness_pct=uniqueness,
distinct_count=distinct_count,
fill_pct=cs.fill_pct,
name_bonus=name_bonus,
note="; ".join(notes),
score=score,
)
)
indexes.sort(key=lambda i: i.score, reverse=True)
# -- Type warnings -----------------------------------------------------
for name, cs in stats.items():
spec = columns.get(name)
if spec is None:
continue
# Re-surface whatever the loader's own inference already flagged in
# notes - these are genuinely useful for the user to see without
# having to dry-run the loader.
for note in spec.notes:
warnings.append(
_TypeWarning(name=name, severity="info", message=note)
)
if spec.sampled:
warnings.append(
_TypeWarning(
name=name,
severity="info",
message=(
"loader inferred type from a bounded preview; "
"sampled=True"
),
)
)
pg_type = spec.postgres_type.upper()
# Preview said NOT NULL but the full file has nulls - loader would
# have emitted NOT NULL and then choked on COPY.
if not spec.nullable and cs.n_null > 0:
warnings.append(
_TypeWarning(
name=name,
severity="error",
message=(
f"preview saw zero nulls (NOT NULL) but full file has "
f"{cs.n_null:,} null(s); COPY would fail under the "
"loader's inferred NOT NULL"
),
)
)
# INTEGER range check against the full-file observed min/max.
if pg_type == "INTEGER" and cs.numeric_count > 0:
lo, hi = NUMERIC_INT_RANGE
vmin = cs.min_val if cs.min_val is not None else 0
vmax = cs.max_val if cs.max_val is not None else 0
try:
if vmin < lo or vmax > hi:
warnings.append(
_TypeWarning(
name=name,
severity="error",
message=(
f"loader inferred INTEGER from the preview but "
f"full-file range [{vmin}, {vmax}] overflows "
f"int4 {NUMERIC_INT_RANGE}; BIGINT required"
),
)
)
except TypeError:
pass
# Preview said all-null (loader defaults to TEXT) but data exists.
was_all_null_preview = any(
"all-null column" in n for n in spec.notes
)
if was_all_null_preview and cs.n_non_null > 0:
warnings.append(
_TypeWarning(
name=name,
severity="warn",
message=(
"preview was all-null so loader defaulted to TEXT, "
f"but full file has {cs.n_non_null:,} non-null "
"value(s); consider a tighter include/exclude or "
"re-inferring with TYPE_INFERENCE_SAMPLE_ROWS=None"
),
)
)
return drops, partitions, indexes, warnings
# ---------------------------------------------------------------------------
# YAML snippet
# ---------------------------------------------------------------------------
def render_yaml_snippet(
drops: List[_DropCandidate],
partitions: List[_PartitionCandidate],
indexes: List[_IndexCandidate],
) -> str:
"""Produce a paste-ready YAML snippet for the loader config."""
lines: List[str] = ["# Suggested additions to your load_sas.py / load_folder.py config"]
if drops:
lines.append("exclude:")
for d in drops:
lines.append(f" - {d.name} # {d.reason}")
else:
lines.append("# (no drop candidates found)")
lines.append("")
if partitions:
top = partitions[0]
if top.kind == "pre_sharded":
lines.append(
f"# !! PRE-SHARDED: this file only contains "
f"{top.name} = {top.observed_values_in_file}."
)
lines.append(
"# !! Keep the column in the schema and set partition_by at the "
"load_folder level"
)
lines.append(
"# !! so sibling files merge into one table under separate "
"partitions."
)
lines.append("partition_by:")
lines.append(f" - {top.name}")
if len(partitions) > 1:
lines.append(
"# Runners-up (append to partition_by for multi-level "
"LIST partitioning; see load_sas.render_partition_ddl):"
)
for p in partitions[1:]:
lines.append(
f"# - {p.name} # kind={p.kind} distinct={p.distinct_count}"
)
else:
lines.append("# (no partition candidates found)")
lines.append("")
if indexes:
lines.append("indexes:")
for i in indexes:
bonus = " (name match)" if i.name_bonus else ""
lines.append(
f" - {i.name} # uniqueness={i.uniqueness_pct:.1f}%{bonus}"
)
else:
lines.append("# (no index candidates found)")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# XLSX writer
# ---------------------------------------------------------------------------
_HEADER_FONT = Font(bold=True, color="FFFFFF")
_HEADER_FILL = PatternFill("solid", fgColor="305496")
_WARN_FILL = PatternFill("solid", fgColor="FFE699")
_ERROR_FILL = PatternFill("solid", fgColor="F4B183")
def _write_header(ws, headers: List[str]) -> None:
for col_idx, label in enumerate(headers, start=1):
cell = ws.cell(row=1, column=col_idx, value=label)
cell.font = _HEADER_FONT
cell.fill = _HEADER_FILL
cell.alignment = Alignment(vertical="center")
ws.freeze_panes = "A2"
def _autosize(ws, *, max_width: int = 60) -> None:
for col_cells in ws.columns:
letter = get_column_letter(col_cells[0].column)
longest = 0
for cell in col_cells:
if cell.value is None:
continue
text = str(cell.value)
# Only measure the first line so a long YAML cell doesn't push
# everything else ultra-wide.
longest = max(longest, min(len(text.split("\n", 1)[0]), max_width))
ws.column_dimensions[letter].width = min(max(longest + 2, 10), max_width)
def _write_overview(
ws,
*,
path: Path,
size_bytes: int,
total_rows: int,
total_cols: int,
thresholds: Dict[str, Any],
) -> None:
ws.cell(row=1, column=1, value="Field").font = _HEADER_FONT
ws.cell(row=1, column=1).fill = _HEADER_FILL
ws.cell(row=1, column=2, value="Value").font = _HEADER_FONT
ws.cell(row=1, column=2).fill = _HEADER_FILL
ws.freeze_panes = "A2"
rows = [
("File path", str(path)),
("File size", _format_size(size_bytes)),
("Extension", path.suffix.lower()),
("Total rows", f"{total_rows:,}"),
("Total columns", f"{total_cols:,}"),
("Generated at", dt.datetime.now().isoformat(timespec="seconds")),
]
for k, v in thresholds.items():
rows.append((f"threshold: {k}", str(v)))
for i, (k, v) in enumerate(rows, start=2):
ws.cell(row=i, column=1, value=k)
ws.cell(row=i, column=2, value=v)
_autosize(ws)
def _write_columns(
ws,
stats: Dict[str, _ColumnStats],
columns: Dict[str, ColumnSpec],
) -> None:
headers = [
"column", "sas_format", "source_dtype", "inferred_postgres_type",
"nullable", "n_total", "n_null", "null_pct", "distinct_count",
"min", "max", "mean", "std",
"top_value", "top_count",
"max_str_bytes", "mean_str_bytes",
"sample_values", "notes",
]
_write_header(ws, headers)
for row_idx, (name, cs) in enumerate(stats.items(), start=2):
spec = columns.get(name)
top_val, top_count = cs.top_value
mean_bytes = (cs.str_sum_bytes / cs.str_count) if cs.str_count else None
values = [
name,
spec.sas_format if spec else "",
spec.source_dtype if spec else "",
spec.postgres_type if spec else "",
"YES" if (spec and spec.nullable) else "NO",
cs.n_total,
cs.n_null,
round(cs.null_pct, 3),
cs.distinct_display,
_format_value(cs.min_val),
_format_value(cs.max_val),
round(cs.mean, 6) if cs.mean is not None else "",
round(cs.std, 6) if cs.std is not None else "",
_format_value(top_val),
top_count or "",
cs.str_max_bytes or "",
round(mean_bytes, 2) if mean_bytes is not None else "",
_format_samples(cs.samples),
"; ".join(spec.notes) if spec and spec.notes else "",
]
for col_idx, v in enumerate(values, start=1):
ws.cell(row=row_idx, column=col_idx, value=v)
_autosize(ws)
def _write_drop(ws, drops: List[_DropCandidate]) -> None:
headers = ["column", "reason"]
_write_header(ws, headers)
if not drops:
ws.cell(row=2, column=1, value="(no drop candidates)")
for i, d in enumerate(drops, start=2):
ws.cell(row=i, column=1, value=d.name)
ws.cell(row=i, column=2, value=d.reason)
_autosize(ws)
def _write_partition(ws, partitions: List[_PartitionCandidate]) -> None:
headers = [
"rank", "column", "kind", "distinct_count", "fill_pct",
"observed_values_in_file", "top_values", "score", "note",
]
_write_header(ws, headers)
if not partitions:
ws.cell(row=2, column=1, value="(no partition candidates)")
for rank, p in enumerate(partitions, start=1):
row = rank + 1
ws.cell(row=row, column=1, value=rank)
ws.cell(row=row, column=2, value=p.name)
ws.cell(row=row, column=3, value=p.kind)
ws.cell(row=row, column=4, value=p.distinct_count)
ws.cell(row=row, column=5, value=round(p.fill_pct, 3))
ws.cell(row=row, column=6, value=p.observed_values_in_file)
ws.cell(row=row, column=7, value=p.top_values)
ws.cell(row=row, column=8, value=round(p.score, 3))
ws.cell(row=row, column=9, value=p.note)
if p.kind == "pre_sharded":
for col in range(1, len(headers) + 1):
ws.cell(row=row, column=col).fill = _WARN_FILL
_autosize(ws)
def _write_index(ws, indexes: List[_IndexCandidate]) -> None:
headers = [
"rank", "column", "uniqueness_pct", "distinct_count", "fill_pct",
"name_bonus", "score", "note",
]
_write_header(ws, headers)
if not indexes:
ws.cell(row=2, column=1, value="(no index candidates)")
for rank, i in enumerate(indexes, start=1):
row = rank + 1
ws.cell(row=row, column=1, value=rank)
ws.cell(row=row, column=2, value=i.name)
ws.cell(row=row, column=3, value=round(i.uniqueness_pct, 3))
ws.cell(row=row, column=4, value=i.distinct_count)
ws.cell(row=row, column=5, value=round(i.fill_pct, 3))
ws.cell(row=row, column=6, value="YES" if i.name_bonus else "NO")
ws.cell(row=row, column=7, value=round(i.score, 3))
ws.cell(row=row, column=8, value=i.note)
_autosize(ws)
def _write_warnings(ws, warnings: List[_TypeWarning]) -> None:
headers = ["column", "severity", "message"]
_write_header(ws, headers)
if not warnings:
ws.cell(row=2, column=1, value="(no type warnings)")
for i, w in enumerate(warnings, start=2):
ws.cell(row=i, column=1, value=w.name)
ws.cell(row=i, column=2, value=w.severity)
ws.cell(row=i, column=3, value=w.message)
fill = None
if w.severity == "error":
fill = _ERROR_FILL
elif w.severity == "warn":
fill = _WARN_FILL
if fill is not None:
for col in range(1, len(headers) + 1):
ws.cell(row=i, column=col).fill = fill
_autosize(ws)
def _write_yaml_sheet(ws, snippet: str) -> None:
ws.cell(row=1, column=1, value="YAML suggestion (paste into your loader config)").font = _HEADER_FONT
ws.cell(row=1, column=1).fill = _HEADER_FILL
cell = ws.cell(row=2, column=1, value=snippet)
cell.alignment = Alignment(wrap_text=True, vertical="top")
# Pick a comfy width for YAML; row height is auto when wrap_text is on.
ws.column_dimensions["A"].width = 100
def write_report(
out_path: Path,
*,
path: Path,
size_bytes: int,
total_rows: int,
stats: Dict[str, _ColumnStats],
columns: Dict[str, ColumnSpec],
drops: List[_DropCandidate],
partitions: List[_PartitionCandidate],
indexes: List[_IndexCandidate],
warnings: List[_TypeWarning],
yaml_snippet: str,
thresholds: Dict[str, Any],
) -> None:
wb = Workbook()
ws = wb.active
ws.title = "Overview"
_write_overview(
ws,
path=path,
size_bytes=size_bytes,
total_rows=total_rows,
total_cols=len(columns),
thresholds=thresholds,
)
_write_columns(wb.create_sheet("Columns"), stats, columns)
_write_drop(wb.create_sheet("Drop candidates"), drops)
_write_partition(wb.create_sheet("Partition candidates"), partitions)
_write_index(wb.create_sheet("Index candidates"), indexes)
_write_warnings(wb.create_sheet("Type warnings"), warnings)
_write_yaml_sheet(wb.create_sheet("YAML suggestion"), yaml_snippet)
wb.save(out_path)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description=(
"Profile a local SAS file (.sas7bdat / .xpt / .xport) and write "
"an Excel report with drop, partition_by, and index suggestions "
"for generic_loader/load_sas.py and load_folder.py."
),
)
p.add_argument("--file", type=Path, default=Path(SAS_PATH),
help=f"Path to the SAS file to profile (default: {SAS_PATH!r}).")
p.add_argument("--out", type=Path, default=Path(OUTPUT_XLSX),
help=f"Where to write the .xlsx report (default: {OUTPUT_XLSX!r}).")
p.add_argument("--high-null-pct", type=float, default=HIGH_NULL_PCT,
help="Null percentage at/above which a column is a drop candidate.")
p.add_argument("--index-uniqueness-pct", type=float, default=INDEX_UNIQUENESS_PCT,
help="Uniqueness (distinct/non-null) at/above which a column is an index candidate.")
p.add_argument("--partition-min-distinct", type=int, default=PARTITION_MIN_DISTINCT)
p.add_argument("--partition-max-distinct", type=int, default=PARTITION_MAX_DISTINCT)
p.add_argument("--partition-min-fill-pct", type=float, default=PARTITION_MIN_FILL_PCT)
p.add_argument("--pre-sharded-max-distinct", type=int, default=PRE_SHARDED_MAX_DISTINCT)
return p
def main(argv: Optional[List[str]] = None) -> int:
args = _build_argparser().parse_args(argv)
path: Path = args.file
out_path: Path = args.out
if not path.exists():
print(f"error: SAS file not found: {path}", file=sys.stderr)
return 2
print(f"profiling {path} -> {out_path}", file=sys.stderr)
stats, columns, meta, total_rows = profile_file(path)
drops, partitions, indexes, warnings = classify(
stats, columns,
high_null_pct=args.high_null_pct,
index_uniqueness_pct=args.index_uniqueness_pct,
partition_min_distinct=args.partition_min_distinct,
partition_max_distinct=args.partition_max_distinct,
partition_min_fill_pct=args.partition_min_fill_pct,
pre_sharded_max_distinct=args.pre_sharded_max_distinct,
)
yaml_snippet = render_yaml_snippet(drops, partitions, indexes)
thresholds = {
"HIGH_NULL_PCT": args.high_null_pct,
"INDEX_UNIQUENESS_PCT": args.index_uniqueness_pct,
"PARTITION_MIN_DISTINCT": args.partition_min_distinct,
"PARTITION_MAX_DISTINCT": args.partition_max_distinct,
"PARTITION_MIN_FILL_PCT": args.partition_min_fill_pct,
"PRE_SHARDED_MAX_DISTINCT": args.pre_sharded_max_distinct,
"DISTINCT_CAP": DISTINCT_CAP,
"TOP_N_VALUES": TOP_N_VALUES,
"PREVIEW_ROWS_FOR_INFERENCE": PREVIEW_ROWS_FOR_INFERENCE,
}
out_path.parent.mkdir(parents=True, exist_ok=True)
write_report(
out_path,
path=path,
size_bytes=os.path.getsize(path),
total_rows=total_rows,
stats=stats,
columns=columns,
drops=drops,
partitions=partitions,
indexes=indexes,
warnings=warnings,
yaml_snippet=yaml_snippet,
thresholds=thresholds,
)
print(
f"wrote {out_path} ({len(stats)} columns, {total_rows:,} rows scanned)\n"
f" drops: {len(drops)}\n"
f" partitions: {len(partitions)}\n"
f" indexes: {len(indexes)}\n"
f" warnings: {len(warnings)}",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
sys.exit(main())