diff --git a/utils/sas_profiler.py b/utils/sas_profiler.py index 7cbab2c..88456c2 100644 --- a/utils/sas_profiler.py +++ b/utils/sas_profiler.py @@ -76,21 +76,14 @@ INDEX_UNIQUENESS_PCT: float = 95.0 """Columns whose distinct/non-null ratio meets or exceeds this threshold are flagged as index candidates.""" -PARTITION_MIN_DISTINCT: int = 2 -"""A partition candidate must have at least this many distinct values.""" - -PARTITION_MAX_DISTINCT: int = 500 -"""A partition candidate must have at most this many distinct values. Kept -deliberately tighter than the loader's max_partitions default (10,000) so -the default suggestions stay conservative.""" - PARTITION_MIN_FILL_PCT: float = 95.0 -"""Partition candidates must be non-null in at least this fraction of rows.""" +"""Name-matched partition candidates must be non-null in at least this +fraction of rows.""" PRE_SHARDED_MAX_DISTINCT: int = 3 """A name-matched column with <= this many distinct values is treated as -"the file is probably pre-sharded on this column" rather than being -silently dumped into the drop list.""" +pre-sharded ("this file is one slice; sibling files have the other values") +rather than as a ready-to-partition observed column.""" DISTINCT_CAP: int = 10_000 """Max size of the per-column distinct-value set. Exceeding this marks the @@ -108,10 +101,11 @@ PARTITION_NAME_PATTERNS: Tuple[re.Pattern, ...] = ( re.compile(r"^state$", re.IGNORECASE), re.compile(r"^state_?code$", re.IGNORECASE), ) -"""Column names that are "probably partition columns" regardless of how -many distinct values happen to be present in this one file. Kept tiny on -purpose - add more patterns here later if you want to recognise -region/year/etc.""" +"""Only columns whose name matches one of these patterns are ever considered +partition candidates. This deliberately ignores generic low-cardinality +signals (status flags, boolean columns, etc.) because in practice the only +useful partition key in this codebase is STATE. Add more patterns here if +that ever stops being true.""" INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = ( @@ -123,12 +117,6 @@ INDEX_NAME_PATTERNS: Tuple[re.Pattern, ...] = ( """Name-bonus patterns for index-candidate ranking.""" -_PARTITION_FRIENDLY_TYPES: frozenset = frozenset( - {"TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER", - "INTEGER", "BIGINT", "SMALLINT", "BOOLEAN", "DATE"} -) - - # --------------------------------------------------------------------------- # Per-column streaming aggregator # --------------------------------------------------------------------------- @@ -447,8 +435,6 @@ def classify( *, high_null_pct: float, index_uniqueness_pct: float, - partition_min_distinct: int, - partition_max_distinct: int, partition_min_fill_pct: float, pre_sharded_max_distinct: int, ) -> Tuple[ @@ -457,60 +443,69 @@ def classify( List[_IndexCandidate], List[_TypeWarning], ]: - """Turn per-column stats + the loader's schema into four ranked lists.""" + """Turn per-column stats + the loader's schema into four ranked lists. + + Partition candidates are restricted to columns whose name matches + :data:`PARTITION_NAME_PATTERNS` - in practice STATE / STATE_CODE. A + generic "low-cardinality = partition candidate" heuristic produces too + much noise for this codebase, so we only surface columns we're confident + about by name. + """ drops: List[_DropCandidate] = [] partitions: List[_PartitionCandidate] = [] indexes: List[_IndexCandidate] = [] warnings: List[_TypeWarning] = [] - # Names we've already routed into the partition lane - exclude them from - # the drop / index lanes downstream. - claimed_by_partition: set = set() - - # -- First pass: partition-name-matched columns ------------------------ + # -- Partition candidates (name-matched only) -------------------------- # Run this before the drop check so pre-sharded STATE columns don't get - # silently dropped. + # silently dropped for being "constant". + claimed_by_partition: set = set() for name, cs in stats.items(): - spec = columns.get(name) if not _matches_any(PARTITION_NAME_PATTERNS, name): continue - if cs.n_total == 0: + if cs.n_total == 0 or cs.n_non_null == 0: + continue + if cs.fill_pct < partition_min_fill_pct: continue - looks_pre_sharded = ( - cs.n_non_null > 0 - and not cs.distinct_overflow + is_pre_sharded = ( + not cs.distinct_overflow and cs.distinct_count <= pre_sharded_max_distinct - and cs.fill_pct >= partition_min_fill_pct ) - if looks_pre_sharded: - observed = ", ".join(_format_value(v) for v, _ in cs.top_values(pre_sharded_max_distinct)) - note_parts = [ - f"pre-sharded: this file only contains {cs.distinct_count} distinct " - f"value(s) ({observed})", - "keep the column and set partition_by at the load_folder level so " - "sibling files merge into separate partitions of one table", - ] - partitions.append( - _PartitionCandidate( - name=name, - kind="pre_sharded", - distinct_count=cs.distinct_count, - fill_pct=cs.fill_pct, - top_values=_format_top_values(cs.top_values()), - observed_values_in_file=observed, - note="; ".join(note_parts), - # Pre-sharded STATE always wins the ranking. - score=1_000_000.0, - ) - ) - claimed_by_partition.add(name) - continue + kind = "pre_sharded" if is_pre_sharded else "observed" + observed = _format_top_values(cs.top_values(pre_sharded_max_distinct)) - # Name-matched but not pre-sharded: fall through into the regular - # partition candidate pass below, which will score it up due to the - # name match. + if is_pre_sharded: + note = ( + f"pre-sharded: this file only contains {cs.distinct_count} " + f"distinct value(s) ({observed}); keep the column and set " + "partition_by at the load_folder level so sibling files merge " + "into separate partitions of one table" + ) + else: + note = ( + f"observed {cs.distinct_display} distinct value(s) across " + f"{cs.fill_pct:.1f}% of rows; LIST partitioning will create " + "one child table per distinct value" + ) + + partitions.append( + _PartitionCandidate( + name=name, + kind=kind, + distinct_count=cs.distinct_count, + fill_pct=cs.fill_pct, + top_values=_format_top_values(cs.top_values()), + observed_values_in_file=observed, + note=note, + # Pre-sharded beats observed as the snippet's top pick. + score=(1_000_000.0 if is_pre_sharded else 500_000.0) + cs.fill_pct, + ) + ) + claimed_by_partition.add(name) + + partitions.sort(key=lambda p: p.score, reverse=True) # -- Drop candidates --------------------------------------------------- for name, cs in stats.items(): @@ -541,62 +536,6 @@ def classify( drops.append(_DropCandidate(name=name, reason=reason)) dropped_names = {d.name for d in drops} - - # -- Partition candidates (observed) ---------------------------------- - for name, cs in stats.items(): - if name in claimed_by_partition or name in dropped_names: - continue - spec = columns.get(name) - if spec is None: - continue - pg_type = spec.postgres_type.upper() - if pg_type not in _PARTITION_FRIENDLY_TYPES: - continue - if cs.distinct_overflow: - continue - if not ( - partition_min_distinct <= cs.distinct_count <= partition_max_distinct - ): - continue - if cs.fill_pct < partition_min_fill_pct: - continue - - name_match = _matches_any(PARTITION_NAME_PATTERNS, name) - # Score: name-match dominates, then prefer fewer partitions (safer - # DDL), then prefer more-filled columns as a tiebreaker. - score = ( - (500_000.0 if name_match else 0.0) - + (partition_max_distinct - cs.distinct_count) - + cs.fill_pct - ) - - notes: List[str] = [] - if name_match: - notes.append("name matches PARTITION_NAME_PATTERNS") - if cs.distinct_count > 10_000: - notes.append( - f"distinct_count={cs.distinct_count:,} exceeds loader " - "max_partitions default (10,000); expect DDL warnings" - ) - notes.append( - "LIST partitioning creates one child table per distinct value " - "(see load_sas.render_partition_ddl)" - ) - - partitions.append( - _PartitionCandidate( - name=name, - kind="observed", - distinct_count=cs.distinct_count, - fill_pct=cs.fill_pct, - top_values=_format_top_values(cs.top_values()), - observed_values_in_file=_format_top_values(cs.top_values()), - note="; ".join(notes), - score=score, - ) - ) - - partitions.sort(key=lambda p: p.score, reverse=True) partition_names = {p.name for p in partitions} # -- Index candidates -------------------------------------------------- @@ -768,17 +707,11 @@ def render_yaml_snippet( ) lines.append("partition_by:") lines.append(f" - {top.name}") - if len(partitions) > 1: - lines.append( - "# Runners-up (append to partition_by for multi-level " - "LIST partitioning; see load_sas.render_partition_ddl):" - ) - for p in partitions[1:]: - lines.append( - f"# - {p.name} # kind={p.kind} distinct={p.distinct_count}" - ) else: - lines.append("# (no partition candidates found)") + lines.append( + "# (no partition candidates found - no column matched " + "PARTITION_NAME_PATTERNS)" + ) lines.append("") @@ -1050,8 +983,6 @@ def _build_argparser() -> argparse.ArgumentParser: help="Null percentage at/above which a column is a drop candidate.") p.add_argument("--index-uniqueness-pct", type=float, default=INDEX_UNIQUENESS_PCT, help="Uniqueness (distinct/non-null) at/above which a column is an index candidate.") - p.add_argument("--partition-min-distinct", type=int, default=PARTITION_MIN_DISTINCT) - p.add_argument("--partition-max-distinct", type=int, default=PARTITION_MAX_DISTINCT) p.add_argument("--partition-min-fill-pct", type=float, default=PARTITION_MIN_FILL_PCT) p.add_argument("--pre-sharded-max-distinct", type=int, default=PRE_SHARDED_MAX_DISTINCT) return p @@ -1074,8 +1005,6 @@ def main(argv: Optional[List[str]] = None) -> int: stats, columns, high_null_pct=args.high_null_pct, index_uniqueness_pct=args.index_uniqueness_pct, - partition_min_distinct=args.partition_min_distinct, - partition_max_distinct=args.partition_max_distinct, partition_min_fill_pct=args.partition_min_fill_pct, pre_sharded_max_distinct=args.pre_sharded_max_distinct, ) @@ -1085,10 +1014,9 @@ def main(argv: Optional[List[str]] = None) -> int: thresholds = { "HIGH_NULL_PCT": args.high_null_pct, "INDEX_UNIQUENESS_PCT": args.index_uniqueness_pct, - "PARTITION_MIN_DISTINCT": args.partition_min_distinct, - "PARTITION_MAX_DISTINCT": args.partition_max_distinct, "PARTITION_MIN_FILL_PCT": args.partition_min_fill_pct, "PRE_SHARDED_MAX_DISTINCT": args.pre_sharded_max_distinct, + "PARTITION_NAME_PATTERNS": ", ".join(p.pattern for p in PARTITION_NAME_PATTERNS), "DISTINCT_CAP": DISTINCT_CAP, "TOP_N_VALUES": TOP_N_VALUES, "PREVIEW_ROWS_FOR_INFERENCE": PREVIEW_ROWS_FOR_INFERENCE,