advanced_analyzer #8

Merged
dp merged 23 commits from advanced_analyzer into main 2026-04-21 22:32:18 +00:00
4 changed files with 146 additions and 3 deletions
Showing only changes of commit eff82c73ce - Show all commits

View File

@ -220,6 +220,7 @@ class ClusterSpec:
max_partitions: int = 10_000 max_partitions: int = 10_000
indexes: List[str] = field(default_factory=list) indexes: List[str] = field(default_factory=list)
column_types: Dict[str, str] = field(default_factory=dict) column_types: Dict[str, str] = field(default_factory=dict)
all_nullable: bool = False
@dataclass @dataclass
@ -244,6 +245,7 @@ class _ExplicitPattern:
max_partitions: Optional[int] = None max_partitions: Optional[int] = None
indexes: Optional[List[str]] = None indexes: Optional[List[str]] = None
column_types: Optional[Dict[str, str]] = None column_types: Optional[Dict[str, str]] = None
all_nullable: Optional[bool] = None
@dataclass @dataclass
@ -268,6 +270,7 @@ class FolderConfig:
max_partitions: int = 10_000 max_partitions: int = 10_000
indexes: List[str] = field(default_factory=list) indexes: List[str] = field(default_factory=list)
column_types: Dict[str, str] = field(default_factory=dict) column_types: Dict[str, str] = field(default_factory=dict)
all_nullable: bool = False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -509,6 +512,17 @@ def load_folder_config(path: Path) -> FolderConfig:
raw.get("column_types"), f"Config {path}" raw.get("column_types"), f"Config {path}"
) )
# -- folder-level all_nullable -----------------------------------------
# Sets the default for every cluster. Per-cluster ``all_nullable`` wins
# when present; the CLI ``--all-nullable`` flag trumps both.
raw_an_folder = raw.get("all_nullable", False)
if not isinstance(raw_an_folder, bool):
raise ValueError(
f"Config {path}: 'all_nullable' must be a boolean "
f"(got {raw_an_folder!r})."
)
all_nullable_default = bool(raw_an_folder)
explicit: List[_ExplicitPattern] = [] explicit: List[_ExplicitPattern] = []
clusters_raw = raw.get("clusters") or [] clusters_raw = raw.get("clusters") or []
if not isinstance(clusters_raw, list): if not isinstance(clusters_raw, list):
@ -555,6 +569,19 @@ def load_folder_config(path: Path) -> FolderConfig:
entry.get("column_types"), where, allow_none=True entry.get("column_types"), where, allow_none=True
) )
# -- per-cluster all_nullable ---------------------------------------
c_all_nullable: Optional[bool]
if "all_nullable" in entry:
raw_c_an = entry["all_nullable"]
if not isinstance(raw_c_an, bool):
raise ValueError(
f"{where}: 'all_nullable' must be a boolean "
f"(got {raw_c_an!r})."
)
c_all_nullable = bool(raw_c_an)
else:
c_all_nullable = None
explicit.append( explicit.append(
_ExplicitPattern( _ExplicitPattern(
pattern=compiled, pattern=compiled,
@ -567,6 +594,7 @@ def load_folder_config(path: Path) -> FolderConfig:
max_partitions=c_max_partitions, max_partitions=c_max_partitions,
indexes=c_indexes, indexes=c_indexes,
column_types=c_column_types, column_types=c_column_types,
all_nullable=c_all_nullable,
) )
) )
@ -582,6 +610,7 @@ def load_folder_config(path: Path) -> FolderConfig:
max_partitions=max_partitions, max_partitions=max_partitions,
indexes=indexes, indexes=indexes,
column_types=column_types or {}, column_types=column_types or {},
all_nullable=all_nullable_default,
) )
@ -688,6 +717,12 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
else: else:
resolved_ct = {**cfg.column_types, **patt.column_types} resolved_ct = {**cfg.column_types, **patt.column_types}
# Resolve all_nullable: None = inherit folder, bool = override.
resolved_an = (
patt.all_nullable if patt.all_nullable is not None
else cfg.all_nullable
)
matched = [f for f in remaining if patt.pattern.search(f.name)] matched = [f for f in remaining if patt.pattern.search(f.name)]
if not matched: if not matched:
# Not an error - the folder might legitimately not contain files # Not an error - the folder might legitimately not contain files
@ -705,6 +740,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
max_partitions=resolved_mp, max_partitions=resolved_mp,
indexes=resolved_idx, indexes=resolved_idx,
column_types=dict(resolved_ct), column_types=dict(resolved_ct),
all_nullable=resolved_an,
) )
) )
continue continue
@ -722,6 +758,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
max_partitions=resolved_mp, max_partitions=resolved_mp,
indexes=resolved_idx, indexes=resolved_idx,
column_types=dict(resolved_ct), column_types=dict(resolved_ct),
all_nullable=resolved_an,
) )
) )
@ -743,6 +780,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
max_partitions=cfg.max_partitions, max_partitions=cfg.max_partitions,
indexes=cfg.indexes, indexes=cfg.indexes,
column_types=dict(cfg.column_types), column_types=dict(cfg.column_types),
all_nullable=cfg.all_nullable,
) )
) )
@ -760,6 +798,7 @@ def _infer_cluster_schema(
exclude, exclude,
*, *,
column_types: Optional[Dict[str, str]] = None, column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
) -> Tuple[Dict, Optional[int]]: ) -> Tuple[Dict, Optional[int]]:
"""Infer the Postgres column schema from a SAS file preview. """Infer the Postgres column schema from a SAS file preview.
@ -769,6 +808,8 @@ def _infer_cluster_schema(
denominator instead of an indeterminate spinner. ``column_types`` denominator instead of an indeterminate spinner. ``column_types``
lets the caller pin specific columns to a chosen Postgres type lets the caller pin specific columns to a chosen Postgres type
(typically the merged auto-union + YAML overrides for the cluster). (typically the merged auto-union + YAML overrides for the cluster).
``force_nullable`` stamps every column nullable regardless of what
the preview shows - see :func:`load_sas.infer_schema`.
""" """
preview_df, meta = read_sas_preview(path) preview_df, meta = read_sas_preview(path)
preview_df = apply_column_filter(preview_df, include, exclude) preview_df = apply_column_filter(preview_df, include, exclude)
@ -777,6 +818,7 @@ def _infer_cluster_schema(
preview_df, meta, preview_df, meta,
total_rows=total_rows, total_rows=total_rows,
column_types=column_types, column_types=column_types,
force_nullable=force_nullable,
) )
return columns, total_rows return columns, total_rows
@ -850,6 +892,7 @@ def load_cluster(
first_columns, first_total_rows = _infer_cluster_schema( first_columns, first_total_rows = _infer_cluster_schema(
first, cluster.include, cluster.exclude, first, cluster.include, cluster.exclude,
column_types=cluster.column_types, column_types=cluster.column_types,
force_nullable=cluster.all_nullable,
) )
# -- Validate index columns early --------------------------------------- # -- Validate index columns early ---------------------------------------
@ -929,6 +972,7 @@ def load_cluster(
progress_queue=progress_queue, progress_queue=progress_queue,
db_overrides=db_overrides, db_overrides=db_overrides,
column_types=cluster.column_types, column_types=cluster.column_types,
force_nullable=cluster.all_nullable,
abort_on_first_failure=abort_on_first_failure, abort_on_first_failure=abort_on_first_failure,
) )
else: else:
@ -947,6 +991,7 @@ def load_cluster(
columns, path_total_rows = _infer_cluster_schema( columns, path_total_rows = _infer_cluster_schema(
path, cluster.include, cluster.exclude, path, cluster.include, cluster.exclude,
column_types=cluster.column_types, column_types=cluster.column_types,
force_nullable=cluster.all_nullable,
) )
# Uses the same check that if_exists=append runs. A type # Uses the same check that if_exists=append runs. A type
# mismatch or missing column aborts the cluster; because # mismatch or missing column aborts the cluster; because
@ -1031,6 +1076,7 @@ def _worker_load_append_file(
progress_queue: Any, progress_queue: Any,
db_overrides: Optional[Dict[str, Optional[str]]], db_overrides: Optional[Dict[str, Optional[str]]],
column_types: Optional[Dict[str, str]] = None, column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
) -> Tuple[str, int, Optional[str]]: ) -> Tuple[str, int, Optional[str]]:
"""Worker process: load one SAS file in append mode. """Worker process: load one SAS file in append mode.
@ -1074,6 +1120,7 @@ def _worker_load_append_file(
preview_df, meta, preview_df, meta,
total_rows=total_rows, total_rows=total_rows,
column_types=column_types, column_types=column_types,
force_nullable=force_nullable,
) )
# Drop the preview ASAP - on a 2M-row wide file it's hundreds of MB # Drop the preview ASAP - on a 2M-row wide file it's hundreds of MB
# and we never need it again after schema inference. # and we never need it again after schema inference.
@ -1141,6 +1188,7 @@ def _load_remaining_files_parallel(
progress_queue: Any, progress_queue: Any,
db_overrides: Optional[Dict[str, Optional[str]]], db_overrides: Optional[Dict[str, Optional[str]]],
column_types: Optional[Dict[str, str]] = None, column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
abort_on_first_failure: bool = False, abort_on_first_failure: bool = False,
) -> int: ) -> int:
"""Run append-mode loads for ``files`` across a process pool. """Run append-mode loads for ``files`` across a process pool.
@ -1196,6 +1244,7 @@ def _load_remaining_files_parallel(
progress_queue, progress_queue,
db_overrides, db_overrides,
column_types, column_types,
force_nullable,
) )
for p in files for p in files
] ]
@ -1361,6 +1410,17 @@ def _build_argparser() -> argparse.ArgumentParser:
"elapsed time - it just can't estimate remaining time." "elapsed time - it just can't estimate remaining time."
), ),
) )
p.add_argument(
"--all-nullable",
action="store_true",
help=(
"Stamp every column nullable in the generated schema, bypassing "
"NOT NULL inference for every cluster. Use when sampled rows "
"wrongly suggest a column has no nulls and COPY fails mid-load "
"on the first null it hits. Overrides the per-cluster and "
"folder-level ``all_nullable`` YAML settings when set."
),
)
p.add_argument( p.add_argument(
"--workers", "--workers",
type=int, type=int,
@ -1409,6 +1469,20 @@ def main(argv: Optional[List[str]] = None) -> int:
return 2 return 2
clusters = discover_clusters(cfg) clusters = discover_clusters(cfg)
# CLI override: --all-nullable trumps both folder-level and per-cluster
# YAML ``all_nullable`` settings. Applied here (before any schema work)
# so every downstream path - dry-run, pre-scan, worker dispatch - sees
# the same flag on the ClusterSpec.
if args.all_nullable:
for c in clusters:
c.all_nullable = True
print(
"[info] --all-nullable set: stamping every column nullable "
"across all clusters (NOT NULL inference disabled).",
file=sys.stderr,
)
loadable = [c for c in clusters if c.files] loadable = [c for c in clusters if c.files]
if not loadable: if not loadable:
@ -1434,6 +1508,7 @@ def main(argv: Optional[List[str]] = None) -> int:
columns, _ = _infer_cluster_schema( columns, _ = _infer_cluster_schema(
c.files[0], c.include, c.exclude, c.files[0], c.include, c.exclude,
column_types=c.column_types, column_types=c.column_types,
force_nullable=c.all_nullable,
) )
# Print parent CREATE TABLE (with PARTITION BY if applicable). # Print parent CREATE TABLE (with PARTITION BY if applicable).
print( print(

View File

@ -321,6 +321,7 @@ class LoaderConfig:
max_partitions: int = 10_000 max_partitions: int = 10_000
indexes: List[str] = field(default_factory=list) indexes: List[str] = field(default_factory=list)
column_types: Dict[str, str] = field(default_factory=dict) column_types: Dict[str, str] = field(default_factory=dict)
all_nullable: bool = False
@dataclass @dataclass
@ -560,6 +561,19 @@ def load_config(path: Path) -> LoaderConfig:
) )
column_types[key] = v.strip() column_types[key] = v.strip()
# -- all_nullable -------------------------------------------------------
# When inference wrongly stamps a column NOT NULL (sampled rows happened
# to be dense; later rows carry nulls) downstream COPYs fail mid-stream.
# Set ``all_nullable: true`` in the YAML to stamp every column nullable
# up front. The CLI flag ``--all-nullable`` overrides this to ``true``
# if set.
raw_an = raw.get("all_nullable", False)
if not isinstance(raw_an, bool):
raise ValueError(
f"Config {path}: 'all_nullable' must be a boolean (got {raw_an!r})."
)
all_nullable = bool(raw_an)
return LoaderConfig( return LoaderConfig(
filename=filename, filename=filename,
schemaname=schemaname, schemaname=schemaname,
@ -571,6 +585,7 @@ def load_config(path: Path) -> LoaderConfig:
max_partitions=max_partitions, max_partitions=max_partitions,
indexes=indexes, indexes=indexes,
column_types=column_types, column_types=column_types,
all_nullable=all_nullable,
) )
@ -977,6 +992,7 @@ def infer_schema(
coerce_chars: bool = COERCE_CHAR_COLUMNS, coerce_chars: bool = COERCE_CHAR_COLUMNS,
total_rows: Optional[int] = None, total_rows: Optional[int] = None,
column_types: Optional[Dict[str, str]] = None, column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
) -> Dict[str, ColumnSpec]: ) -> Dict[str, ColumnSpec]:
"""Infer a Postgres column spec for each column in ``df``. """Infer a Postgres column spec for each column in ``df``.
@ -1000,6 +1016,13 @@ def infer_schema(
computed from the data. Columns in ``column_types`` that don't exist computed from the data. Columns in ``column_types`` that don't exist
in ``df`` are ignored so a shared override dict can apply to clusters in ``df`` are ignored so a shared override dict can apply to clusters
with different column sets. with different column sets.
``force_nullable=True`` stamps every column nullable regardless of
what the data sample shows. Escape hatch for when inference marks a
column ``NOT NULL`` because the sampled rows happened to be dense but
downstream files carry nulls in that column - common with cluster
loads where one file's preview can't speak for the rest. Cheaper than
trying to sharpen the sampler: widen the column and move on.
""" """
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {}) original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
@ -1038,6 +1061,10 @@ def infer_schema(
notes.append( notes.append(
f"type forced to {pg_type} via column_types override" f"type forced to {pg_type} via column_types override"
) )
if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series) nullable = _is_nullable(series)
out[col] = ColumnSpec( out[col] = ColumnSpec(
name=col, name=col,
@ -1080,6 +1107,10 @@ def infer_schema(
f"{effective_total:,} rows" f"{effective_total:,} rows"
) )
if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series) nullable = _is_nullable(series)
out[col] = ColumnSpec( out[col] = ColumnSpec(
@ -2315,6 +2346,16 @@ def _build_argparser() -> argparse.ArgumentParser:
"PGUSER / PGPASSWORD from the environment or .env file." "PGUSER / PGPASSWORD from the environment or .env file."
), ),
) )
p.add_argument(
"--all-nullable",
action="store_true",
help=(
"Stamp every column nullable in the generated schema, bypassing "
"NOT NULL inference. Use when sampled rows wrongly suggest a "
"column has no nulls. Overrides ``all_nullable`` in the YAML "
"config when set."
),
)
return p return p
@ -2347,7 +2388,13 @@ def main(argv: Optional[List[str]] = None) -> int:
# on columns whose nulls live past the window. # on columns whose nulls live past the window.
preview_df, meta = read_sas_preview(cfg.filename) preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude) preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
columns = infer_schema(preview_df, meta, column_types=cfg.column_types) force_nullable = args.all_nullable or cfg.all_nullable
columns = infer_schema(
preview_df,
meta,
column_types=cfg.column_types,
force_nullable=force_nullable,
)
# Validate partition columns exist in the schema after filtering. # Validate partition columns exist in the schema after filtering.
if cfg.partition_by: if cfg.partition_by:

View File

@ -50,3 +50,12 @@ if_exists: append
# column_types: # column_types:
# RESP_PH_PREFIX_ID: TEXT # RESP_PH_PREFIX_ID: TEXT
# SOMELONG_ID: BIGINT # SOMELONG_ID: BIGINT
# all_nullable: If true, every column is stamped nullable in the generated
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
# rare-null data downstream) and COPY blows up mid-load on the first null
# it hits. Off by default. The CLI flag --all-nullable overrides this to
# true when set.
#
# all_nullable: false

View File

@ -82,6 +82,17 @@ auto_detect: true
# RESP_PH_SUFFIX_ID: TEXT # RESP_PH_SUFFIX_ID: TEXT
# SOMELONG_ID: BIGINT # SOMELONG_ID: BIGINT
# Folder-level all_nullable: If true, every column of every cluster is
# stamped nullable in the generated schema; NOT NULL inference is skipped
# entirely. Use this when the sampler wrongly concludes a column has no
# nulls (sampled rows happened to be dense, but later files in the cluster
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
# unless a cluster supplies its own all_nullable. The CLI flag
# --all-nullable overrides both this and any per-cluster setting when
# passed. Off by default.
#
# all_nullable: false
# Explicit cluster patterns. Each pattern is matched against the file # Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect # *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly. # pool, so explicit and auto clusters compose cleanly.
@ -95,6 +106,7 @@ clusters:
tablename: group_a tablename: group_a
# column_types: # column_types:
# INTCOL: TEXT # INTCOL: TEXT
# all_nullable: true # per-cluster override of the folder-level default
# Example of an explicit override. Uncomment to force the group_b cluster to # Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace": # append instead of replace even though the folder default is "replace":