1275 lines
44 KiB
Python
1275 lines
44 KiB
Python
"""Standalone utility that profiles a single local SAS or delimited text file
|
|
and writes an Excel report with drop, partition, and index candidates plus
|
|
type-inference warnings.
|
|
|
|
Configure the constants below and run::
|
|
|
|
python3 utils/sas_profiler.py
|
|
|
|
Or override any of them from the command line::
|
|
|
|
python3 utils/sas_profiler.py \
|
|
--file ./data/mystate.sas7bdat \
|
|
--out ./reports/mystate_profile.xlsx
|
|
|
|
# Profile a CSV file with default comma delimiter:
|
|
python3 utils/sas_profiler.py --file ./data/myfile.csv
|
|
|
|
# Profile a TSV file (tab delimiter auto-detected from extension):
|
|
python3 utils/sas_profiler.py --file ./data/myfile.tsv
|
|
|
|
# Profile a pipe-delimited .txt file:
|
|
python3 utils/sas_profiler.py --file ./data/myfile.txt --delimiter '|'
|
|
|
|
The report is a paste-ready companion to
|
|
``generic_loader/load_sas.py`` and ``generic_loader/load_folder.py``: the
|
|
"inferred Postgres type" column uses the loader's own ``infer_schema`` so the
|
|
drop / partition / index suggestions map one-to-one onto valid YAML config
|
|
entries for those scripts.
|
|
|
|
Supported inputs:
|
|
|
|
- SAS files: ``.sas7bdat`` / ``.xpt`` / ``.xport``
|
|
- Delimited text files: ``.csv`` / ``.tsv`` / ``.txt`` (with headers)
|
|
|
|
For text files, SAS-specific metadata (formats, labels) is not available;
|
|
those fields show "N/A" in the report. All other profiling (column names,
|
|
data types from pandas, value distributions, null counts, etc.) works
|
|
identically.
|
|
|
|
Python 3.10+ compatible.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import collections
|
|
import datetime as dt
|
|
import math
|
|
import os
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
# The loader lives in a sibling directory that is *not* a proper package
|
|
# (no __init__.py). Its own modules import each other by bare name, so we
|
|
# add the directory to sys.path before importing it here.
|
|
_REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(_REPO_ROOT / "generic_loader"))
|
|
|
|
import numpy as np # noqa: E402
|
|
import pandas as pd # noqa: E402
|
|
from openpyxl import Workbook # noqa: E402
|
|
from openpyxl.styles import Alignment, Font, PatternFill # noqa: E402
|
|
from openpyxl.utils import get_column_letter # noqa: E402
|
|
|
|
from load_sas import ( # noqa: E402
|
|
NUMERIC_INT_RANGE,
|
|
ColumnSpec,
|
|
infer_schema,
|
|
iter_sas_chunks,
|
|
read_sas_preview,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# File extension constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Suffix sets are stored lower-case with the leading dot so they can be
# compared directly against ``Path.suffix.lower()``.
TEXT_EXTENSIONS = {".csv", ".tsv", ".txt"}
"""File extensions recognised as delimited text files."""

SAS_EXTENSIONS = {".sas7bdat", ".xport", ".xpt"}
"""File extensions recognised as SAS data files."""

# Union of the two families above.
SUPPORTED_EXTENSIONS = TEXT_EXTENSIONS | SAS_EXTENSIONS
"""All file extensions the profiler can handle."""
|
|
|
|
|
|
def _is_text_file(path: Path) -> bool:
    """Report whether ``path`` carries a delimited-text suffix.

    The suffix is lower-cased before the lookup, so the membership test
    against ``TEXT_EXTENSIONS`` is case-insensitive.
    """
    suffix = path.suffix.lower()
    return suffix in TEXT_EXTENSIONS
|
|
|
|
|
|
def _is_supported_file(path: Path) -> bool:
    """Report whether ``path`` carries any suffix the profiler accepts.

    The suffix is lower-cased before the lookup, so the membership test
    against ``SUPPORTED_EXTENSIONS`` is case-insensitive.
    """
    suffix = path.suffix.lower()
    return suffix in SUPPORTED_EXTENSIONS
|
|
|
|
|
|
def _auto_delimiter(path: Path, explicit: Optional[str]) -> str:
    """Choose the field delimiter to use when parsing *path*.

    Any ``explicit`` value other than ``None`` is returned untouched (even an
    empty string). With no explicit value, a ``.tsv`` suffix (matched
    case-insensitively) selects ``"\\t"`` and every other suffix selects
    ``","``.
    """
    if explicit is None:
        return "\t" if path.suffix.lower() == ".tsv" else ","
    return explicit
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration - edit these before running, or override via CLI flags
|
|
# ---------------------------------------------------------------------------
|
|
|
|
SAS_PATH: str = "./generic_loader/samples/sample_kitchensink.xpt"
"""Local path to the file to profile. Accepts ``.sas7bdat``, ``.xpt``,
``.xport``, ``.csv``, ``.tsv``, or ``.txt``."""

OUTPUT_XLSX: str = "./sas_profile.xlsx"
"""Where to write the Excel report."""

HIGH_NULL_PCT: float = 95.0
"""Columns whose null percentage meets or exceeds this threshold are flagged
as drop candidates."""

INDEX_UNIQUENESS_PCT: float = 95.0
"""Columns whose distinct/non-null ratio, expressed as a percentage, meets or
exceeds this threshold are flagged as index candidates."""

PARTITION_MIN_FILL_PCT: float = 95.0
"""Name-matched partition candidates must be non-null in at least this
percentage of rows."""

PRE_SHARDED_MAX_DISTINCT: int = 3
"""A name-matched column with <= this many distinct values is treated as
pre-sharded ("this file is one slice; sibling files have the other values")
rather than as a ready-to-partition observed column."""

DISTINCT_CAP: int = 10_000
"""Max size of the per-column distinct-value set. Exceeding this marks the
column as ``distinct_overflow`` and we report ">= CAP" in the xlsx."""

TOP_N_VALUES: int = 5
"""Number of most-frequent values tracked per column."""

PREVIEW_ROWS_FOR_INFERENCE: int = 10_000
"""Rows pulled from the file for the loader's schema inference. Matches
``load_sas.TYPE_INFERENCE_SAMPLE_ROWS`` so suggestions track the loader."""

PROFILE_CHUNK_ROWS: int = 5_000_000
"""Rows per streaming chunk while profiling. Larger chunks amortize
pyreadstat / pandas overhead, and the profiler is typically run on a
beefy box (e.g. a 128 GB EC2) rather than a laptop, so the default is
set aggressively.

Rough peak-memory estimate while a chunk is in flight:

    peak_bytes ~= chunksize * num_cols * ~50 bytes/cell * 2-3x

(The 2-3x factor covers pyreadstat's read buffer + pandas frame
construction temporaries.) At 5M rows x 50 cols that's roughly 10-20 GB,
which is comfortable on a 128 GB host but would OOM a laptop.

If RAM is limited or the file is very wide, lower this; if you have
plenty of RAM and a narrow file and want max throughput, bump it higher
with ``--chunksize`` (the profiler will happily take 20M+ per chunk). If
``chunksize`` is larger than the file, pyreadstat just hands back one
chunk."""
|
|
|
|
|
|
PARTITION_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
    # Matches ``state``, ``state_code`` or ``statecode`` when it forms a
    # complete token of the column name, where a token is delimited by an
    # underscore or by the start / end of the name. Hits: STATE, STATE_CODE,
    # HOME_STATE, ADDR_LINE3_STATE, BIRTH_STATE_CODE. Misses: STATUS,
    # ESTATE, INTERSTATE, STATEWIDE.
    re.compile(r"(?:^|_)state(?:_?code)?(?:_|$)", flags=re.IGNORECASE),
)
"""Only columns whose name matches one of these patterns are ever considered
partition candidates. This deliberately ignores generic low-cardinality
signals (status flags, boolean columns, etc.) because in practice the only
useful partition key in this codebase is STATE. Add more patterns here if
that ever stops being true."""
|
|
|
|
|
|
# Case-insensitive name shapes that look like identifiers / keys: exactly
# ``id``, a ``_id`` or ``_key`` suffix, or a ``pk_`` prefix.
INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = tuple(
    re.compile(expr, re.IGNORECASE)
    for expr in (r"^id$", r"_id$", r"_key$", r"^pk_")
)
"""Name-bonus patterns for index-candidate ranking."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-column streaming aggregator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class _ColumnStats:
    """Running profile of one column, built up one chunk at a time.

    Call :meth:`update` with each chunk's Series for this column; the
    read-only properties then expose the derived figures (null percentage,
    distinct count, mean / std, most frequent values).
    """

    name: str
    # Row counters. ``n_null`` counts every missing row, including the
    # empty strings that are separately tallied in ``n_empty_str``.
    n_total: int = 0
    n_null: int = 0
    n_empty_str: int = 0

    # Distinct values seen so far, holding at most DISTINCT_CAP entries.
    # ``distinct_overflow`` flips to True when another new value shows up
    # after the set is full, or when the values turn out to be unhashable.
    distinct: set = field(default_factory=set)
    distinct_overflow: bool = False

    # Per-value frequencies; no new keys are added once it holds
    # DISTINCT_CAP of them.
    top_counts: "collections.Counter[Any]" = field(default_factory=collections.Counter)

    # Extremes, maintained for numeric and datetime columns only.
    min_val: Any = None
    max_val: Any = None

    # Plain sum / sum-of-squares accumulators feeding ``mean`` and ``std``.
    numeric_sum: float = 0.0
    numeric_sumsq: float = 0.0
    numeric_count: int = 0

    # String length accumulators. Lengths come from ``.str.len()``, i.e.
    # they are character counts used as a stand-in for byte length.
    str_max_bytes: int = 0
    str_sum_bytes: int = 0
    str_count: int = 0

    # Up to three non-missing example values, in first-seen order.
    samples: List[Any] = field(default_factory=list)

    def update(self, series: pd.Series) -> None:
        """Merge one chunk of this column into the running totals.

        Steps, in order: count rows and missing values (for object columns
        an empty string counts as missing too), then update numeric or
        datetime extremes, string lengths, the sample list, and finally the
        distinct set / frequency counter. An empty chunk is a no-op, and a
        chunk whose values are all missing only moves the row counters.
        """
        row_count = len(series)
        if row_count == 0:
            return
        self.n_total += row_count

        is_object = pd.api.types.is_object_dtype(series)
        is_numeric = pd.api.types.is_numeric_dtype(series)
        is_datetime = pd.api.types.is_datetime64_any_dtype(series)

        missing = series.isna()
        if is_object:
            # ``series == ""`` is False for non-string cells, so no
            # per-element type check is needed here.
            blank = (series == "") & ~missing
            self.n_empty_str += int(blank.sum())
            missing = missing | blank
        self.n_null += int(missing.sum())

        present = series[~missing] if missing.any() else series
        if present.empty:
            return

        if is_numeric:
            self._absorb_numeric(present)
        elif is_datetime:
            self._absorb_range(present.min(), present.max())

        if is_object:
            self._absorb_text_lengths(present)

        still_needed = 3 - len(self.samples)
        if still_needed > 0:
            self.samples.extend(present.head(still_needed).tolist())

        self._absorb_value_counts(present)

    def _absorb_range(self, low: Any, high: Any) -> None:
        """Widen ``min_val`` / ``max_val`` so they cover ``low`` .. ``high``."""
        if self.min_val is None or low < self.min_val:
            self.min_val = low
        if self.max_val is None or high > self.max_val:
            self.max_val = high

    def _absorb_numeric(self, present: pd.Series) -> None:
        """Add a non-empty, non-missing numeric chunk to sum / sumsq / range."""
        values = present.to_numpy(dtype="float64", copy=False, na_value=np.nan)
        self.numeric_sum += float(np.nansum(values))
        self.numeric_sumsq += float(np.nansum(values * values))
        self.numeric_count += int(values.size)
        if values.size:
            self._absorb_range(float(np.nanmin(values)), float(np.nanmax(values)))

    def _absorb_text_lengths(self, present: pd.Series) -> None:
        """Add the string lengths of a non-missing object chunk."""
        lengths = present.astype(str, copy=False).str.len().dropna()
        if lengths.empty:
            return
        self.str_max_bytes = max(self.str_max_bytes, int(lengths.max()))
        self.str_sum_bytes += int(lengths.sum())
        self.str_count += int(lengths.size)

    def _absorb_value_counts(self, present: pd.Series) -> None:
        """Update ``distinct`` and ``top_counts`` from one chunk.

        The pass is skipped once both structures are saturated (overflow
        flagged and the counter already holds DISTINCT_CAP keys). Values
        that cannot be hashed make ``value_counts`` raise ``TypeError``; the
        column is then flagged as overflowed and left out of both trackers.
        """
        counter = self.top_counts
        if self.distinct_overflow and len(counter) >= DISTINCT_CAP:
            return

        try:
            tallies = present.value_counts(sort=False)
        except TypeError:
            self.distinct_overflow = True
            return
        if tallies.empty:
            return

        if not self.distinct_overflow:
            seen = self.distinct
            for value in tallies.index:
                if value in seen:
                    continue
                if len(seen) >= DISTINCT_CAP:
                    self.distinct_overflow = True
                    break
                seen.add(value)

        # Known keys always grow; a new key is admitted only while the
        # counter is below the cap. The counter never shrinks, so a counter
        # that was full on entry admits nothing new in this loop either.
        for value, occurrences in zip(tallies.index.tolist(), tallies.to_numpy().tolist()):
            if value in counter:
                counter[value] += int(occurrences)
            elif len(counter) < DISTINCT_CAP:
                counter[value] = int(occurrences)

    # -- Derived properties ------------------------------------------------

    @property
    def n_non_null(self) -> int:
        """Rows that were neither null nor (for object columns) empty."""
        return self.n_total - self.n_null

    @property
    def null_pct(self) -> float:
        """Missing rows as a percentage of all rows (0.0 before any rows)."""
        if self.n_total:
            return 100.0 * self.n_null / self.n_total
        return 0.0

    @property
    def fill_pct(self) -> float:
        """Complement of :attr:`null_pct`."""
        return 100.0 - self.null_pct

    @property
    def distinct_count(self) -> int:
        """Size of the tracked distinct set (never above DISTINCT_CAP)."""
        return len(self.distinct)

    @property
    def distinct_display(self) -> str:
        """Distinct count as report text; ``">= CAP"`` after an overflow."""
        if not self.distinct_overflow:
            return f"{self.distinct_count:,}"
        return f">= {DISTINCT_CAP:,}"

    @property
    def mean(self) -> Optional[float]:
        """Arithmetic mean of the numeric values, or None when there are none."""
        if self.numeric_count == 0:
            return None
        return self.numeric_sum / self.numeric_count

    @property
    def std(self) -> Optional[float]:
        """Population standard deviation, or None with fewer than two values."""
        if self.numeric_count < 2:
            return None
        average = self.mean
        variance = self.numeric_sumsq / self.numeric_count - (average * average)
        # Rounding noise can push the difference slightly below zero.
        return math.sqrt(0.0 if variance < 0 else variance)

    @property
    def top_value(self) -> Tuple[Any, int]:
        """Most frequent tracked ``(value, count)``; ``(None, 0)`` if none."""
        leaders = self.top_counts.most_common(1)
        return leaders[0] if leaders else (None, 0)

    def top_values(self, n: int = TOP_N_VALUES) -> List[Tuple[Any, int]]:
        """Return the ``n`` most frequent tracked ``(value, count)`` pairs."""
        return self.top_counts.most_common(n)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _matches_any(patterns: Tuple[re.Pattern, ...], name: str) -> bool:
    """Return True as soon as one pattern in ``patterns`` is found in ``name``.

    Uses ``search``, so unanchored patterns may match anywhere in the name.
    An empty ``patterns`` tuple yields False.
    """
    for pattern in patterns:
        if pattern.search(name):
            return True
    return False
|
|
|
|
|
|
def _format_size(n_bytes: int) -> str:
    """Render a byte count with a binary (1024-based) unit suffix.

    The value is divided by 1024 until it drops below 1024 or reaches TB,
    then printed with one decimal place and thousands separators, for
    example ``"1.5 KB"``.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    last = len(units) - 1
    magnitude = float(n_bytes)
    step = 0
    while step < last and not magnitude < 1024.0:
        magnitude /= 1024.0
        step += 1
    return f"{magnitude:,.1f} {units[step]}"
|
|
|
|
|
|
def _format_value(val: Any) -> str:
    """Turn one Python value into the text shown in a spreadsheet cell.

    ``None`` and float NaN become the empty string. Strings are shown via
    ``repr`` so their quotes stay visible; every other value, dates and
    timestamps included, goes through ``str``.
    """
    is_float_nan = isinstance(val, float) and pd.isna(val)
    if val is None or is_float_nan:
        return ""
    if isinstance(val, str):
        return repr(val)
    return str(val)
|
|
|
|
|
|
def _format_top_values(pairs: List[Tuple[Any, int]]) -> str:
    """Join ``(value, count)`` pairs as ``"value (count), value (count)"``.

    Values are rendered by ``_format_value`` and counts carry thousands
    separators. An empty list produces an empty string.
    """
    rendered = [f"{_format_value(value)} ({count:,})" for value, count in pairs]
    return ", ".join(rendered)
|
|
|
|
|
|
def _format_samples(samples: List[Any]) -> str:
    """Render sample values as a comma-separated string.

    Each value is rendered by ``_format_value``. An empty list yields the
    placeholder ``"(all null)"``.
    """
    if samples:
        return ", ".join(map(_format_value, samples))
    return "(all null)"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Streaming profile
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def profile_file(
    path: Path,
    *,
    chunksize: Optional[int] = None,
    delimiter: Optional[str] = None,
    encoding: str = "utf-8",
    quotechar: str = '"',
) -> Tuple[Dict[str, _ColumnStats], Dict[str, ColumnSpec], Any, int, bool]:
    """Read ``path`` in chunks and return *(stats, columns, meta, total_rows, is_text)*.

    ``columns`` is whatever ``infer_schema`` returns for the preview of the
    first ``PREVIEW_ROWS_FOR_INFERENCE`` rows; ``meta`` is the metadata
    object returned alongside that preview. ``stats`` holds one
    ``_ColumnStats`` per entry in ``columns``, fed from every chunk of the
    file. ``total_rows`` is the sum of the chunk lengths.

    ``is_text`` is True when ``path`` has a delimited-text extension;
    callers can use it to adjust display (e.g. showing "N/A" for
    SAS-specific metadata fields).

    ``delimiter``, ``encoding`` and ``quotechar`` are forwarded to the
    loader's readers. When ``delimiter`` is ``None``, ``.tsv`` files get a
    tab and everything else gets a comma. ``chunksize`` falls back to
    ``PROFILE_CHUNK_ROWS`` when ``None``.

    Progress lines go to stderr: at most one every two seconds while
    streaming, plus a final summary line.
    """
    import time

    def _rows_per_second(rows: int, seconds: float) -> float:
        # Avoid dividing by zero when the clock has not advanced.
        return rows / seconds if seconds > 0 else 0.0

    is_text = _is_text_file(path)

    # Keyword arguments shared by both loader entry points.
    text_kwargs: Dict[str, Any] = dict(
        delimiter=_auto_delimiter(path, delimiter),
        text_encoding=encoding,
        quotechar=quotechar,
    )

    preview_df, meta = read_sas_preview(
        path, rows=PREVIEW_ROWS_FOR_INFERENCE, **text_kwargs,
    )
    columns = infer_schema(
        preview_df, meta, total_rows=getattr(meta, "number_rows", None),
    )
    stats: Dict[str, _ColumnStats] = {}
    for col_name in columns:
        stats[col_name] = _ColumnStats(name=col_name)

    rows_per_chunk = PROFILE_CHUNK_ROWS if chunksize is None else chunksize

    total_rows = 0
    started_at = time.monotonic()
    last_print_at = started_at
    for chunk_df, _unused_meta in iter_sas_chunks(
        path, chunksize=rows_per_chunk, **text_kwargs,
    ):
        total_rows += len(chunk_df)
        for col_name, col_stats in stats.items():
            # Columns missing from this chunk are simply not updated.
            if col_name in chunk_df.columns:
                col_stats.update(chunk_df[col_name])

        now = time.monotonic()
        if now - last_print_at < 2.0:
            continue
        rate = _rows_per_second(total_rows, now - started_at)
        print(
            f" profiling... {total_rows:,} rows "
            f"({rate:,.0f} rows/s)",
            file=sys.stderr,
        )
        last_print_at = now

    elapsed = time.monotonic() - started_at
    rate = _rows_per_second(total_rows, elapsed)
    print(
        f" profiled {total_rows:,} rows in {elapsed:.1f}s "
        f"({rate:,.0f} rows/s)",
        file=sys.stderr,
    )

    return stats, columns, meta, total_rows, is_text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Classifiers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class _DropCandidate:
    """A column suggested for exclusion, with the reason it was flagged."""

    # Column name as it appears in the profiled file.
    name: str
    # Short explanation, e.g. "all-null" or "constant=...".
    reason: str
|
|
|
|
|
|
@dataclass
class _PartitionCandidate:
    """A name-matched column proposed as a partition key."""

    # Column name.
    name: str
    # Either "observed" or "pre_sharded".
    kind: str
    # Size of the tracked distinct set for the column.
    distinct_count: int
    # Percentage of rows carrying a non-missing value.
    fill_pct: float
    # Pre-formatted most-frequent values.
    top_values: str
    # Pre-formatted values seen in this file.
    observed_values_in_file: str
    # Human-readable explanation of the suggestion.
    note: str
    # Sort key; higher ranks first.
    score: float
|
|
|
|
|
|
@dataclass
class _IndexCandidate:
    """A high-uniqueness column proposed for an index."""

    # Column name.
    name: str
    # Distinct values as a percentage of non-missing rows.
    uniqueness_pct: float
    # Distinct count used for display.
    distinct_count: int
    # Percentage of rows carrying a non-missing value.
    fill_pct: float
    # True when the name matched one of the index name patterns.
    name_bonus: bool
    # Free-text remarks joined with "; ".
    note: str
    # Sort key; higher ranks first.
    score: float
|
|
|
|
|
|
@dataclass
class _TypeWarning:
    """One type-inference remark attached to a column."""

    # Column name.
    name: str
    # One of "info", "warn" or "error".
    severity: str
    # Human-readable description of the issue.
    message: str
|
|
|
|
|
|
def _is_constant_like(cs: _ColumnStats) -> bool:
    """Tell whether the column holds exactly one distinct non-missing value.

    Missing rows do not count against the column. A column with no
    non-missing rows at all, or one whose distinct tracking overflowed, is
    never constant-like.
    """
    has_values = cs.n_non_null != 0
    return has_values and cs.distinct_count == 1 and not cs.distinct_overflow
|
|
|
|
|
|
def classify(
    stats: Dict[str, _ColumnStats],
    columns: Dict[str, ColumnSpec],
    *,
    high_null_pct: float,
    index_uniqueness_pct: float,
    partition_min_fill_pct: float,
    pre_sharded_max_distinct: int,
) -> Tuple[
    List[_DropCandidate],
    List[_PartitionCandidate],
    List[_IndexCandidate],
    List[_TypeWarning],
]:
    """Turn per-column stats + the loader's schema into four lists.

    Returns ``(drops, partitions, indexes, warnings)``. ``partitions`` and
    ``indexes`` are sorted by descending score; ``drops`` and ``warnings``
    follow the iteration order of ``stats``.

    Partition candidates are restricted to columns whose name matches
    :data:`PARTITION_NAME_PATTERNS` - in practice STATE / STATE_CODE. A
    generic "low-cardinality = partition candidate" heuristic produces too
    much noise for this codebase, so we only surface columns we're confident
    about by name.

    A column picked as a partition candidate is never a drop candidate, and
    a column that is a drop or partition candidate is never an index
    candidate. Type warnings are evaluated for every column that has an
    entry in ``columns``.

    Args:
        stats: full-file accumulators keyed by column name.
        columns: the loader's inferred schema keyed by column name.
        high_null_pct: null percentage at or above which a column is a drop
            candidate.
        index_uniqueness_pct: distinct / non-null percentage at or above
            which a column is an index candidate.
        partition_min_fill_pct: minimum fill percentage for a name-matched
            partition candidate.
        pre_sharded_max_distinct: a name-matched column with at most this
            many distinct values is classified as "pre_sharded".
    """

    drops: List[_DropCandidate] = []
    partitions: List[_PartitionCandidate] = []
    indexes: List[_IndexCandidate] = []
    warnings: List[_TypeWarning] = []

    # -- Partition candidates (name-matched only) --------------------------
    # Run this before the drop check so pre-sharded STATE columns don't get
    # silently dropped for being "constant".
    claimed_by_partition: set = set()
    for name, cs in stats.items():
        if not _matches_any(PARTITION_NAME_PATTERNS, name):
            continue
        if cs.n_total == 0 or cs.n_non_null == 0:
            continue
        if cs.fill_pct < partition_min_fill_pct:
            continue

        is_pre_sharded = (
            not cs.distinct_overflow
            and cs.distinct_count <= pre_sharded_max_distinct
        )
        kind = "pre_sharded" if is_pre_sharded else "observed"
        observed = _format_top_values(cs.top_values(pre_sharded_max_distinct))

        if is_pre_sharded:
            note = (
                f"pre-sharded: this file only contains {cs.distinct_count} "
                f"distinct value(s) ({observed}); keep the column and set "
                "partition_by at the load_folder level so sibling files merge "
                "into separate partitions of one table"
            )
        else:
            note = (
                f"observed {cs.distinct_display} distinct value(s) across "
                f"{cs.fill_pct:.1f}% of rows; LIST partitioning will create "
                "one child table per distinct value"
            )

        partitions.append(
            _PartitionCandidate(
                name=name,
                kind=kind,
                distinct_count=cs.distinct_count,
                fill_pct=cs.fill_pct,
                top_values=_format_top_values(cs.top_values()),
                observed_values_in_file=observed,
                note=note,
                # Pre-sharded beats observed as the snippet's top pick.
                score=(1_000_000.0 if is_pre_sharded else 500_000.0) + cs.fill_pct,
            )
        )
        claimed_by_partition.add(name)

    partitions.sort(key=lambda p: p.score, reverse=True)

    # -- Drop candidates ---------------------------------------------------
    for name, cs in stats.items():
        if name in claimed_by_partition:
            continue
        if cs.n_total == 0:
            continue

        reason: Optional[str] = None
        if cs.n_null == cs.n_total:
            # Every row is missing. Empty strings are counted inside
            # ``n_null``, so the column is "all-empty" exactly when every
            # row was an empty string; any other mix is "all-null".
            # (Comparing ``n_non_null`` with ``n_empty_str`` instead would
            # flag a column holding as many real values as empty strings.)
            reason = "all-empty" if cs.n_empty_str == cs.n_total else "all-null"
        elif (
            cs.n_non_null > 0
            and cs.distinct_count == 0
            and not cs.distinct_overflow
        ):
            # Non-null but nothing hashable captured - treat as opaque.
            reason = "all-empty / unhashable"
        elif _is_constant_like(cs):
            only_val = next(iter(cs.distinct))
            reason = f"constant={_format_value(only_val)}"
        elif cs.null_pct >= high_null_pct:
            reason = f"null_pct={cs.null_pct:.1f}%"

        if reason is not None:
            drops.append(_DropCandidate(name=name, reason=reason))

    dropped_names = {d.name for d in drops}
    partition_names = {p.name for p in partitions}

    # -- Index candidates --------------------------------------------------
    for name, cs in stats.items():
        if name in dropped_names or name in partition_names:
            continue
        spec = columns.get(name)
        if spec is None:
            continue
        if cs.n_non_null == 0:
            continue
        if cs.distinct_overflow:
            # Super-high-cardinality → perfect candidate for an index.
            uniqueness = 100.0
            distinct_count = DISTINCT_CAP  # display sentinel
        else:
            uniqueness = 100.0 * cs.distinct_count / cs.n_non_null
            distinct_count = cs.distinct_count
        if uniqueness < index_uniqueness_pct:
            continue

        name_bonus = _matches_any(INDEX_NAME_PATTERNS, name)
        notes: List[str] = []
        if name_bonus:
            notes.append("name matches INDEX_NAME_PATTERNS (ID/KEY-ish)")
        if cs.distinct_overflow:
            notes.append(
                f"distinct tracking capped at {DISTINCT_CAP:,}; "
                "treating as high-cardinality"
            )

        # Rank: name match dominates, then raw uniqueness, then fill.
        score = (500_000.0 if name_bonus else 0.0) + uniqueness + cs.fill_pct / 100.0

        indexes.append(
            _IndexCandidate(
                name=name,
                uniqueness_pct=uniqueness,
                distinct_count=distinct_count,
                fill_pct=cs.fill_pct,
                name_bonus=name_bonus,
                note="; ".join(notes),
                score=score,
            )
        )

    indexes.sort(key=lambda i: i.score, reverse=True)

    # -- Type warnings -----------------------------------------------------
    for name, cs in stats.items():
        spec = columns.get(name)
        if spec is None:
            continue

        # Re-surface whatever the loader's own inference already flagged in
        # notes - these are genuinely useful for the user to see without
        # having to dry-run the loader.
        for loader_note in spec.notes:
            warnings.append(
                _TypeWarning(name=name, severity="info", message=loader_note)
            )
        if spec.sampled:
            warnings.append(
                _TypeWarning(
                    name=name,
                    severity="info",
                    message=(
                        "loader inferred type from a bounded preview; "
                        "sampled=True"
                    ),
                )
            )

        pg_type = spec.postgres_type.upper()

        # Preview said NOT NULL but the full file has nulls - loader would
        # have emitted NOT NULL and then choked on COPY.
        if not spec.nullable and cs.n_null > 0:
            warnings.append(
                _TypeWarning(
                    name=name,
                    severity="error",
                    message=(
                        f"preview saw zero nulls (NOT NULL) but full file has "
                        f"{cs.n_null:,} null(s); COPY would fail under the "
                        "loader's inferred NOT NULL"
                    ),
                )
            )

        # INTEGER range check against the full-file observed min/max.
        if pg_type == "INTEGER" and cs.numeric_count > 0:
            lo, hi = NUMERIC_INT_RANGE
            vmin = cs.min_val if cs.min_val is not None else 0
            vmax = cs.max_val if cs.max_val is not None else 0
            try:
                if vmin < lo or vmax > hi:
                    warnings.append(
                        _TypeWarning(
                            name=name,
                            severity="error",
                            message=(
                                f"loader inferred INTEGER from the preview but "
                                f"full-file range [{vmin}, {vmax}] overflows "
                                f"int4 {NUMERIC_INT_RANGE}; BIGINT required"
                            ),
                        )
                    )
            except TypeError:
                # Best effort: if the observed extremes cannot be compared
                # with the integer bounds, skip the range warning.
                pass

        # Preview said all-null (loader defaults to TEXT) but data exists.
        was_all_null_preview = any(
            "all-null column" in n for n in spec.notes
        )
        if was_all_null_preview and cs.n_non_null > 0:
            warnings.append(
                _TypeWarning(
                    name=name,
                    severity="warn",
                    message=(
                        "preview was all-null so loader defaulted to TEXT, "
                        f"but full file has {cs.n_non_null:,} non-null "
                        "value(s); consider a tighter include/exclude or "
                        "re-inferring with TYPE_INFERENCE_SAMPLE_ROWS=None"
                    ),
                )
            )

    return drops, partitions, indexes, warnings
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# YAML snippet
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def render_yaml_snippet(
    drops: List[_DropCandidate],
    partitions: List[_PartitionCandidate],
    indexes: List[_IndexCandidate],
) -> str:
    """Build the YAML text suggested for the loader config.

    The snippet has three sections separated by blank lines: ``exclude``
    (every drop candidate), ``partition_by`` (only the first partition
    candidate, preceded by explanatory comments when it is pre-sharded) and
    ``indexes`` (every index candidate). A section without candidates is
    replaced by a single ``#`` comment line.
    """
    out: List[str] = ["# Suggested additions to your load_sas.py / load_folder.py config"]

    # -- exclude -----------------------------------------------------------
    if not drops:
        out.append("# (no drop candidates found)")
    else:
        out.append("exclude:")
        out.extend(f" - {cand.name} # {cand.reason}" for cand in drops)
    out.append("")

    # -- partition_by ------------------------------------------------------
    if not partitions:
        out.append(
            "# (no partition candidates found - no column matched "
            "PARTITION_NAME_PATTERNS)"
        )
    else:
        best = partitions[0]
        if best.kind == "pre_sharded":
            out.extend(
                [
                    f"# !! PRE-SHARDED: this file only contains "
                    f"{best.name} = {best.observed_values_in_file}.",
                    "# !! Keep the column in the schema and set partition_by at the "
                    "load_folder level",
                    "# !! so sibling files merge into one table under separate "
                    "partitions.",
                ]
            )
        out.extend(["partition_by:", f" - {best.name}"])
    out.append("")

    # -- indexes -----------------------------------------------------------
    if not indexes:
        out.append("# (no index candidates found)")
    else:
        out.append("indexes:")
        for cand in indexes:
            suffix = " (name match)" if cand.name_bonus else ""
            out.append(
                f" - {cand.name} # uniqueness={cand.uniqueness_pct:.1f}%{suffix}"
            )

    return "\n".join(out)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# XLSX writer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Shared cell styles for the workbook. Colours are hex RGB strings.
# Header row: bold white text on a solid blue fill.
_HEADER_FONT = Font(bold=True, color="FFFFFF")
_HEADER_FILL = PatternFill(patternType="solid", fgColor="305496")
# Solid row highlights named for warning / error severities.
_WARN_FILL = PatternFill(patternType="solid", fgColor="FFE699")
_ERROR_FILL = PatternFill(patternType="solid", fgColor="F4B183")
|
|
|
|
|
|
def _write_header(ws, headers: List[str]) -> None:
    """Fill row 1 of ``ws`` with styled header labels and freeze it.

    Labels go into consecutive columns starting at column 1. Each header
    cell receives the shared header font and fill plus vertical centring.
    The freeze pane is set at ``A2`` so row 1 stays visible when scrolling.
    """
    for position, label in enumerate(headers):
        header_cell = ws.cell(row=1, column=position + 1, value=label)
        header_cell.font = _HEADER_FONT
        header_cell.fill = _HEADER_FILL
        header_cell.alignment = Alignment(vertical="center")
    ws.freeze_panes = "A2"
|
|
|
|
|
|
def _autosize(ws, *, max_width: int = 60) -> None:
    """Set every column width in ``ws`` from its longest cell text.

    Only the first line of each cell's text is measured, so a long
    multi-line cell (such as a YAML snippet) does not stretch the column.
    The width is the longest measured length plus two, kept between 10 and
    ``max_width``. Cells whose value is ``None`` are ignored.
    """
    for column_cells in ws.columns:
        first_line_lengths = [
            len(str(cell.value).split("\n", 1)[0])
            for cell in column_cells
            if cell.value is not None
        ]
        longest = min(max(first_line_lengths, default=0), max_width)
        letter = get_column_letter(column_cells[0].column)
        ws.column_dimensions[letter].width = min(max(longest + 2, 10), max_width)
|
|
|
|
|
|
def _write_overview(
    ws,
    *,
    path: Path,
    size_bytes: int,
    total_rows: int,
    total_cols: int,
    thresholds: Dict[str, Any],
) -> None:
    """Write the two-column "Field / Value" overview sheet.

    After the styled header row come the file path, formatted size,
    lower-cased extension, row and column totals and the generation
    timestamp, followed by one ``threshold: <name>`` row per entry in
    ``thresholds``. Column widths are auto-sized at the end.
    """
    for col_idx, title in enumerate(("Field", "Value"), start=1):
        head = ws.cell(row=1, column=col_idx, value=title)
        head.font = _HEADER_FONT
        head.fill = _HEADER_FILL
    ws.freeze_panes = "A2"

    entries = [
        ("File path", str(path)),
        ("File size", _format_size(size_bytes)),
        ("Extension", path.suffix.lower()),
        ("Total rows", f"{total_rows:,}"),
        ("Total columns", f"{total_cols:,}"),
        ("Generated at", dt.datetime.now().isoformat(timespec="seconds")),
    ]
    entries.extend(
        (f"threshold: {key}", str(setting)) for key, setting in thresholds.items()
    )

    # Data rows start directly below the header.
    for row_idx, (label, shown) in enumerate(entries, start=2):
        ws.cell(row=row_idx, column=1, value=label)
        ws.cell(row=row_idx, column=2, value=shown)

    _autosize(ws)
|
|
|
|
|
|
def _write_columns(
    ws,
    stats: Dict[str, _ColumnStats],
    columns: Dict[str, ColumnSpec],
    *,
    is_text: bool = False,
) -> None:
    """Write one row of profiling results per entry of ``stats`` into ``ws``.

    ``columns`` supplies the per-column spec (format, source dtype, Postgres
    type, nullability, notes). A column with no spec gets empty strings for
    those fields and "NO" for ``nullable``. When ``is_text`` is true the
    ``sas_format`` field is always "N/A".
    """

    def _rounded_or_blank(number, digits):
        # A missing statistic is written as an empty string.
        return "" if number is None else round(number, digits)

    _write_header(ws, [
        "column", "sas_format", "source_dtype", "inferred_postgres_type",
        "nullable", "n_total", "n_null", "null_pct", "distinct_count",
        "min", "max", "mean", "std",
        "top_value", "top_count",
        "max_str_bytes", "mean_str_bytes",
        "sample_values", "notes",
    ])

    for sheet_row, (col_name, col_stats) in enumerate(stats.items(), start=2):
        spec = columns.get(col_name)
        top_val, top_count = col_stats.top_value
        if col_stats.str_count:
            mean_bytes = col_stats.str_sum_bytes / col_stats.str_count
        else:
            mean_bytes = None

        # For text files, SAS-specific metadata is not available.
        if is_text:
            sas_format_display = "N/A"
        elif spec:
            sas_format_display = spec.sas_format
        else:
            sas_format_display = ""

        record = (
            col_name,
            sas_format_display,
            spec.source_dtype if spec else "",
            spec.postgres_type if spec else "",
            "YES" if (spec and spec.nullable) else "NO",
            col_stats.n_total,
            col_stats.n_null,
            round(col_stats.null_pct, 3),
            col_stats.distinct_display,
            _format_value(col_stats.min_val),
            _format_value(col_stats.max_val),
            _rounded_or_blank(col_stats.mean, 6),
            _rounded_or_blank(col_stats.std, 6),
            _format_value(top_val),
            top_count or "",
            col_stats.str_max_bytes or "",
            _rounded_or_blank(mean_bytes, 2),
            _format_samples(col_stats.samples),
            "; ".join(spec.notes) if spec and spec.notes else "",
        )
        for sheet_col, item in enumerate(record, start=1):
            ws.cell(row=sheet_row, column=sheet_col, value=item)

    _autosize(ws)
|
|
|
|
|
|
def _write_drop(ws, drops: List[_DropCandidate]) -> None:
    """Write the drop candidates (column name and reason) into ``ws``.

    When ``drops`` is empty a single placeholder row is written instead.
    """
    _write_header(ws, ["column", "reason"])
    if drops:
        for offset, candidate in enumerate(drops):
            target_row = offset + 2  # row 1 holds the header
            ws.cell(row=target_row, column=1, value=candidate.name)
            ws.cell(row=target_row, column=2, value=candidate.reason)
    else:
        ws.cell(row=2, column=1, value="(no drop candidates)")
    _autosize(ws)
|
|
|
|
|
|
def _write_partition(ws, partitions: List[_PartitionCandidate]) -> None:
    """Write the ranked partition candidates into ``ws``.

    Rank is the 1-based position in ``partitions``. Rows whose ``kind`` is
    ``"pre_sharded"`` are highlighted with the warning fill. When
    ``partitions`` is empty a single placeholder row is written instead.
    """
    _write_header(ws, [
        "rank", "column", "kind", "distinct_count", "fill_pct",
        "observed_values_in_file", "top_values", "score", "note",
    ])
    if not partitions:
        ws.cell(row=2, column=1, value="(no partition candidates)")

    for sheet_row, cand in enumerate(partitions, start=2):
        record = (
            sheet_row - 1,  # rank: the first candidate sits on row 2
            cand.name,
            cand.kind,
            cand.distinct_count,
            round(cand.fill_pct, 3),
            cand.observed_values_in_file,
            cand.top_values,
            round(cand.score, 3),
            cand.note,
        )
        highlight = cand.kind == "pre_sharded"
        for sheet_col, item in enumerate(record, start=1):
            cell = ws.cell(row=sheet_row, column=sheet_col, value=item)
            if highlight:
                cell.fill = _WARN_FILL
    _autosize(ws)
|
|
|
|
|
|
def _write_index(ws, indexes: List[_IndexCandidate]) -> None:
    """Write the ranked index candidates into ``ws``.

    Rank is the 1-based position in ``indexes``. ``name_bonus`` is written as
    "YES" or "NO". When ``indexes`` is empty a single placeholder row is
    written instead.
    """
    _write_header(ws, [
        "rank", "column", "uniqueness_pct", "distinct_count", "fill_pct",
        "name_bonus", "score", "note",
    ])
    if not indexes:
        ws.cell(row=2, column=1, value="(no index candidates)")

    for sheet_row, cand in enumerate(indexes, start=2):
        record = (
            sheet_row - 1,  # rank: the first candidate sits on row 2
            cand.name,
            round(cand.uniqueness_pct, 3),
            cand.distinct_count,
            round(cand.fill_pct, 3),
            "YES" if cand.name_bonus else "NO",
            round(cand.score, 3),
            cand.note,
        )
        for sheet_col, item in enumerate(record, start=1):
            ws.cell(row=sheet_row, column=sheet_col, value=item)
    _autosize(ws)
|
|
|
|
|
|
def _write_warnings(ws, warnings: List[_TypeWarning]) -> None:
    """Write the type warnings (column, severity, message) into ``ws``.

    Rows with severity ``"error"`` or ``"warn"`` are highlighted with the
    matching fill; any other severity is left unstyled. When ``warnings`` is
    empty a single placeholder row is written instead.
    """
    _write_header(ws, ["column", "severity", "message"])
    if not warnings:
        ws.cell(row=2, column=1, value="(no type warnings)")

    severity_fills = {"error": _ERROR_FILL, "warn": _WARN_FILL}
    for sheet_row, warning in enumerate(warnings, start=2):
        shade = severity_fills.get(warning.severity)
        record = (warning.name, warning.severity, warning.message)
        for sheet_col, item in enumerate(record, start=1):
            cell = ws.cell(row=sheet_row, column=sheet_col, value=item)
            if shade is not None:
                cell.fill = shade
    _autosize(ws)
|
|
|
|
|
|
def _write_yaml_sheet(ws, snippet: str) -> None:
    """Put the YAML ``snippet`` into cell A2 of ``ws`` under a styled title.

    The snippet cell wraps text and is top-aligned; column A is set to a
    fixed width of 100.
    """
    title = ws.cell(
        row=1, column=1, value="YAML suggestion (paste into your loader config)"
    )
    title.font = _HEADER_FONT
    title.fill = _HEADER_FILL

    body = ws.cell(row=2, column=1, value=snippet)
    body.alignment = Alignment(vertical="top", wrap_text=True)

    # Pick a comfy width for YAML; no explicit row height is set here.
    ws.column_dimensions["A"].width = 100
|
|
|
|
|
|
def write_report(
    out_path: Path,
    *,
    path: Path,
    size_bytes: int,
    total_rows: int,
    stats: Dict[str, _ColumnStats],
    columns: Dict[str, ColumnSpec],
    drops: List[_DropCandidate],
    partitions: List[_PartitionCandidate],
    indexes: List[_IndexCandidate],
    warnings: List[_TypeWarning],
    yaml_snippet: str,
    thresholds: Dict[str, Any],
    is_text: bool = False,
) -> None:
    """Build the report workbook and save it to ``out_path``.

    Sheets are created in this order: "Overview", "Columns",
    "Drop candidates", "Partition candidates", "Index candidates",
    "Type warnings", "YAML suggestion". The overview's column total is
    ``len(columns)``.
    """
    wb = Workbook()

    # The workbook's initial active sheet becomes the overview.
    overview = wb.active
    overview.title = "Overview"
    _write_overview(
        overview,
        path=path,
        size_bytes=size_bytes,
        total_rows=total_rows,
        total_cols=len(columns),
        thresholds=thresholds,
    )

    # Remaining sheets, in the order they appear in the workbook.
    sheet_writers = (
        ("Columns", lambda ws: _write_columns(ws, stats, columns, is_text=is_text)),
        ("Drop candidates", lambda ws: _write_drop(ws, drops)),
        ("Partition candidates", lambda ws: _write_partition(ws, partitions)),
        ("Index candidates", lambda ws: _write_index(ws, indexes)),
        ("Type warnings", lambda ws: _write_warnings(ws, warnings)),
        ("YAML suggestion", lambda ws: _write_yaml_sheet(ws, yaml_snippet)),
    )
    for title, render in sheet_writers:
        render(wb.create_sheet(title))

    wb.save(out_path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for the profiler.

    Defaults for the input file, the output path, the thresholds and the
    chunk-size help text are read from names defined elsewhere in this
    module (``SAS_PATH``, ``OUTPUT_XLSX``, ``HIGH_NULL_PCT``, ...).
    """
    description = (
        "Profile a local SAS or delimited text file and write an Excel "
        "report with drop, partition_by, and index suggestions for "
        "generic_loader/load_sas.py and load_folder.py.\n\n"
        "Supported formats: .sas7bdat, .xpt, .xport (SAS); "
        ".csv, .tsv, .txt (delimited text with headers)."
    )
    # Raw formatter keeps the blank line embedded in the description.
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=description,
    )

    # -- Input / output ----------------------------------------------------
    file_help = (
        "Path to the file to profile. Accepts SAS files "
        "(.sas7bdat/.xpt/.xport) and delimited text files "
        f"(.csv/.tsv/.txt). Default: {SAS_PATH!r}."
    )
    parser.add_argument("--file", type=Path, default=Path(SAS_PATH), help=file_help)
    parser.add_argument(
        "--out",
        type=Path,
        default=Path(OUTPUT_XLSX),
        help=f"Where to write the .xlsx report (default: {OUTPUT_XLSX!r}).",
    )

    # -- Classification thresholds -----------------------------------------
    parser.add_argument(
        "--high-null-pct",
        type=float,
        default=HIGH_NULL_PCT,
        help="Null percentage at/above which a column is a drop candidate.",
    )
    parser.add_argument(
        "--index-uniqueness-pct",
        type=float,
        default=INDEX_UNIQUENESS_PCT,
        help="Uniqueness (distinct/non-null) at/above which a column is an index candidate.",
    )
    parser.add_argument(
        "--partition-min-fill-pct",
        type=float,
        default=PARTITION_MIN_FILL_PCT,
    )
    parser.add_argument(
        "--pre-sharded-max-distinct",
        type=int,
        default=PRE_SHARDED_MAX_DISTINCT,
    )

    # -- Streaming ---------------------------------------------------------
    chunksize_help = (
        "Rows per streaming read. Bigger chunks amortize pyreadstat / "
        "pandas overhead (faster for huge files) but use more peak "
        f"memory. Defaults to PROFILE_CHUNK_ROWS ({PROFILE_CHUNK_ROWS:,})."
    )
    parser.add_argument("--chunksize", type=int, default=None, help=chunksize_help)

    # -- Text file options -------------------------------------------------
    delimiter_help = (
        "Column delimiter for text files (.csv/.tsv/.txt). "
        "Defaults to tab for .tsv, comma for .csv/.txt. "
        "Ignored for SAS files. Example: --delimiter '|'"
    )
    parser.add_argument("--delimiter", type=str, default=None, help=delimiter_help)

    encoding_help = (
        "Character encoding for text files (default: utf-8). "
        "Ignored for SAS files."
    )
    parser.add_argument("--encoding", type=str, default="utf-8", help=encoding_help)

    quotechar_help = (
        'Quote character for text files (default: \'"\'). '
        "Ignored for SAS files."
    )
    parser.add_argument("--quotechar", type=str, default='"', help=quotechar_help)

    return parser
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Run the profiler command line and return a process exit code.

    Parses ``argv`` (``None`` lets argparse read ``sys.argv``), profiles the
    input file, classifies its columns, renders the YAML suggestion and
    writes the Excel report. Progress and the final summary are printed to
    stderr.

    Returns:
        ``0`` when the report was written; ``2`` when the input path does
        not exist, is a directory, or has an unsupported extension.
    """
    args = _build_argparser().parse_args(argv)

    path: Path = args.file
    out_path: Path = args.out

    if not path.exists():
        print(f"error: file not found: {path}", file=sys.stderr)
        return 2

    # ``exists()`` is also true for directories. A directory whose name ends
    # in a supported suffix would otherwise pass the extension check below
    # and only fail later inside the reader.
    if path.is_dir():
        print(f"error: not a file (is a directory): {path}", file=sys.stderr)
        return 2

    if not _is_supported_file(path):
        exts = ", ".join(sorted(SUPPORTED_EXTENSIONS))
        print(
            f"error: unsupported extension {path.suffix!r}; "
            f"expected one of: {exts}",
            file=sys.stderr,
        )
        return 2

    file_kind = "text" if _is_text_file(path) else "SAS"
    print(f"profiling {path} ({file_kind}) -> {out_path}", file=sys.stderr)

    # The third element of the result is not used by the report.
    stats, columns, _meta, total_rows, is_text = profile_file(
        path,
        chunksize=args.chunksize,
        delimiter=args.delimiter,
        encoding=args.encoding,
        quotechar=args.quotechar,
    )

    drops, partitions, indexes, warnings = classify(
        stats, columns,
        high_null_pct=args.high_null_pct,
        index_uniqueness_pct=args.index_uniqueness_pct,
        partition_min_fill_pct=args.partition_min_fill_pct,
        pre_sharded_max_distinct=args.pre_sharded_max_distinct,
    )

    yaml_snippet = render_yaml_snippet(drops, partitions, indexes)

    # Recorded on the Overview sheet so the report shows the settings used.
    thresholds = {
        "HIGH_NULL_PCT": args.high_null_pct,
        "INDEX_UNIQUENESS_PCT": args.index_uniqueness_pct,
        "PARTITION_MIN_FILL_PCT": args.partition_min_fill_pct,
        "PRE_SHARDED_MAX_DISTINCT": args.pre_sharded_max_distinct,
        "PARTITION_NAME_PATTERNS": ", ".join(p.pattern for p in PARTITION_NAME_PATTERNS),
        "DISTINCT_CAP": DISTINCT_CAP,
        "TOP_N_VALUES": TOP_N_VALUES,
        "PREVIEW_ROWS_FOR_INFERENCE": PREVIEW_ROWS_FOR_INFERENCE,
    }

    out_path.parent.mkdir(parents=True, exist_ok=True)
    write_report(
        out_path,
        path=path,
        size_bytes=os.path.getsize(path),
        total_rows=total_rows,
        stats=stats,
        columns=columns,
        drops=drops,
        partitions=partitions,
        indexes=indexes,
        warnings=warnings,
        yaml_snippet=yaml_snippet,
        thresholds=thresholds,
        is_text=is_text,
    )

    print(
        f"wrote {out_path} ({len(stats)} columns, {total_rows:,} rows scanned)\n"
        f"  drops:      {len(drops)}\n"
        f"  partitions: {len(partitions)}\n"
        f"  indexes:    {len(indexes)}\n"
        f"  warnings:   {len(warnings)}",
        file=sys.stderr,
    )
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|