advanced_analyzer #8

Merged
dp merged 23 commits from advanced_analyzer into main 2026-04-21 22:32:18 +00:00
6 changed files with 2756 additions and 129 deletions

File diff suppressed because it is too large Load Diff

View File

@ -183,15 +183,17 @@ Priority order used by :func:`infer_schema`:
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
``DOUBLE PRECISION``. ``DOUBLE PRECISION``.
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for Type inference scans the whole file by default (``TYPE_INFERENCE_SAMPLE_ROWS
performance on large files. The CLI enforces this at read time via = None``) so type + nullability are both computed against every row. The CLI
:func:`read_sas_preview`, so the whole file is never materialized just to pick materializes the file once for schema inference, then re-streams it chunk by
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual chunk into ``COPY``; peak memory is roughly one full dataframe. Override
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32, ``TYPE_INFERENCE_SAMPLE_ROWS`` to an integer cap if you're on a host that
or a column had no nulls in the preview but does later in the file, ``COPY`` can't hold the file in memory - but know that sampled specs carry the usual
will fail mid-stream and the whole transaction rolls back. Set risks: a later row may exceed the inferred integer range, or a column that
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing had no nulls in the preview may carry nulls later in the file (which then
matters more than speed. detonates ``COPY`` because the sampled spec stamped it ``NOT NULL``). Seen
in production on a 2.5M-row file with ~6k null MAFIDs past the 10k-row
preview - the entire load aborted mid-stream.
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
commit each chunk as it is copied so an interrupted load retains the rows commit each chunk as it is copied so an interrupted load retains the rows
@ -225,16 +227,48 @@ import math
import os import os
import re import re
import sys import sys
import warnings
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
import pandas as pd import pandas as pd
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
import pyarrow as pa
import pyarrow.csv as pa_csv
import pyreadstat import pyreadstat
import yaml import yaml
from dotenv import load_dotenv from dotenv import load_dotenv
from pandas.errors import PerformanceWarning
from tqdm import tqdm
# ``_prepare_for_copy`` builds its output frame one column at a time with
# ``out[name] = ...``. On wide SAS files (~100+ columns) pandas prints a
# ``PerformanceWarning: DataFrame is highly fragmented`` once per chunk to
# nudge callers toward ``pd.concat(axis=1, ...)``. The fragmentation only
# matters for row-oriented ops or in-place ``.copy()``; we hand the frame
# straight to ``pyarrow.Table.from_pandas`` which reads columns
# independently, so the warning is pure noise for our pipeline. Filter it
# at import time - narrow category match so nothing else is suppressed.
warnings.filterwarnings("ignore", category=PerformanceWarning)
# Turn numpy's "raise on float overflow" (and friends) into silent inf/nan
# production, module-wide. Pandas ships with ``np.errstate(over="raise")``
# wrapped around several internal ops (most painfully, the multiply inside
# ``pd.to_datetime(unit="s")`` that converts SAS epoch -> nanoseconds).
# Our data routinely carries ``inf`` / huge sentinels, which trip that
# ``raise`` and blow up an entire worker before ``errors="coerce"`` gets
# a chance to turn them into NaT. Even with ``_safe_numeric_to_datetime``
# pre-masking the obvious cases, other code paths (pandas object-dtype
# datetime parsing, pyarrow type promotion, pyreadstat) can also trigger.
# Setting a process-wide ``seterr`` is a heavier hammer than an
# ``errstate`` block but survives library internals that don't explicitly
# rewrap it. Downside: a real overflow bug in new code would now silently
# produce inf/nan instead of raising - acceptable for a bulk loader where
# "don't crash on bad rows, null them and move on" is the whole point.
np.seterr(over="ignore", invalid="ignore", divide="ignore")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -255,17 +289,29 @@ values; too small a sample is easy to mis-infer."""
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647) NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
"""INTEGER bounds; anything outside becomes BIGINT.""" """INTEGER bounds; anything outside becomes BIGINT."""
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000 TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = None
"""Cap on rows inspected during per-column type inference. Also governs how """Cap on rows inspected during per-column type inference. Also governs how
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate / many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
schema-inference flows. Set to ``None`` to scan every row (and read the whole schema-inference flows.
file into memory for the preview step - don't do this on multi-hundred-million
row files)."""
DEFAULT_CHUNK_ROWS = 100_000 Default is ``None`` (scan every row, reading the whole file into memory for
the schema-inference step). That's the only honest setting for nullability:
any integer cap lets a column look ``NOT NULL`` across the first N rows
while the file actually holds rare nulls past the window, which then
detonates ``COPY`` mid-stream (seen in production on a 2.5M-row file where
~6k MAFIDs were null past the 10k-row preview). If you're loading a file
so large that a full read won't fit in memory, set this to an integer cap
and accept that sampled specs can't be trusted for ``NOT NULL``."""
DEFAULT_CHUNK_ROWS = 2_000_000
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean """Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
fewer COPY round-trips but more peak memory per chunk; smaller values are fewer COPY round-trips and lower per-row overhead but more peak memory per
gentler on memory.""" chunk; smaller values are gentler on memory.
The chunk size can be overridden at runtime via the
``GENERIC_LOADER_CHUNK_ROWS`` environment variable (read inside
:func:`iter_sas_chunks`), so ``.env``-driven overrides work without code
changes. Explicit ``chunksize=`` kwargs still win over both."""
VALID_IF_EXISTS = ("fail", "replace", "append") VALID_IF_EXISTS = ("fail", "replace", "append")
@ -290,6 +336,8 @@ class LoaderConfig:
partition_by: List[str] = field(default_factory=list) partition_by: List[str] = field(default_factory=list)
max_partitions: int = 10_000 max_partitions: int = 10_000
indexes: List[str] = field(default_factory=list) indexes: List[str] = field(default_factory=list)
column_types: Dict[str, str] = field(default_factory=dict)
all_nullable: bool = False
@dataclass @dataclass
@ -500,6 +548,48 @@ def load_config(path: Path) -> LoaderConfig:
f"{missing_in_include}" f"{missing_in_include}"
) )
# -- column_types -------------------------------------------------------
# Optional ``{column_name: pg_type}`` escape hatch that bypasses
# automatic type inference for specific columns. Useful when
# pyreadstat reports a column as NUM but the downstream consumer
# expects TEXT (e.g. phone-id columns), or when a column has drifted
# between CHAR and NUM across file versions and you want to pin
# TEXT up front. See also :func:`infer_schema`.
raw_ct = raw.get("column_types")
column_types: Dict[str, str] = {}
if raw_ct is not None:
if not isinstance(raw_ct, dict):
raise ValueError(
f"Config {path}: 'column_types' must be a mapping of "
f"{{column_name: postgres_type}}."
)
for k, v in raw_ct.items():
key = str(k).strip()
if not key:
raise ValueError(
f"Config {path}: 'column_types' contains an empty "
f"column name."
)
if not isinstance(v, str) or not v.strip():
raise ValueError(
f"Config {path}: 'column_types[{key}]' must be a "
f"non-empty Postgres type string (got {v!r})."
)
column_types[key] = v.strip()
# -- all_nullable -------------------------------------------------------
# When inference wrongly stamps a column NOT NULL (sampled rows happened
# to be dense; later rows carry nulls) downstream COPYs fail mid-stream.
# Set ``all_nullable: true`` in the YAML to stamp every column nullable
# up front. The CLI flag ``--all-nullable`` overrides this to ``true``
# if set.
raw_an = raw.get("all_nullable", False)
if not isinstance(raw_an, bool):
raise ValueError(
f"Config {path}: 'all_nullable' must be a boolean (got {raw_an!r})."
)
all_nullable = bool(raw_an)
return LoaderConfig( return LoaderConfig(
filename=filename, filename=filename,
schemaname=schemaname, schemaname=schemaname,
@ -510,6 +600,8 @@ def load_config(path: Path) -> LoaderConfig:
partition_by=partition_by, partition_by=partition_by,
max_partitions=max_partitions, max_partitions=max_partitions,
indexes=indexes, indexes=indexes,
column_types=column_types,
all_nullable=all_nullable,
) )
@ -563,16 +655,46 @@ def read_sas_preview(
return reader(str(Path(path)), row_limit=row_limit, **kwargs) return reader(str(Path(path)), row_limit=row_limit, **kwargs)
def read_sas_metadata(path: Path) -> Any:
"""Read only the metadata (no rows) from a SAS file.
Uses pyreadstat's ``metadataonly=True`` fast path: the reader decodes
the file header (column names, formats, total row count, etc.) and
returns without touching the data pages. Orders of magnitude faster
than :func:`read_sas_preview` when all you need is
``meta.number_rows`` - typically a few ms per sas7bdat file, which
makes it cheap to pre-scan a whole folder to populate a global
progress bar.
"""
reader, kwargs = _sas_reader(path)
_, meta = reader(str(Path(path)), metadataonly=True, **kwargs)
return meta
def iter_sas_chunks( def iter_sas_chunks(
path: Path, path: Path,
*, *,
chunksize: int = DEFAULT_CHUNK_ROWS, chunksize: Optional[int] = None,
): ):
"""Yield ``(df_chunk, meta)`` tuples for streaming loads. """Yield ``(df_chunk, meta)`` tuples for streaming loads.
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
underlying reader by extension and threads through our encoding defaults. underlying reader by extension and threads through our encoding defaults.
When ``chunksize`` is ``None`` (the default), the effective value comes
from the ``GENERIC_LOADER_CHUNK_ROWS`` environment variable if set and
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
always wins.
""" """
if chunksize is None:
raw = os.environ.get("GENERIC_LOADER_CHUNK_ROWS")
if raw is not None:
try:
chunksize = int(raw)
except ValueError:
chunksize = DEFAULT_CHUNK_ROWS
else:
chunksize = DEFAULT_CHUNK_ROWS
reader, kwargs = _sas_reader(path) reader, kwargs = _sas_reader(path)
yield from pyreadstat.read_file_in_chunks( yield from pyreadstat.read_file_in_chunks(
reader, str(Path(path)), chunksize=chunksize, **kwargs reader, str(Path(path)), chunksize=chunksize, **kwargs
@ -590,7 +712,13 @@ def apply_column_filter(
exclude: Optional[List[str]], exclude: Optional[List[str]],
) -> pd.DataFrame: ) -> pd.DataFrame:
"""Restrict ``df`` to the requested columns. Names missing from the frame """Restrict ``df`` to the requested columns. Names missing from the frame
raise a clear error rather than silently dropping.""" raise a clear error rather than silently dropping.
Returns the input frame (or a column-sliced view / drop result) without
an extra ``.copy()`` downstream (:func:`_prepare_for_copy`) reads the
frame into a freshly built output and never mutates its input, so the
copies were pure overhead on every streamed chunk.
"""
if include is not None and exclude is not None: if include is not None and exclude is not None:
raise ValueError("include and exclude are mutually exclusive.") raise ValueError("include and exclude are mutually exclusive.")
@ -598,15 +726,15 @@ def apply_column_filter(
missing = [c for c in include if c not in df.columns] missing = [c for c in include if c not in df.columns]
if missing: if missing:
raise ValueError(f"include references unknown columns: {missing}") raise ValueError(f"include references unknown columns: {missing}")
return df.loc[:, list(include)].copy() return df.loc[:, list(include)]
if exclude is not None: if exclude is not None:
missing = [c for c in exclude if c not in df.columns] missing = [c for c in exclude if c not in df.columns]
if missing: if missing:
raise ValueError(f"exclude references unknown columns: {missing}") raise ValueError(f"exclude references unknown columns: {missing}")
return df.drop(columns=list(exclude)).copy() return df.drop(columns=list(exclude))
return df.copy() return df
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -634,6 +762,126 @@ def _format_driven_type(sas_format: Optional[str]) -> Optional[str]:
return None return None
_DECIMAL_FORMAT_RE = re.compile(r"\.(\d+)")
def _format_hints_decimal(sas_format: Optional[str]) -> bool:
"""True if a numeric SAS format string explicitly carries decimal places.
SAS numeric formats are ``NAMEw.d``; ``d > 0`` means the variable was
intended to render with ``d`` decimal digits (COMMA10.2, F8.3, ...).
A bare width like ``BEST12.`` or ``F8.`` has no digits after the dot
and is treated as integer-presenting. Used by
:func:`union_column_types` to pick BIGINT vs DOUBLE PRECISION when a
column is numeric in every file of a cluster.
"""
if not sas_format:
return False
m = _DECIMAL_FORMAT_RE.search(sas_format)
if not m:
return False
try:
return int(m.group(1)) > 0
except ValueError:
return False
def extract_union_metadata(
meta: Any,
) -> Dict[str, Tuple[str, Optional[str]]]:
"""Pull the (readstat_type, sas_format) pair for every column in ``meta``.
Returns a plain dict that's safe to pass between processes and to
:func:`union_column_types`. ``readstat_type`` is the simplified type
reported by pyreadstat: ``"string"`` for SAS CHAR, ``"double"`` for
SAS NUM. ``sas_format`` comes from ``meta.original_variable_types``
and drives date/datetime detection during union.
"""
var_types = dict(getattr(meta, "variable_types", None) or {})
formats = dict(getattr(meta, "original_variable_types", None) or {})
names = list(
getattr(meta, "column_names", None)
or list(var_types.keys())
or list(formats.keys())
)
out: Dict[str, Tuple[str, Optional[str]]] = {}
for col in names:
rtype = str(var_types.get(col, "")) if var_types else ""
fmt = formats.get(col)
out[col] = (rtype, fmt if fmt else None)
return out
def union_column_types(
per_file_metas: Iterable[Dict[str, Tuple[str, Optional[str]]]],
) -> Dict[str, str]:
"""Derive one Postgres type per column that's safe across every file.
``per_file_metas`` is an iterable (one entry per file in a cluster) of
``{column_name: (readstat_type, sas_format)}`` dicts as produced by
:func:`extract_union_metadata`.
Rules, evaluated per column:
* **CHAR/NUM drift wins TEXT.** If any file stores the column as CHAR
(``readstat_type != "double"``) the union is ``TEXT``. This covers
the phone-id case where some years stored ``RESP_PH_PREFIX_ID`` as
CHAR and others as NUM.
* **All NUM, format hints DATETIME TIMESTAMP.** Any file whose
format resolves to ``TIMESTAMP`` (via :func:`_format_driven_type`)
pins the column to ``TIMESTAMP`` even if other files left the
format blank.
* **All NUM, format hints DATE DATE.** Same idea for date-only
formats.
* **All NUM, any decimal hint DOUBLE PRECISION.** A ``w.d`` format
with ``d > 0`` in any file implies fractional values somewhere.
* **All NUM, no useful hint DOUBLE PRECISION.** SAS numeric
formats are *display* formats, not storage constraints - a
``BEST12.`` / ``F8.`` / blank-format column can still hold floats,
and pyreadstat hands back plain ``float64`` regardless. Defaulting
to ``DOUBLE PRECISION`` here costs the same 8 bytes as ``BIGINT``
but can't fail on real data. For columns that truly are
integer-only and you want ``BIGINT`` semantics in queries, pin
them via a ``column_types`` override.
Columns missing from a given file are simply skipped for that file;
the union is computed over whichever files *did* supply the column.
Columns that never appear anywhere are omitted from the result.
"""
per_col: Dict[str, List[Tuple[str, Optional[str]]]] = {}
for meta in per_file_metas:
for col, pair in meta.items():
per_col.setdefault(col, []).append(pair)
result: Dict[str, str] = {}
for col, entries in per_col.items():
any_char = any(
rtype and rtype.lower() != "double" for rtype, _ in entries
)
if any_char:
result[col] = "TEXT"
continue
formats = [fmt for _, fmt in entries if fmt]
driven = [_format_driven_type(f) for f in formats]
if "TIMESTAMP" in driven:
result[col] = "TIMESTAMP"
elif "DATE" in driven:
result[col] = "DATE"
else:
# Safe default: DOUBLE PRECISION. The BIGINT default we tried
# first failed the moment a file contained a fractional
# value in a column whose format didn't carry a decimal
# hint (very common: SAS ``BEST12.`` / ``F8.`` are display
# formats, not storage constraints, so the underlying
# 8-byte float can hold any value). Same storage cost as
# BIGINT, handles both integer- and float-valued data, and
# keeps loads from failing mid-cluster. Use a
# ``column_types`` override to pin specific columns to
# ``BIGINT`` when you want integer semantics in queries.
result[col] = "DOUBLE PRECISION"
return result
def _all_null(series: pd.Series) -> bool: def _all_null(series: pd.Series) -> bool:
if pd.api.types.is_object_dtype(series): if pd.api.types.is_object_dtype(series):
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all()) return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
@ -759,6 +1007,8 @@ def infer_schema(
*, *,
coerce_chars: bool = COERCE_CHAR_COLUMNS, coerce_chars: bool = COERCE_CHAR_COLUMNS,
total_rows: Optional[int] = None, total_rows: Optional[int] = None,
column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
) -> Dict[str, ColumnSpec]: ) -> Dict[str, ColumnSpec]:
"""Infer a Postgres column spec for each column in ``df``. """Infer a Postgres column spec for each column in ``df``.
@ -774,11 +1024,30 @@ def infer_schema(
``total_rows`` lets callers who already sampled the frame (e.g. via ``total_rows`` lets callers who already sampled the frame (e.g. via
:func:`read_sas_preview`) report the real file size in the per-column :func:`read_sas_preview`) report the real file size in the per-column
"inferred from first N of M rows" note. Falls back to ``len(df)``. "inferred from first N of M rows" note. Falls back to ``len(df)``.
``column_types`` is an optional map ``{column_name: pg_type_str}``
whose entries bypass inference entirely - the caller has already
decided the type (e.g. via :func:`union_column_types` across a
cluster, or a YAML ``column_types`` override). Nullability is still
computed from the data. Columns in ``column_types`` that don't exist
in ``df`` are ignored so a shared override dict can apply to clusters
with different column sets.
``force_nullable=True`` stamps every column nullable regardless of
what the data sample shows. Escape hatch for when inference marks a
column ``NOT NULL`` because the sampled rows happened to be dense but
downstream files carry nulls in that column - common with cluster
loads where one file's preview can't speak for the rest. Cheaper than
trying to sharpen the sampler: widen the column and move on.
""" """
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {}) original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
# Row-walking type probes run on a bounded head slice; nullability and the # When ``TYPE_INFERENCE_SAMPLE_ROWS`` is an integer cap, row-walking type
# all-null check still see every row so NOT NULL declarations stay honest. # probes run on the head slice for speed; nullability and the all-null
# check still walk every row of ``df``. That's only honest when the
# caller handed us the full file - with the default cap of ``None`` the
# CLI does exactly that. Callers who pass a partial preview and a tight
# integer cap accept that ``NOT NULL`` can be wrong for rare-null columns.
df_rows = len(df) df_rows = len(df)
effective_total = total_rows if total_rows is not None else df_rows effective_total = total_rows if total_rows is not None else df_rows
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS: if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
@ -789,6 +1058,8 @@ def infer_schema(
sample_size = df_rows sample_size = df_rows
sampled = sample_size < effective_total sampled = sample_size < effective_total
overrides: Dict[str, str] = dict(column_types or {})
# Temporarily flip the module-level flag if the caller asked us to. # Temporarily flip the module-level flag if the caller asked us to.
global COERCE_CHAR_COLUMNS global COERCE_CHAR_COLUMNS
saved = COERCE_CHAR_COLUMNS saved = COERCE_CHAR_COLUMNS
@ -801,6 +1072,27 @@ def infer_schema(
sas_format = original_formats.get(col) sas_format = original_formats.get(col)
notes: List[str] = [] notes: List[str] = []
if col in overrides:
pg_type = overrides[col]
notes.append(
f"type forced to {pg_type} via column_types override"
)
if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series)
out[col] = ColumnSpec(
name=col,
postgres_type=pg_type,
nullable=nullable,
sas_format=sas_format,
source_dtype=str(series.dtype),
notes=notes,
sampled=sampled,
)
continue
pg_type = _format_driven_type(sas_format) pg_type = _format_driven_type(sas_format)
if pg_type is None: if pg_type is None:
@ -831,7 +1123,11 @@ def infer_schema(
f"{effective_total:,} rows" f"{effective_total:,} rows"
) )
nullable = _is_nullable(series) if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series)
out[col] = ColumnSpec( out[col] = ColumnSpec(
name=col, name=col,
@ -964,6 +1260,30 @@ def _normalize_type(pg_type: str) -> str:
return _TYPE_NORMALIZATION.get(stripped, stripped.lower()) return _TYPE_NORMALIZATION.get(stripped, stripped.lower())
# Widening pairs: (inferred_from_source, existing_in_target). When the
# incoming spec is narrower than the target we accept it - the value is
# guaranteed to fit, and ``_prepare_for_copy`` already emits ``COPY``
# payloads that Postgres silently promotes to the wider column type. The
# INVERSE direction stays a hard failure: a BIGINT value does not fit in
# an INTEGER column, so we must not let a cluster whose first file had
# only small ints accept a later file with a value past int32. Comes up
# most often on cluster loads where file 1 pushed the target to BIGINT
# (a single value > 2_147_483_647) and file N happens to sit entirely
# within int32 range - strict equality would reject file N even though
# the copy is trivially safe.
_WIDENING_COMPATIBLE: set = {
("smallint", "integer"),
("smallint", "bigint"),
("integer", "bigint"),
("real", "double precision"),
# INTEGER / BIGINT into DOUBLE PRECISION is lossless for int32 and
# exact up to 2**53 for int64, which covers every value pandas could
# have carried through as Int64 without wrapping anyway.
("integer", "double precision"),
("bigint", "double precision"),
}
def _assert_schema_compatible( def _assert_schema_compatible(
conn, schema: str, table: str, columns: Dict[str, ColumnSpec] conn, schema: str, table: str, columns: Dict[str, ColumnSpec]
) -> None: ) -> None:
@ -990,11 +1310,22 @@ def _assert_schema_compatible(
inferred_norm = _normalize_type(spec.postgres_type) inferred_norm = _normalize_type(spec.postgres_type)
target_norm = _normalize_type(target_type) target_norm = _normalize_type(target_type)
if inferred_norm != target_norm: if inferred_norm != target_norm:
mismatches.append( if (inferred_norm, target_norm) in _WIDENING_COMPATIBLE:
f"column {name!r}: inferred {spec.postgres_type} " # Narrower inferred type fits inside the wider target.
f"(normalized {inferred_norm!r}) but target is {target_type} " # Accept silently-but-noisily so the operator knows the
f"(normalized {target_norm!r})" # file came in with a smaller range than the cluster's
) # target was sized for.
warnings.append(
f"column {name!r}: inferred {spec.postgres_type} "
f"(narrower than target {target_type}); accepting - "
f"values fit in the wider target type"
)
else:
mismatches.append(
f"column {name!r}: inferred {spec.postgres_type} "
f"(normalized {inferred_norm!r}) but target is {target_type} "
f"(normalized {target_norm!r})"
)
target_is_notnull = (target_nullable == "NO") target_is_notnull = (target_nullable == "NO")
if spec.nullable and target_is_notnull: if spec.nullable and target_is_notnull:
warnings.append( warnings.append(
@ -1661,9 +1992,151 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]:
return dt.time(h, m, s) return dt.time(h, m, s)
# Safe outer bound for the numeric->datetime conversion below. The true
# ceiling is ``pd.Timestamp.max`` (2262-04-11), which in seconds since 1960
# is ~9.52e9. We pick a much tighter bound - year ~2200, ~7.6e9 seconds,
# ~87600 days - because (a) any real SAS data past ~2100 is garbage anyway,
# and (b) staying well inside the float64 + datetime64[ns] windows gives
# pandas' internals zero room to trip the ``over="raise"`` they wrap
# around the ns-multiply. ``7.5e9 * 1e9 = 7.5e18``, comfortably under both
# ``int64.max`` (~9.22e18) and float64 overflow (~1.8e308).
_SAS_DATETIME_SAFE_S = 7_500_000_000
_SAS_DATETIME_SAFE_D = 87_000
def _safe_numeric_to_datetime(
series: pd.Series,
*,
unit: str,
column_name: str,
target_type: str,
) -> pd.Series:
"""Convert a numeric SAS-epoch series to ``datetime64[ns]`` without letting
one stray cell take down the worker.
Failure modes seen in production:
* ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value
sentinels, divide-by-zero in the source, uninitialized cells).
* Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9``
overflows float64.
* Values between ``pd.Timestamp.max`` and float64 safety (~9.5e9 to 1e308
seconds) where the nanosecond multiply silently produces garbage or
overflows int64.
All of these trigger ``FloatingPointError: overflow encountered in multiply``
inside ``pd.to_datetime`` because pandas wraps the multiply in
``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never
gets a chance to turn the bad value into ``NaT``.
Strategy, belt + suspenders + airbag:
1. Coerce to float64 up front. Object-dtype branches hand us mixed
int/float/str; ``pd.to_numeric(errors="coerce")`` parses what it can
and NaNs the rest, so we hit the rest of this function with a
pristine float series.
2. Mask non-finite values and anything outside the safe epoch window to
NaN *before* ``pd.to_datetime`` sees them.
3. Run the conversion under a permissive ``errstate``.
4. If that still raises (some pandas version internally re-enables
``over="raise"`` in a way ``errstate`` can't override), catch it
and return all-NaT for the column with a loud warning. Better a
NULL column in one chunk than a dead worker + no diagnostics.
Emits one stderr line per chunk per affected column so silent data
loss doesn't sneak by.
"""
if not pd.api.types.is_float_dtype(series):
series = pd.to_numeric(series, errors="coerce").astype("float64")
arr = series.to_numpy(dtype="float64", copy=False, na_value=np.nan)
if unit == "s":
bound = _SAS_DATETIME_SAFE_S
elif unit == "D":
bound = _SAS_DATETIME_SAFE_D
else:
bound = _SAS_DATETIME_SAFE_S
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
finite_mask = np.isfinite(arr)
# ``np.abs(inf) -> inf``, ``np.abs(nan) -> nan``; both compare False
# to ``bound``, so ``in_range_mask`` already excludes non-finite
# values. The explicit ``finite_mask &`` below is belt-and-suspenders
# in case a future numpy changes that semantic.
in_range_mask = np.abs(arr) < bound
keep_mask = finite_mask & in_range_mask
was_present = ~np.isnan(arr)
coerced = int(((~keep_mask) & was_present).sum())
if coerced:
tqdm.write(
f"[warn] {target_type} column {column_name!r}: {coerced:,} "
f"row(s) had non-representable values (Inf/NaN/out-of-range), "
f"coerced to NULL",
file=sys.stderr,
)
cleaned_arr = np.where(keep_mask, arr, np.nan)
cleaned = pd.Series(cleaned_arr, index=series.index)
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(
cleaned, unit=unit, origin="1960-01-01", errors="coerce",
)
except (FloatingPointError, OverflowError, ValueError) as exc:
tqdm.write(
f"[error] {target_type} column {column_name!r}: "
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
f"returning NaT for the entire chunk. This usually means one "
f"or more values slipped past the pre-mask (bound={bound}). "
f"Consider setting the column to TEXT via column_types if this "
f"recurs.",
file=sys.stderr,
)
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
def _safe_object_to_datetime(
series: pd.Series,
*,
column_name: str,
target_type: str,
) -> pd.Series:
"""Object-dtype to datetime. Shares the safety net (errstate +
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
actually numeric-flavored (e.g. SAS wrote numbers into an object
column), route to the numeric path; otherwise parse with ``to_datetime``
on the object itself.
"""
coerced = series.replace({"": None})
numeric = pd.to_numeric(coerced, errors="coerce")
all_numeric = numeric.notna().sum() == coerced.notna().sum()
if all_numeric and coerced.notna().any():
return _safe_numeric_to_datetime(
numeric, unit="s", column_name=column_name, target_type=target_type,
)
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(coerced, errors="coerce")
except (FloatingPointError, OverflowError, ValueError) as exc:
tqdm.write(
f"[error] {target_type} column {column_name!r}: "
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
f"returning NaT for the entire chunk.",
file=sys.stderr,
)
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame: def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
"""Materialize a copy of ``df`` with each column in the right shape for """Materialize a copy of ``df`` with each column in the right shape for
``to_csv`` so the CSV lands as valid input for the target Postgres type. ``to_csv`` so the CSV lands as valid input for the target Postgres type.
Per-column conversions are vectorized (``.astype`` / ``pd.to_datetime`` /
``.mask`` / ``.fillna``) instead of the element-wise ``.map(func)``
loops this function used to run. That was the single largest per-chunk
CPU cost on text-heavy loads - a 40-column × 100k-row chunk was issuing
~4M Python-level function calls just to cast strings. TIME columns are
still the ``.map`` path because SAS TIME8 is stored as seconds and the
clamp-to-24h logic doesn't fit cleanly in vector form; they're also
rare in practice.
""" """
out = pd.DataFrame(index=df.index) out = pd.DataFrame(index=df.index)
for name, spec in columns.items(): for name, spec in columns.items():
@ -1686,59 +2159,86 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
if pd.api.types.is_datetime64_any_dtype(series): if pd.api.types.is_datetime64_any_dtype(series):
out[name] = series.dt.date out[name] = series.dt.date
elif pd.api.types.is_object_dtype(series): elif pd.api.types.is_object_dtype(series):
def _to_date(v: Any) -> Optional[dt.date]: # Vectorized parse: empty strings / None / unparseable -> NaT,
if v is None or (isinstance(v, float) and pd.isna(v)): # then .dt.date yields date objects or NaT. NaT serializes as
return None # an empty CSV field (matching ``NULL ''`` in COPY). Routed
if isinstance(v, dt.datetime): # through ``_safe_object_to_datetime`` so an object column
return v.date() # that actually contains SAS-epoch numerics (seen when one
if isinstance(v, dt.date): # file of a cluster stores the column as NUM and another as
return v # CHAR + the union flipped it to TEXT-then-DATE) can't trip
if isinstance(v, str): # the overflow-in-multiply bug.
if v == "": parsed = _safe_object_to_datetime(
return None series, column_name=name, target_type="DATE",
try: )
return dt.date.fromisoformat(v) out[name] = parsed.dt.date
except ValueError: elif pd.api.types.is_numeric_dtype(series):
return None # pyreadstat couldn't decode the SAS format (some
return None # ``DATEw.``/``YYMMDDw.`` variants and all custom formats slip
out[name] = series.map(_to_date) # through) so the column came back as float64: days since
# 1960-01-01, the SAS epoch. Without this branch the raw
# number would hit COPY and Postgres rejects it with
# ``invalid input syntax for type date``.
parsed = _safe_numeric_to_datetime(
series, unit="D", column_name=name, target_type="DATE",
)
out[name] = parsed.dt.date
else: else:
out[name] = series out[name] = series
elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"): elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"):
if pd.api.types.is_datetime64_any_dtype(series): if pd.api.types.is_datetime64_any_dtype(series):
out[name] = series out[name] = series
elif pd.api.types.is_object_dtype(series): elif pd.api.types.is_object_dtype(series):
def _to_dt(v: Any) -> Optional[dt.datetime]: # Same rationale as the DATE object branch above: route
if v is None or (isinstance(v, float) and pd.isna(v)): # through the safety net so numeric-flavored object columns
return None # can't blow us up during the ns multiply.
if isinstance(v, dt.datetime): out[name] = _safe_object_to_datetime(
return v series, column_name=name, target_type="TIMESTAMP",
if isinstance(v, dt.date): )
return dt.datetime(v.year, v.month, v.day) elif pd.api.types.is_numeric_dtype(series):
if isinstance(v, pd.Timestamp): # Same story as the DATE branch above, but SAS datetimes are
return v.to_pydatetime() if not pd.isna(v) else None # *seconds* since 1960-01-01 (fractional seconds for
if isinstance(v, str): # ``DATETIMEw.d``). Example caught in the wild:
if v == "": # ``1915465463.615`` -> 2020-09-13 05:44:23.615.
return None out[name] = _safe_numeric_to_datetime(
try: series, unit="s", column_name=name, target_type="TIMESTAMP",
return dt.datetime.fromisoformat(v) )
except ValueError:
return None
return None
out[name] = series.map(_to_dt)
else: else:
out[name] = series out[name] = series
elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"): elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"):
out[name] = series.map(_seconds_to_time) out[name] = series.map(_seconds_to_time)
elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"): elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"):
# Leave empty strings as "" so `NULL ''` in COPY turns them into NULL. # Render every cell as a string and blank out nulls. ``NULL ''``
def _to_str(v: Any) -> Any: # in the COPY statement turns the blanks back into SQL NULL.
if v is None: # astype(str) stringifies NaN/None to the literal "nan"/"None",
return "" # so we mask those after the fact rather than branching per cell.
if isinstance(v, float) and pd.isna(v): na_mask = series.isna()
return "" if pd.api.types.is_numeric_dtype(series):
return str(v) # Hit when a column was auto-unioned to TEXT because at
out[name] = series.map(_to_str) # least one file of the cluster stored it as CHAR but this
# particular file stored it as NUM (typical of SAS phone-id
# columns). Default float formatting would emit "123.0" -
# which doesn't match the plain "123" coming from the CHAR
# files. When the whole chunk is integer-valued, round to
# int before stringifying; when any fractional value is
# present we leave float formatting alone so we don't
# silently drop precision.
nonnull = series.dropna()
int_like = False
if not nonnull.empty:
try:
int_like = bool(((nonnull % 1) == 0).all())
except TypeError:
int_like = False
if int_like:
# ``Int64`` preserves NA; ``.astype(str)`` renders NA
# as '<NA>', which we then mask out alongside original
# NaNs.
as_str = series.astype("Int64").astype(str)
out[name] = as_str.mask(na_mask, "")
else:
out[name] = series.astype(str).mask(na_mask, "")
else:
out[name] = series.astype(str).mask(na_mask, "")
elif pg == "BOOLEAN": elif pg == "BOOLEAN":
out[name] = series.astype("boolean") if series.dtype != object else series out[name] = series.astype("boolean") if series.dtype != object else series
else: else:
@ -1746,6 +2246,25 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
return out return out
def _serialize_chunk_csv(prepared: pd.DataFrame) -> io.BytesIO:
"""Serialize a prepared frame into a CSV buffer for ``COPY FROM STDIN``.
Uses ``pyarrow.csv.write_csv`` (typically 5-10× faster than pandas'
pure-Python ``to_csv`` on wide/text-heavy frames). Null cells serialize
as empty strings and date/timestamp values land in ISO 8601 form, both
of which Postgres accepts under ``FORMAT csv, NULL ''``.
"""
table = pa.Table.from_pandas(prepared, preserve_index=False)
buf = io.BytesIO()
pa_csv.write_csv(
table,
buf,
write_options=pa_csv.WriteOptions(include_header=False),
)
buf.seek(0)
return buf
def copy_dataframes( def copy_dataframes(
conn, conn,
schema_name: str, schema_name: str,
@ -1768,23 +2287,43 @@ def copy_dataframes(
) )
total = 0 total = 0
# Pull chunks one at a time so each ``df`` is unreferenced before the
# generator reads the next one. Without this the loop-variable binding
# of a ``for df in dfs:`` keeps the previous chunk alive during the
# next pyreadstat read, pushing peak memory to 5-6× chunk size per
# worker (old df + incoming df + prepared + pyarrow table + CSV buf).
# With explicit drops we cap peak at ~2× chunk size: ``df`` goes away
# once ``prepared`` exists, ``prepared`` once ``buf`` exists, ``buf``
# once COPY has consumed it. Matters most in parallel mode where
# 32 × per-worker peak can exhaust a 128 GB host.
dfs_iter = iter(dfs)
with conn.cursor() as cur: with conn.cursor() as cur:
for df in dfs: while True:
try:
df = next(dfs_iter)
except StopIteration:
break
if df.empty: if df.empty:
del df
continue continue
prepared = _prepare_for_copy(df, columns) prepared = _prepare_for_copy(df, columns)
buf = io.StringIO() del df
prepared.to_csv( n = len(prepared)
buf, buf = _serialize_chunk_csv(prepared)
index=False, del prepared
header=False,
na_rep="",
date_format="%Y-%m-%d %H:%M:%S",
)
buf.seek(0)
cur.copy_expert(sql, buf) cur.copy_expert(sql, buf)
del buf
conn.commit() conn.commit()
total += len(prepared) total += n
# Hand pyarrow's pool memory back between chunks. Without this,
# arrow's internal buffer pool keeps the high-water bytes
# reserved across the worker's lifetime - inside long-running
# workers this presents as steadily climbing RSS even with the
# ``del``s above. Cheap (microseconds); call it every chunk.
try:
pa.default_memory_pool().release_unused()
except Exception:
pass
return total return total
@ -1899,6 +2438,16 @@ def _build_argparser() -> argparse.ArgumentParser:
"PGUSER / PGPASSWORD from the environment or .env file." "PGUSER / PGPASSWORD from the environment or .env file."
), ),
) )
p.add_argument(
"--all-nullable",
action="store_true",
help=(
"Stamp every column nullable in the generated schema, bypassing "
"NOT NULL inference. Use when sampled rows wrongly suggest a "
"column has no nulls. Overrides ``all_nullable`` in the YAML "
"config when set."
),
)
return p return p
@ -1921,13 +2470,23 @@ def main(argv: Optional[List[str]] = None) -> int:
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr) print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
return 2 return 2
# Schema inference uses a bounded preview read so we never load a # Schema inference reads the whole file so type + nullability are
# hundreds-of-millions-of-rows file into memory just to pick types. # computed against every row. That's what the target host has the
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows # resources for and is the only way to honestly emit ``NOT NULL`` -
# returned, not the file's total, so we don't trust it here. # a bounded preview routinely missed the ~0.2% of rows with nulls on
# otherwise-dense keys (e.g. MAFID). If you're on a box that can't
# fit the file in memory, override ``TYPE_INFERENCE_SAMPLE_ROWS`` to
# an integer cap and know that sampled specs may stamp ``NOT NULL``
# on columns whose nulls live past the window.
preview_df, meta = read_sas_preview(cfg.filename) preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude) preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
columns = infer_schema(preview_df, meta) force_nullable = args.all_nullable or cfg.all_nullable
columns = infer_schema(
preview_df,
meta,
column_types=cfg.column_types,
force_nullable=force_nullable,
)
# Validate partition columns exist in the schema after filtering. # Validate partition columns exist in the schema after filtering.
if cfg.partition_by: if cfg.partition_by:
@ -2018,13 +2577,24 @@ def main(argv: Optional[List[str]] = None) -> int:
# it while we're holding a Postgres transaction open. # it while we're holding a Postgres transaction open.
del preview_df del preview_df
total_rows = getattr(meta, "number_rows", None)
def _filtered_chunks(): def _filtered_chunks():
seen = 0 pbar = tqdm(
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename): total=total_rows,
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude) unit="row",
seen += len(chunk_df) unit_scale=True,
print(f" streaming... {seen:,} rows", file=sys.stderr) desc=f" {cfg.filename.name}",
yield chunk_df file=sys.stderr,
dynamic_ncols=True,
)
try:
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
pbar.update(len(chunk_df))
yield chunk_df
finally:
pbar.close()
db_user = db_password = None db_user = db_password = None
if args.dbcreds: if args.dbcreds:

View File

@ -38,3 +38,24 @@ if_exists: append
# indexes: # indexes:
# - state # - state
# - zip # - zip
# column_types: Explicit {column_name: postgres_type} overrides that
# bypass automatic type inference for the listed columns. Useful when
# pyreadstat reports a column as NUM but you want it stored as TEXT
# (phone/ID columns that are conceptually strings), or when a column's
# inferred type is off for any other reason. Columns not listed here
# fall through to the normal inference path. Nullability is always
# computed from the data.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# all_nullable: If true, every column is stamped nullable in the generated
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
# rare-null data downstream) and COPY blows up mid-load on the first null
# it hits. Off by default. The CLI flag --all-nullable overrides this to
# true when set.
#
# all_nullable: false

View File

@ -61,15 +61,52 @@ auto_detect: true
# - state # - state
# - zip # - zip
# Folder-level column_types: Explicit {column_name: postgres_type} map that
# bypasses automatic type inference for the listed columns. Applied to
# every cluster unless a cluster supplies its own column_types, which are
# merged on top (cluster entries win on conflict).
#
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
# BIGINT). Entries in column_types here win over that auto-union - use
# them when the auto result is wrong or when --no-prescan disables the
# auto-union and you still need to pin a column.
#
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
# don't exist in a given file are simply ignored for that file.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# RESP_PH_SUFFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# Folder-level all_nullable: If true, every column of every cluster is
# stamped nullable in the generated schema; NOT NULL inference is skipped
# entirely. Use this when the sampler wrongly concludes a column has no
# nulls (sampled rows happened to be dense, but later files in the cluster
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
# unless a cluster supplies its own all_nullable. The CLI flag
# --all-nullable overrides both this and any per-cluster setting when
# passed. Off by default.
#
# all_nullable: false
# Explicit cluster patterns. Each pattern is matched against the file # Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect # *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly. # pool, so explicit and auto clusters compose cleanly.
# #
# `tablename` is required. `if_exists`, `include`, and `exclude` are # `tablename` is required. `if_exists`, `include`, `exclude`, and
# optional per-cluster overrides of the folder-level defaults above. # `column_types` are optional per-cluster overrides of the folder-level
# defaults above. Cluster-level column_types entries win over folder-
# level entries for the same column.
clusters: clusters:
- pattern: '^group_a\d+\.xpt$' - pattern: '^group_a\d+\.xpt$'
tablename: group_a tablename: group_a
# column_types:
# INTCOL: TEXT
# all_nullable: true # per-cluster override of the folder-level default
# Example of an explicit override. Uncomment to force the group_b cluster to # Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace": # append instead of replace even though the folder default is "replace":

View File

@ -1,7 +1,10 @@
pandas>=2.0,<3.0 pandas>=2.0,<3.0
pyreadstat>=1.2,<2.0 pyreadstat>=1.2,<2.0
numpy>=2.1,<3.0 numpy>=2.1,<3.0
pyarrow>=22.0,<24.0
pyyaml>=6.0,<7.0 pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0 psycopg2-binary>=2.9,<3.0
python-dotenv>=1.0,<2.0 python-dotenv>=1.0,<2.0
boto3>=1.28,<2.0 boto3>=1.28,<2.0
openpyxl>=3.1,<4.0
tqdm>=4.66,<5.0

1138
utils/sas_profiler.py Normal file

File diff suppressed because it is too large Load Diff