diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index cfb5aa9..e9e769e 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -220,6 +220,7 @@ class ClusterSpec: max_partitions: int = 10_000 indexes: List[str] = field(default_factory=list) column_types: Dict[str, str] = field(default_factory=dict) + all_nullable: bool = False @dataclass @@ -244,6 +245,7 @@ class _ExplicitPattern: max_partitions: Optional[int] = None indexes: Optional[List[str]] = None column_types: Optional[Dict[str, str]] = None + all_nullable: Optional[bool] = None @dataclass @@ -268,6 +270,7 @@ class FolderConfig: max_partitions: int = 10_000 indexes: List[str] = field(default_factory=list) column_types: Dict[str, str] = field(default_factory=dict) + all_nullable: bool = False # --------------------------------------------------------------------------- @@ -509,6 +512,17 @@ def load_folder_config(path: Path) -> FolderConfig: raw.get("column_types"), f"Config {path}" ) + # -- folder-level all_nullable ----------------------------------------- + # Sets the default for every cluster. Per-cluster ``all_nullable`` wins + # when present; the CLI ``--all-nullable`` flag trumps both. + raw_an_folder = raw.get("all_nullable", False) + if not isinstance(raw_an_folder, bool): + raise ValueError( + f"Config {path}: 'all_nullable' must be a boolean " + f"(got {raw_an_folder!r})." + ) + all_nullable_default = bool(raw_an_folder) + explicit: List[_ExplicitPattern] = [] clusters_raw = raw.get("clusters") or [] if not isinstance(clusters_raw, list): @@ -555,6 +569,19 @@ def load_folder_config(path: Path) -> FolderConfig: entry.get("column_types"), where, allow_none=True ) + # -- per-cluster all_nullable --------------------------------------- + c_all_nullable: Optional[bool] + if "all_nullable" in entry: + raw_c_an = entry["all_nullable"] + if not isinstance(raw_c_an, bool): + raise ValueError( + f"{where}: 'all_nullable' must be a boolean " + f"(got {raw_c_an!r})." + ) + c_all_nullable = bool(raw_c_an) + else: + c_all_nullable = None + explicit.append( _ExplicitPattern( pattern=compiled, @@ -567,6 +594,7 @@ def load_folder_config(path: Path) -> FolderConfig: max_partitions=c_max_partitions, indexes=c_indexes, column_types=c_column_types, + all_nullable=c_all_nullable, ) ) @@ -582,6 +610,7 @@ def load_folder_config(path: Path) -> FolderConfig: max_partitions=max_partitions, indexes=indexes, column_types=column_types or {}, + all_nullable=all_nullable_default, ) @@ -688,6 +717,12 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: else: resolved_ct = {**cfg.column_types, **patt.column_types} + # Resolve all_nullable: None = inherit folder, bool = override. + resolved_an = ( + patt.all_nullable if patt.all_nullable is not None + else cfg.all_nullable + ) + matched = [f for f in remaining if patt.pattern.search(f.name)] if not matched: # Not an error - the folder might legitimately not contain files @@ -705,6 +740,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: max_partitions=resolved_mp, indexes=resolved_idx, column_types=dict(resolved_ct), + all_nullable=resolved_an, ) ) continue @@ -722,6 +758,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: max_partitions=resolved_mp, indexes=resolved_idx, column_types=dict(resolved_ct), + all_nullable=resolved_an, ) ) @@ -743,6 +780,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: max_partitions=cfg.max_partitions, indexes=cfg.indexes, column_types=dict(cfg.column_types), + all_nullable=cfg.all_nullable, ) ) @@ -760,6 +798,7 @@ def _infer_cluster_schema( exclude, *, column_types: Optional[Dict[str, str]] = None, + force_nullable: bool = False, ) -> Tuple[Dict, Optional[int]]: """Infer the Postgres column schema from a SAS file preview. @@ -769,6 +808,8 @@ def _infer_cluster_schema( denominator instead of an indeterminate spinner. ``column_types`` lets the caller pin specific columns to a chosen Postgres type (typically the merged auto-union + YAML overrides for the cluster). + ``force_nullable`` stamps every column nullable regardless of what + the preview shows - see :func:`load_sas.infer_schema`. """ preview_df, meta = read_sas_preview(path) preview_df = apply_column_filter(preview_df, include, exclude) @@ -777,6 +818,7 @@ def _infer_cluster_schema( preview_df, meta, total_rows=total_rows, column_types=column_types, + force_nullable=force_nullable, ) return columns, total_rows @@ -850,6 +892,7 @@ def load_cluster( first_columns, first_total_rows = _infer_cluster_schema( first, cluster.include, cluster.exclude, column_types=cluster.column_types, + force_nullable=cluster.all_nullable, ) # -- Validate index columns early --------------------------------------- @@ -929,6 +972,7 @@ def load_cluster( progress_queue=progress_queue, db_overrides=db_overrides, column_types=cluster.column_types, + force_nullable=cluster.all_nullable, abort_on_first_failure=abort_on_first_failure, ) else: @@ -947,6 +991,7 @@ def load_cluster( columns, path_total_rows = _infer_cluster_schema( path, cluster.include, cluster.exclude, column_types=cluster.column_types, + force_nullable=cluster.all_nullable, ) # Uses the same check that if_exists=append runs. A type # mismatch or missing column aborts the cluster; because @@ -1031,6 +1076,7 @@ def _worker_load_append_file( progress_queue: Any, db_overrides: Optional[Dict[str, Optional[str]]], column_types: Optional[Dict[str, str]] = None, + force_nullable: bool = False, ) -> Tuple[str, int, Optional[str]]: """Worker process: load one SAS file in append mode. @@ -1074,6 +1120,7 @@ def _worker_load_append_file( preview_df, meta, total_rows=total_rows, column_types=column_types, + force_nullable=force_nullable, ) # Drop the preview ASAP - on a 2M-row wide file it's hundreds of MB # and we never need it again after schema inference. @@ -1141,6 +1188,7 @@ def _load_remaining_files_parallel( progress_queue: Any, db_overrides: Optional[Dict[str, Optional[str]]], column_types: Optional[Dict[str, str]] = None, + force_nullable: bool = False, abort_on_first_failure: bool = False, ) -> int: """Run append-mode loads for ``files`` across a process pool. @@ -1196,6 +1244,7 @@ def _load_remaining_files_parallel( progress_queue, db_overrides, column_types, + force_nullable, ) for p in files ] @@ -1361,6 +1410,17 @@ def _build_argparser() -> argparse.ArgumentParser: "elapsed time - it just can't estimate remaining time." ), ) + p.add_argument( + "--all-nullable", + action="store_true", + help=( + "Stamp every column nullable in the generated schema, bypassing " + "NOT NULL inference for every cluster. Use when sampled rows " + "wrongly suggest a column has no nulls and COPY fails mid-load " + "on the first null it hits. Overrides the per-cluster and " + "folder-level ``all_nullable`` YAML settings when set." + ), + ) p.add_argument( "--workers", type=int, @@ -1409,6 +1469,20 @@ def main(argv: Optional[List[str]] = None) -> int: return 2 clusters = discover_clusters(cfg) + + # CLI override: --all-nullable trumps both folder-level and per-cluster + # YAML ``all_nullable`` settings. Applied here (before any schema work) + # so every downstream path - dry-run, pre-scan, worker dispatch - sees + # the same flag on the ClusterSpec. + if args.all_nullable: + for c in clusters: + c.all_nullable = True + print( + "[info] --all-nullable set: stamping every column nullable " + "across all clusters (NOT NULL inference disabled).", + file=sys.stderr, + ) + loadable = [c for c in clusters if c.files] if not loadable: @@ -1434,6 +1508,7 @@ def main(argv: Optional[List[str]] = None) -> int: columns, _ = _infer_cluster_schema( c.files[0], c.include, c.exclude, column_types=c.column_types, + force_nullable=c.all_nullable, ) # Print parent CREATE TABLE (with PARTITION BY if applicable). print( diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index cc867c4..68db570 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -321,6 +321,7 @@ class LoaderConfig: max_partitions: int = 10_000 indexes: List[str] = field(default_factory=list) column_types: Dict[str, str] = field(default_factory=dict) + all_nullable: bool = False @dataclass @@ -560,6 +561,19 @@ def load_config(path: Path) -> LoaderConfig: ) column_types[key] = v.strip() + # -- all_nullable ------------------------------------------------------- + # When inference wrongly stamps a column NOT NULL (sampled rows happened + # to be dense; later rows carry nulls) downstream COPYs fail mid-stream. + # Set ``all_nullable: true`` in the YAML to stamp every column nullable + # up front. The CLI flag ``--all-nullable`` overrides this to ``true`` + # if set. + raw_an = raw.get("all_nullable", False) + if not isinstance(raw_an, bool): + raise ValueError( + f"Config {path}: 'all_nullable' must be a boolean (got {raw_an!r})." + ) + all_nullable = bool(raw_an) + return LoaderConfig( filename=filename, schemaname=schemaname, @@ -571,6 +585,7 @@ def load_config(path: Path) -> LoaderConfig: max_partitions=max_partitions, indexes=indexes, column_types=column_types, + all_nullable=all_nullable, ) @@ -977,6 +992,7 @@ def infer_schema( coerce_chars: bool = COERCE_CHAR_COLUMNS, total_rows: Optional[int] = None, column_types: Optional[Dict[str, str]] = None, + force_nullable: bool = False, ) -> Dict[str, ColumnSpec]: """Infer a Postgres column spec for each column in ``df``. @@ -1000,6 +1016,13 @@ def infer_schema( computed from the data. Columns in ``column_types`` that don't exist in ``df`` are ignored so a shared override dict can apply to clusters with different column sets. + + ``force_nullable=True`` stamps every column nullable regardless of + what the data sample shows. Escape hatch for when inference marks a + column ``NOT NULL`` because the sampled rows happened to be dense but + downstream files carry nulls in that column - common with cluster + loads where one file's preview can't speak for the rest. Cheaper than + trying to sharpen the sampler: widen the column and move on. """ original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {}) @@ -1038,7 +1061,11 @@ def infer_schema( notes.append( f"type forced to {pg_type} via column_types override" ) - nullable = _is_nullable(series) + if force_nullable: + nullable = True + notes.append("nullable forced via --all-nullable") + else: + nullable = _is_nullable(series) out[col] = ColumnSpec( name=col, postgres_type=pg_type, @@ -1080,7 +1107,11 @@ def infer_schema( f"{effective_total:,} rows" ) - nullable = _is_nullable(series) + if force_nullable: + nullable = True + notes.append("nullable forced via --all-nullable") + else: + nullable = _is_nullable(series) out[col] = ColumnSpec( name=col, @@ -2315,6 +2346,16 @@ def _build_argparser() -> argparse.ArgumentParser: "PGUSER / PGPASSWORD from the environment or .env file." ), ) + p.add_argument( + "--all-nullable", + action="store_true", + help=( + "Stamp every column nullable in the generated schema, bypassing " + "NOT NULL inference. Use when sampled rows wrongly suggest a " + "column has no nulls. Overrides ``all_nullable`` in the YAML " + "config when set." + ), + ) return p @@ -2347,7 +2388,13 @@ def main(argv: Optional[List[str]] = None) -> int: # on columns whose nulls live past the window. preview_df, meta = read_sas_preview(cfg.filename) preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude) - columns = infer_schema(preview_df, meta, column_types=cfg.column_types) + force_nullable = args.all_nullable or cfg.all_nullable + columns = infer_schema( + preview_df, + meta, + column_types=cfg.column_types, + force_nullable=force_nullable, + ) # Validate partition columns exist in the schema after filtering. if cfg.partition_by: diff --git a/generic_loader/sample_config.yaml b/generic_loader/sample_config.yaml index 9ebfe45..223cc1a 100644 --- a/generic_loader/sample_config.yaml +++ b/generic_loader/sample_config.yaml @@ -50,3 +50,12 @@ if_exists: append # column_types: # RESP_PH_PREFIX_ID: TEXT # SOMELONG_ID: BIGINT + +# all_nullable: If true, every column is stamped nullable in the generated +# schema; NOT NULL inference is skipped entirely. Use this when the sampler +# wrongly concludes a column has no nulls (e.g. a dense sample followed by +# rare-null data downstream) and COPY blows up mid-load on the first null +# it hits. Off by default. The CLI flag --all-nullable overrides this to +# true when set. +# +# all_nullable: false diff --git a/generic_loader/sample_folder_config.yaml b/generic_loader/sample_folder_config.yaml index b1961b9..4cb394c 100644 --- a/generic_loader/sample_folder_config.yaml +++ b/generic_loader/sample_folder_config.yaml @@ -82,6 +82,17 @@ auto_detect: true # RESP_PH_SUFFIX_ID: TEXT # SOMELONG_ID: BIGINT +# Folder-level all_nullable: If true, every column of every cluster is +# stamped nullable in the generated schema; NOT NULL inference is skipped +# entirely. Use this when the sampler wrongly concludes a column has no +# nulls (sampled rows happened to be dense, but later files in the cluster +# carry nulls) and COPY blows up mid-load. Inherited by all clusters +# unless a cluster supplies its own all_nullable. The CLI flag +# --all-nullable overrides both this and any per-cluster setting when +# passed. Off by default. +# +# all_nullable: false + # Explicit cluster patterns. Each pattern is matched against the file # *basename*. Files matched by a pattern are pulled out of the auto-detect # pool, so explicit and auto clusters compose cleanly. @@ -95,6 +106,7 @@ clusters: tablename: group_a # column_types: # INTCOL: TEXT + # all_nullable: true # per-cluster override of the folder-level default # Example of an explicit override. Uncomment to force the group_b cluster to # append instead of replace even though the folder default is "replace":