Refactor partition candidate logic in sas_profiler.py
Updated the partition candidate selection process to restrict candidates to columns matching specific name patterns, improving accuracy and reducing noise. Removed outdated distinct value constraints and clarified documentation for partitioning behavior. Enhanced handling of pre-sharded columns and refined the classification logic for better performance.
This commit is contained in:
parent
b3b968edf2
commit
5449a25b44
@ -76,21 +76,14 @@ INDEX_UNIQUENESS_PCT: float = 95.0
|
|||||||
"""Columns whose distinct/non-null ratio meets or exceeds this threshold are
|
"""Columns whose distinct/non-null ratio meets or exceeds this threshold are
|
||||||
flagged as index candidates."""
|
flagged as index candidates."""
|
||||||
|
|
||||||
PARTITION_MIN_DISTINCT: int = 2
|
|
||||||
"""A partition candidate must have at least this many distinct values."""
|
|
||||||
|
|
||||||
PARTITION_MAX_DISTINCT: int = 500
|
|
||||||
"""A partition candidate must have at most this many distinct values. Kept
|
|
||||||
deliberately tighter than the loader's max_partitions default (10,000) so
|
|
||||||
the default suggestions stay conservative."""
|
|
||||||
|
|
||||||
PARTITION_MIN_FILL_PCT: float = 95.0
|
PARTITION_MIN_FILL_PCT: float = 95.0
|
||||||
"""Partition candidates must be non-null in at least this fraction of rows."""
|
"""Name-matched partition candidates must be non-null in at least this
|
||||||
|
fraction of rows."""
|
||||||
|
|
||||||
PRE_SHARDED_MAX_DISTINCT: int = 3
|
PRE_SHARDED_MAX_DISTINCT: int = 3
|
||||||
"""A name-matched column with <= this many distinct values is treated as
|
"""A name-matched column with <= this many distinct values is treated as
|
||||||
"the file is probably pre-sharded on this column" rather than being
|
pre-sharded ("this file is one slice; sibling files have the other values")
|
||||||
silently dumped into the drop list."""
|
rather than as a ready-to-partition observed column."""
|
||||||
|
|
||||||
DISTINCT_CAP: int = 10_000
|
DISTINCT_CAP: int = 10_000
|
||||||
"""Max size of the per-column distinct-value set. Exceeding this marks the
|
"""Max size of the per-column distinct-value set. Exceeding this marks the
|
||||||
@ -108,10 +101,11 @@ PARTITION_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
|
|||||||
re.compile(r"^state$", re.IGNORECASE),
|
re.compile(r"^state$", re.IGNORECASE),
|
||||||
re.compile(r"^state_?code$", re.IGNORECASE),
|
re.compile(r"^state_?code$", re.IGNORECASE),
|
||||||
)
|
)
|
||||||
"""Column names that are "probably partition columns" regardless of how
|
"""Only columns whose name matches one of these patterns are ever considered
|
||||||
many distinct values happen to be present in this one file. Kept tiny on
|
partition candidates. This deliberately ignores generic low-cardinality
|
||||||
purpose - add more patterns here later if you want to recognise
|
signals (status flags, boolean columns, etc.) because in practice the only
|
||||||
region/year/etc."""
|
useful partition key in this codebase is STATE. Add more patterns here if
|
||||||
|
that ever stops being true."""
|
||||||
|
|
||||||
|
|
||||||
INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
|
INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
|
||||||
@ -123,12 +117,6 @@ INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = (
|
|||||||
"""Name-bonus patterns for index-candidate ranking."""
|
"""Name-bonus patterns for index-candidate ranking."""
|
||||||
|
|
||||||
|
|
||||||
_PARTITION_FRIENDLY_TYPES: frozenset = frozenset(
|
|
||||||
{"TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER",
|
|
||||||
"INTEGER", "BIGINT", "SMALLINT", "BOOLEAN", "DATE"}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Per-column streaming aggregator
|
# Per-column streaming aggregator
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -447,8 +435,6 @@ def classify(
|
|||||||
*,
|
*,
|
||||||
high_null_pct: float,
|
high_null_pct: float,
|
||||||
index_uniqueness_pct: float,
|
index_uniqueness_pct: float,
|
||||||
partition_min_distinct: int,
|
|
||||||
partition_max_distinct: int,
|
|
||||||
partition_min_fill_pct: float,
|
partition_min_fill_pct: float,
|
||||||
pre_sharded_max_distinct: int,
|
pre_sharded_max_distinct: int,
|
||||||
) -> Tuple[
|
) -> Tuple[
|
||||||
@ -457,60 +443,69 @@ def classify(
|
|||||||
List[_IndexCandidate],
|
List[_IndexCandidate],
|
||||||
List[_TypeWarning],
|
List[_TypeWarning],
|
||||||
]:
|
]:
|
||||||
"""Turn per-column stats + the loader's schema into four ranked lists."""
|
"""Turn per-column stats + the loader's schema into four ranked lists.
|
||||||
|
|
||||||
|
Partition candidates are restricted to columns whose name matches
|
||||||
|
:data:`PARTITION_NAME_PATTERNS` - in practice STATE / STATE_CODE. A
|
||||||
|
generic "low-cardinality = partition candidate" heuristic produces too
|
||||||
|
much noise for this codebase, so we only surface columns we're confident
|
||||||
|
about by name.
|
||||||
|
"""
|
||||||
|
|
||||||
drops: List[_DropCandidate] = []
|
drops: List[_DropCandidate] = []
|
||||||
partitions: List[_PartitionCandidate] = []
|
partitions: List[_PartitionCandidate] = []
|
||||||
indexes: List[_IndexCandidate] = []
|
indexes: List[_IndexCandidate] = []
|
||||||
warnings: List[_TypeWarning] = []
|
warnings: List[_TypeWarning] = []
|
||||||
|
|
||||||
# Names we've already routed into the partition lane - exclude them from
|
# -- Partition candidates (name-matched only) --------------------------
|
||||||
# the drop / index lanes downstream.
|
|
||||||
claimed_by_partition: set = set()
|
|
||||||
|
|
||||||
# -- First pass: partition-name-matched columns ------------------------
|
|
||||||
# Run this before the drop check so pre-sharded STATE columns don't get
|
# Run this before the drop check so pre-sharded STATE columns don't get
|
||||||
# silently dropped.
|
# silently dropped for being "constant".
|
||||||
|
claimed_by_partition: set = set()
|
||||||
for name, cs in stats.items():
|
for name, cs in stats.items():
|
||||||
spec = columns.get(name)
|
|
||||||
if not _matches_any(PARTITION_NAME_PATTERNS, name):
|
if not _matches_any(PARTITION_NAME_PATTERNS, name):
|
||||||
continue
|
continue
|
||||||
if cs.n_total == 0:
|
if cs.n_total == 0 or cs.n_non_null == 0:
|
||||||
|
continue
|
||||||
|
if cs.fill_pct < partition_min_fill_pct:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
looks_pre_sharded = (
|
is_pre_sharded = (
|
||||||
cs.n_non_null > 0
|
not cs.distinct_overflow
|
||||||
and not cs.distinct_overflow
|
|
||||||
and cs.distinct_count <= pre_sharded_max_distinct
|
and cs.distinct_count <= pre_sharded_max_distinct
|
||||||
and cs.fill_pct >= partition_min_fill_pct
|
|
||||||
)
|
)
|
||||||
if looks_pre_sharded:
|
kind = "pre_sharded" if is_pre_sharded else "observed"
|
||||||
observed = ", ".join(_format_value(v) for v, _ in cs.top_values(pre_sharded_max_distinct))
|
observed = _format_top_values(cs.top_values(pre_sharded_max_distinct))
|
||||||
note_parts = [
|
|
||||||
f"pre-sharded: this file only contains {cs.distinct_count} distinct "
|
|
||||||
f"value(s) ({observed})",
|
|
||||||
"keep the column and set partition_by at the load_folder level so "
|
|
||||||
"sibling files merge into separate partitions of one table",
|
|
||||||
]
|
|
||||||
partitions.append(
|
|
||||||
_PartitionCandidate(
|
|
||||||
name=name,
|
|
||||||
kind="pre_sharded",
|
|
||||||
distinct_count=cs.distinct_count,
|
|
||||||
fill_pct=cs.fill_pct,
|
|
||||||
top_values=_format_top_values(cs.top_values()),
|
|
||||||
observed_values_in_file=observed,
|
|
||||||
note="; ".join(note_parts),
|
|
||||||
# Pre-sharded STATE always wins the ranking.
|
|
||||||
score=1_000_000.0,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
claimed_by_partition.add(name)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Name-matched but not pre-sharded: fall through into the regular
|
if is_pre_sharded:
|
||||||
# partition candidate pass below, which will score it up due to the
|
note = (
|
||||||
# name match.
|
f"pre-sharded: this file only contains {cs.distinct_count} "
|
||||||
|
f"distinct value(s) ({observed}); keep the column and set "
|
||||||
|
"partition_by at the load_folder level so sibling files merge "
|
||||||
|
"into separate partitions of one table"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
note = (
|
||||||
|
f"observed {cs.distinct_display} distinct value(s) across "
|
||||||
|
f"{cs.fill_pct:.1f}% of rows; LIST partitioning will create "
|
||||||
|
"one child table per distinct value"
|
||||||
|
)
|
||||||
|
|
||||||
|
partitions.append(
|
||||||
|
_PartitionCandidate(
|
||||||
|
name=name,
|
||||||
|
kind=kind,
|
||||||
|
distinct_count=cs.distinct_count,
|
||||||
|
fill_pct=cs.fill_pct,
|
||||||
|
top_values=_format_top_values(cs.top_values()),
|
||||||
|
observed_values_in_file=observed,
|
||||||
|
note=note,
|
||||||
|
# Pre-sharded beats observed as the snippet's top pick.
|
||||||
|
score=(1_000_000.0 if is_pre_sharded else 500_000.0) + cs.fill_pct,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
claimed_by_partition.add(name)
|
||||||
|
|
||||||
|
partitions.sort(key=lambda p: p.score, reverse=True)
|
||||||
|
|
||||||
# -- Drop candidates ---------------------------------------------------
|
# -- Drop candidates ---------------------------------------------------
|
||||||
for name, cs in stats.items():
|
for name, cs in stats.items():
|
||||||
@ -541,62 +536,6 @@ def classify(
|
|||||||
drops.append(_DropCandidate(name=name, reason=reason))
|
drops.append(_DropCandidate(name=name, reason=reason))
|
||||||
|
|
||||||
dropped_names = {d.name for d in drops}
|
dropped_names = {d.name for d in drops}
|
||||||
|
|
||||||
# -- Partition candidates (observed) ----------------------------------
|
|
||||||
for name, cs in stats.items():
|
|
||||||
if name in claimed_by_partition or name in dropped_names:
|
|
||||||
continue
|
|
||||||
spec = columns.get(name)
|
|
||||||
if spec is None:
|
|
||||||
continue
|
|
||||||
pg_type = spec.postgres_type.upper()
|
|
||||||
if pg_type not in _PARTITION_FRIENDLY_TYPES:
|
|
||||||
continue
|
|
||||||
if cs.distinct_overflow:
|
|
||||||
continue
|
|
||||||
if not (
|
|
||||||
partition_min_distinct <= cs.distinct_count <= partition_max_distinct
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
if cs.fill_pct < partition_min_fill_pct:
|
|
||||||
continue
|
|
||||||
|
|
||||||
name_match = _matches_any(PARTITION_NAME_PATTERNS, name)
|
|
||||||
# Score: name-match dominates, then prefer fewer partitions (safer
|
|
||||||
# DDL), then prefer more-filled columns as a tiebreaker.
|
|
||||||
score = (
|
|
||||||
(500_000.0 if name_match else 0.0)
|
|
||||||
+ (partition_max_distinct - cs.distinct_count)
|
|
||||||
+ cs.fill_pct
|
|
||||||
)
|
|
||||||
|
|
||||||
notes: List[str] = []
|
|
||||||
if name_match:
|
|
||||||
notes.append("name matches PARTITION_NAME_PATTERNS")
|
|
||||||
if cs.distinct_count > 10_000:
|
|
||||||
notes.append(
|
|
||||||
f"distinct_count={cs.distinct_count:,} exceeds loader "
|
|
||||||
"max_partitions default (10,000); expect DDL warnings"
|
|
||||||
)
|
|
||||||
notes.append(
|
|
||||||
"LIST partitioning creates one child table per distinct value "
|
|
||||||
"(see load_sas.render_partition_ddl)"
|
|
||||||
)
|
|
||||||
|
|
||||||
partitions.append(
|
|
||||||
_PartitionCandidate(
|
|
||||||
name=name,
|
|
||||||
kind="observed",
|
|
||||||
distinct_count=cs.distinct_count,
|
|
||||||
fill_pct=cs.fill_pct,
|
|
||||||
top_values=_format_top_values(cs.top_values()),
|
|
||||||
observed_values_in_file=_format_top_values(cs.top_values()),
|
|
||||||
note="; ".join(notes),
|
|
||||||
score=score,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
partitions.sort(key=lambda p: p.score, reverse=True)
|
|
||||||
partition_names = {p.name for p in partitions}
|
partition_names = {p.name for p in partitions}
|
||||||
|
|
||||||
# -- Index candidates --------------------------------------------------
|
# -- Index candidates --------------------------------------------------
|
||||||
@ -768,17 +707,11 @@ def render_yaml_snippet(
|
|||||||
)
|
)
|
||||||
lines.append("partition_by:")
|
lines.append("partition_by:")
|
||||||
lines.append(f" - {top.name}")
|
lines.append(f" - {top.name}")
|
||||||
if len(partitions) > 1:
|
|
||||||
lines.append(
|
|
||||||
"# Runners-up (append to partition_by for multi-level "
|
|
||||||
"LIST partitioning; see load_sas.render_partition_ddl):"
|
|
||||||
)
|
|
||||||
for p in partitions[1:]:
|
|
||||||
lines.append(
|
|
||||||
f"# - {p.name} # kind={p.kind} distinct={p.distinct_count}"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
lines.append("# (no partition candidates found)")
|
lines.append(
|
||||||
|
"# (no partition candidates found - no column matched "
|
||||||
|
"PARTITION_NAME_PATTERNS)"
|
||||||
|
)
|
||||||
|
|
||||||
lines.append("")
|
lines.append("")
|
||||||
|
|
||||||
@ -1050,8 +983,6 @@ def _build_argparser() -> argparse.ArgumentParser:
|
|||||||
help="Null percentage at/above which a column is a drop candidate.")
|
help="Null percentage at/above which a column is a drop candidate.")
|
||||||
p.add_argument("--index-uniqueness-pct", type=float, default=INDEX_UNIQUENESS_PCT,
|
p.add_argument("--index-uniqueness-pct", type=float, default=INDEX_UNIQUENESS_PCT,
|
||||||
help="Uniqueness (distinct/non-null) at/above which a column is an index candidate.")
|
help="Uniqueness (distinct/non-null) at/above which a column is an index candidate.")
|
||||||
p.add_argument("--partition-min-distinct", type=int, default=PARTITION_MIN_DISTINCT)
|
|
||||||
p.add_argument("--partition-max-distinct", type=int, default=PARTITION_MAX_DISTINCT)
|
|
||||||
p.add_argument("--partition-min-fill-pct", type=float, default=PARTITION_MIN_FILL_PCT)
|
p.add_argument("--partition-min-fill-pct", type=float, default=PARTITION_MIN_FILL_PCT)
|
||||||
p.add_argument("--pre-sharded-max-distinct", type=int, default=PRE_SHARDED_MAX_DISTINCT)
|
p.add_argument("--pre-sharded-max-distinct", type=int, default=PRE_SHARDED_MAX_DISTINCT)
|
||||||
return p
|
return p
|
||||||
@ -1074,8 +1005,6 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
stats, columns,
|
stats, columns,
|
||||||
high_null_pct=args.high_null_pct,
|
high_null_pct=args.high_null_pct,
|
||||||
index_uniqueness_pct=args.index_uniqueness_pct,
|
index_uniqueness_pct=args.index_uniqueness_pct,
|
||||||
partition_min_distinct=args.partition_min_distinct,
|
|
||||||
partition_max_distinct=args.partition_max_distinct,
|
|
||||||
partition_min_fill_pct=args.partition_min_fill_pct,
|
partition_min_fill_pct=args.partition_min_fill_pct,
|
||||||
pre_sharded_max_distinct=args.pre_sharded_max_distinct,
|
pre_sharded_max_distinct=args.pre_sharded_max_distinct,
|
||||||
)
|
)
|
||||||
@ -1085,10 +1014,9 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
thresholds = {
|
thresholds = {
|
||||||
"HIGH_NULL_PCT": args.high_null_pct,
|
"HIGH_NULL_PCT": args.high_null_pct,
|
||||||
"INDEX_UNIQUENESS_PCT": args.index_uniqueness_pct,
|
"INDEX_UNIQUENESS_PCT": args.index_uniqueness_pct,
|
||||||
"PARTITION_MIN_DISTINCT": args.partition_min_distinct,
|
|
||||||
"PARTITION_MAX_DISTINCT": args.partition_max_distinct,
|
|
||||||
"PARTITION_MIN_FILL_PCT": args.partition_min_fill_pct,
|
"PARTITION_MIN_FILL_PCT": args.partition_min_fill_pct,
|
||||||
"PRE_SHARDED_MAX_DISTINCT": args.pre_sharded_max_distinct,
|
"PRE_SHARDED_MAX_DISTINCT": args.pre_sharded_max_distinct,
|
||||||
|
"PARTITION_NAME_PATTERNS": ", ".join(p.pattern for p in PARTITION_NAME_PATTERNS),
|
||||||
"DISTINCT_CAP": DISTINCT_CAP,
|
"DISTINCT_CAP": DISTINCT_CAP,
|
||||||
"TOP_N_VALUES": TOP_N_VALUES,
|
"TOP_N_VALUES": TOP_N_VALUES,
|
||||||
"PREVIEW_ROWS_FOR_INFERENCE": PREVIEW_ROWS_FOR_INFERENCE,
|
"PREVIEW_ROWS_FOR_INFERENCE": PREVIEW_ROWS_FOR_INFERENCE,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user