From 5449a25b44c7623f0291dcf68e5b896d88a4aa77 Mon Sep 17 00:00:00 2001 From: David Peterson Date: Mon, 20 Apr 2026 18:49:23 -0500 Subject: [PATCH] Refactor partition candidate logic in sas_profiler.py Updated the partition candidate selection process to restrict candidates to columns matching specific name patterns, improving accuracy and reducing noise. Removed outdated distinct value constraints and clarified documentation for partitioning behavior. Enhanced handling of pre-sharded columns and refined the classification logic for better performance. --- utils/sas_profiler.py | 196 +++++++++++++----------------------------- 1 file changed, 62 insertions(+), 134 deletions(-) diff --git a/utils/sas_profiler.py b/utils/sas_profiler.py index 7cbab2c..88456c2 100644 --- a/utils/sas_profiler.py +++ b/utils/sas_profiler.py @@ -76,21 +76,14 @@ INDEX_UNIQUENESS_PCT: float = 95.0 """Columns whose distinct/non-null ratio meets or exceeds this threshold are flagged as index candidates.""" -PARTITION_MIN_DISTINCT: int = 2 -"""A partition candidate must have at least this many distinct values.""" - -PARTITION_MAX_DISTINCT: int = 500 -"""A partition candidate must have at most this many distinct values. Kept -deliberately tighter than the loader's max_partitions default (10,000) so -the default suggestions stay conservative.""" - PARTITION_MIN_FILL_PCT: float = 95.0 -"""Partition candidates must be non-null in at least this fraction of rows.""" +"""Name-matched partition candidates must be non-null in at least this +fraction of rows.""" PRE_SHARDED_MAX_DISTINCT: int = 3 """A name-matched column with <= this many distinct values is treated as -"the file is probably pre-sharded on this column" rather than being -silently dumped into the drop list.""" +pre-sharded ("this file is one slice; sibling files have the other values") +rather than as a ready-to-partition observed column.""" DISTINCT_CAP: int = 10_000 """Max size of the per-column distinct-value set. Exceeding this marks the @@ -108,10 +101,11 @@ PARTITION_NAME_PATTERNS: Tuple[re.Pattern, ...] = ( re.compile(r"^state$", re.IGNORECASE), re.compile(r"^state_?code$", re.IGNORECASE), ) -"""Column names that are "probably partition columns" regardless of how -many distinct values happen to be present in this one file. Kept tiny on -purpose - add more patterns here later if you want to recognise -region/year/etc.""" +"""Only columns whose name matches one of these patterns are ever considered +partition candidates. This deliberately ignores generic low-cardinality +signals (status flags, boolean columns, etc.) because in practice the only +useful partition key in this codebase is STATE. Add more patterns here if +that ever stops being true.""" INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = ( @@ -123,12 +117,6 @@ INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = ( """Name-bonus patterns for index-candidate ranking.""" -_PARTITION_FRIENDLY_TYPES: frozenset = frozenset( - {"TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER", - "INTEGER", "BIGINT", "SMALLINT", "BOOLEAN", "DATE"} -) - - # --------------------------------------------------------------------------- # Per-column streaming aggregator # --------------------------------------------------------------------------- @@ -447,8 +435,6 @@ def classify( *, high_null_pct: float, index_uniqueness_pct: float, - partition_min_distinct: int, - partition_max_distinct: int, partition_min_fill_pct: float, pre_sharded_max_distinct: int, ) -> Tuple[ @@ -457,60 +443,69 @@ def classify( List[_IndexCandidate], List[_TypeWarning], ]: - """Turn per-column stats + the loader's schema into four ranked lists.""" + """Turn per-column stats + the loader's schema into four ranked lists. + + Partition candidates are restricted to columns whose name matches + :data:`PARTITION_NAME_PATTERNS` - in practice STATE / STATE_CODE. A + generic "low-cardinality = partition candidate" heuristic produces too + much noise for this codebase, so we only surface columns we're confident + about by name. + """ drops: List[_DropCandidate] = [] partitions: List[_PartitionCandidate] = [] indexes: List[_IndexCandidate] = [] warnings: List[_TypeWarning] = [] - # Names we've already routed into the partition lane - exclude them from - # the drop / index lanes downstream. - claimed_by_partition: set = set() - - # -- First pass: partition-name-matched columns ------------------------ + # -- Partition candidates (name-matched only) -------------------------- # Run this before the drop check so pre-sharded STATE columns don't get - # silently dropped. + # silently dropped for being "constant". + claimed_by_partition: set = set() for name, cs in stats.items(): - spec = columns.get(name) if not _matches_any(PARTITION_NAME_PATTERNS, name): continue - if cs.n_total == 0: + if cs.n_total == 0 or cs.n_non_null == 0: + continue + if cs.fill_pct < partition_min_fill_pct: continue - looks_pre_sharded = ( - cs.n_non_null > 0 - and not cs.distinct_overflow + is_pre_sharded = ( + not cs.distinct_overflow and cs.distinct_count <= pre_sharded_max_distinct - and cs.fill_pct >= partition_min_fill_pct ) - if looks_pre_sharded: - observed = ", ".join(_format_value(v) for v, _ in cs.top_values(pre_sharded_max_distinct)) - note_parts = [ - f"pre-sharded: this file only contains {cs.distinct_count} distinct " - f"value(s) ({observed})", - "keep the column and set partition_by at the load_folder level so " - "sibling files merge into separate partitions of one table", - ] - partitions.append( - _PartitionCandidate( - name=name, - kind="pre_sharded", - distinct_count=cs.distinct_count, - fill_pct=cs.fill_pct, - top_values=_format_top_values(cs.top_values()), - observed_values_in_file=observed, - note="; ".join(note_parts), - # Pre-sharded STATE always wins the ranking. - score=1_000_000.0, - ) - ) - claimed_by_partition.add(name) - continue + kind = "pre_sharded" if is_pre_sharded else "observed" + observed = _format_top_values(cs.top_values(pre_sharded_max_distinct)) - # Name-matched but not pre-sharded: fall through into the regular - # partition candidate pass below, which will score it up due to the - # name match. + if is_pre_sharded: + note = ( + f"pre-sharded: this file only contains {cs.distinct_count} " + f"distinct value(s) ({observed}); keep the column and set " + "partition_by at the load_folder level so sibling files merge " + "into separate partitions of one table" + ) + else: + note = ( + f"observed {cs.distinct_display} distinct value(s) across " + f"{cs.fill_pct:.1f}% of rows; LIST partitioning will create " + "one child table per distinct value" + ) + + partitions.append( + _PartitionCandidate( + name=name, + kind=kind, + distinct_count=cs.distinct_count, + fill_pct=cs.fill_pct, + top_values=_format_top_values(cs.top_values()), + observed_values_in_file=observed, + note=note, + # Pre-sharded beats observed as the snippet's top pick. + score=(1_000_000.0 if is_pre_sharded else 500_000.0) + cs.fill_pct, + ) + ) + claimed_by_partition.add(name) + + partitions.sort(key=lambda p: p.score, reverse=True) # -- Drop candidates --------------------------------------------------- for name, cs in stats.items(): @@ -541,62 +536,6 @@ def classify( drops.append(_DropCandidate(name=name, reason=reason)) dropped_names = {d.name for d in drops} - - # -- Partition candidates (observed) ---------------------------------- - for name, cs in stats.items(): - if name in claimed_by_partition or name in dropped_names: - continue - spec = columns.get(name) - if spec is None: - continue - pg_type = spec.postgres_type.upper() - if pg_type not in _PARTITION_FRIENDLY_TYPES: - continue - if cs.distinct_overflow: - continue - if not ( - partition_min_distinct <= cs.distinct_count <= partition_max_distinct - ): - continue - if cs.fill_pct < partition_min_fill_pct: - continue - - name_match = _matches_any(PARTITION_NAME_PATTERNS, name) - # Score: name-match dominates, then prefer fewer partitions (safer - # DDL), then prefer more-filled columns as a tiebreaker. - score = ( - (500_000.0 if name_match else 0.0) - + (partition_max_distinct - cs.distinct_count) - + cs.fill_pct - ) - - notes: List[str] = [] - if name_match: - notes.append("name matches PARTITION_NAME_PATTERNS") - if cs.distinct_count > 10_000: - notes.append( - f"distinct_count={cs.distinct_count:,} exceeds loader " - "max_partitions default (10,000); expect DDL warnings" - ) - notes.append( - "LIST partitioning creates one child table per distinct value " - "(see load_sas.render_partition_ddl)" - ) - - partitions.append( - _PartitionCandidate( - name=name, - kind="observed", - distinct_count=cs.distinct_count, - fill_pct=cs.fill_pct, - top_values=_format_top_values(cs.top_values()), - observed_values_in_file=_format_top_values(cs.top_values()), - note="; ".join(notes), - score=score, - ) - ) - - partitions.sort(key=lambda p: p.score, reverse=True) partition_names = {p.name for p in partitions} # -- Index candidates -------------------------------------------------- @@ -768,17 +707,11 @@ def render_yaml_snippet( ) lines.append("partition_by:") lines.append(f" - {top.name}") - if len(partitions) > 1: - lines.append( - "# Runners-up (append to partition_by for multi-level " - "LIST partitioning; see load_sas.render_partition_ddl):" - ) - for p in partitions[1:]: - lines.append( - f"# - {p.name} # kind={p.kind} distinct={p.distinct_count}" - ) else: - lines.append("# (no partition candidates found)") + lines.append( + "# (no partition candidates found - no column matched " + "PARTITION_NAME_PATTERNS)" + ) lines.append("") @@ -1050,8 +983,6 @@ def _build_argparser() -> argparse.ArgumentParser: help="Null percentage at/above which a column is a drop candidate.") p.add_argument("--index-uniqueness-pct", type=float, default=INDEX_UNIQUENESS_PCT, help="Uniqueness (distinct/non-null) at/above which a column is an index candidate.") - p.add_argument("--partition-min-distinct", type=int, default=PARTITION_MIN_DISTINCT) - p.add_argument("--partition-max-distinct", type=int, default=PARTITION_MAX_DISTINCT) p.add_argument("--partition-min-fill-pct", type=float, default=PARTITION_MIN_FILL_PCT) p.add_argument("--pre-sharded-max-distinct", type=int, default=PRE_SHARDED_MAX_DISTINCT) return p @@ -1074,8 +1005,6 @@ def main(argv: Optional[List[str]] = None) -> int: stats, columns, high_null_pct=args.high_null_pct, index_uniqueness_pct=args.index_uniqueness_pct, - partition_min_distinct=args.partition_min_distinct, - partition_max_distinct=args.partition_max_distinct, partition_min_fill_pct=args.partition_min_fill_pct, pre_sharded_max_distinct=args.pre_sharded_max_distinct, ) @@ -1085,10 +1014,9 @@ def main(argv: Optional[List[str]] = None) -> int: thresholds = { "HIGH_NULL_PCT": args.high_null_pct, "INDEX_UNIQUENESS_PCT": args.index_uniqueness_pct, - "PARTITION_MIN_DISTINCT": args.partition_min_distinct, - "PARTITION_MAX_DISTINCT": args.partition_max_distinct, "PARTITION_MIN_FILL_PCT": args.partition_min_fill_pct, "PRE_SHARDED_MAX_DISTINCT": args.pre_sharded_max_distinct, + "PARTITION_NAME_PATTERNS": ", ".join(p.pattern for p in PARTITION_NAME_PATTERNS), "DISTINCT_CAP": DISTINCT_CAP, "TOP_N_VALUES": TOP_N_VALUES, "PREVIEW_ROWS_FOR_INFERENCE": PREVIEW_ROWS_FOR_INFERENCE,