Compare commits

..

No commits in common. "64e7ff0b0a479bb7e7ccbdd568d5bc8aecbc4bd1" and "e48038f3c65471aff1074e0f37b7df8937e305b1" have entirely different histories.

6 changed files with 126 additions and 2753 deletions

File diff suppressed because it is too large Load Diff

View File

@ -183,17 +183,15 @@ Priority order used by :func:`infer_schema`:
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
``DOUBLE PRECISION``.
Type inference scans the whole file by default (``TYPE_INFERENCE_SAMPLE_ROWS
= None``) so type + nullability are both computed against every row. The CLI
materializes the file once for schema inference, then re-streams it chunk by
chunk into ``COPY``; peak memory is roughly one full dataframe. Override
``TYPE_INFERENCE_SAMPLE_ROWS`` to an integer cap if you're on a host that
can't hold the file in memory - but know that sampled specs carry the usual
risks: a later row may exceed the inferred integer range, or a column that
had no nulls in the preview may carry nulls later in the file (which then
detonates ``COPY`` because the sampled spec stamped it ``NOT NULL``). Seen
in production on a 2.5M-row file with ~6k null MAFIDs past the 10k-row
preview - the entire load aborted mid-stream.
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for
performance on large files. The CLI enforces this at read time via
:func:`read_sas_preview`, so the whole file is never materialized just to pick
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32,
or a column had no nulls in the preview but does later in the file, ``COPY``
will fail mid-stream and the whole transaction rolls back. Set
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing
matters more than speed.
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
commit each chunk as it is copied so an interrupted load retains the rows
@ -227,48 +225,16 @@ import math
import os
import re
import sys
import warnings
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extensions
import pyarrow as pa
import pyarrow.csv as pa_csv
import pyreadstat
import yaml
from dotenv import load_dotenv
from pandas.errors import PerformanceWarning
from tqdm import tqdm
# ``_prepare_for_copy`` builds its output frame one column at a time with
# ``out[name] = ...``. On wide SAS files (~100+ columns) pandas prints a
# ``PerformanceWarning: DataFrame is highly fragmented`` once per chunk to
# nudge callers toward ``pd.concat(axis=1, ...)``. The fragmentation only
# matters for row-oriented ops or in-place ``.copy()``; we hand the frame
# straight to ``pyarrow.Table.from_pandas`` which reads columns
# independently, so the warning is pure noise for our pipeline. Filter it
# at import time - narrow category match so nothing else is suppressed.
warnings.filterwarnings("ignore", category=PerformanceWarning)
# Turn numpy's "raise on float overflow" (and friends) into silent inf/nan
# production, module-wide. Pandas ships with ``np.errstate(over="raise")``
# wrapped around several internal ops (most painfully, the multiply inside
# ``pd.to_datetime(unit="s")`` that converts SAS epoch -> nanoseconds).
# Our data routinely carries ``inf`` / huge sentinels, which trip that
# ``raise`` and blow up an entire worker before ``errors="coerce"`` gets
# a chance to turn them into NaT. Even with ``_safe_numeric_to_datetime``
# pre-masking the obvious cases, other code paths (pandas object-dtype
# datetime parsing, pyarrow type promotion, pyreadstat) can also trigger.
# Setting a process-wide ``seterr`` is a heavier hammer than an
# ``errstate`` block but survives library internals that don't explicitly
# rewrap it. Downside: a real overflow bug in new code would now silently
# produce inf/nan instead of raising - acceptable for a bulk loader where
# "don't crash on bad rows, null them and move on" is the whole point.
np.seterr(over="ignore", invalid="ignore", divide="ignore")
logger = logging.getLogger(__name__)
@ -289,29 +255,17 @@ values; too small a sample is easy to mis-infer."""
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
"""INTEGER bounds; anything outside becomes BIGINT."""
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = None
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000
"""Cap on rows inspected during per-column type inference. Also governs how
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
schema-inference flows.
schema-inference flows. Set to ``None`` to scan every row (and read the whole
file into memory for the preview step - don't do this on multi-hundred-million
row files)."""
Default is ``None`` (scan every row, reading the whole file into memory for
the schema-inference step). That's the only honest setting for nullability:
any integer cap lets a column look ``NOT NULL`` across the first N rows
while the file actually holds rare nulls past the window, which then
detonates ``COPY`` mid-stream (seen in production on a 2.5M-row file where
~6k MAFIDs were null past the 10k-row preview). If you're loading a file
so large that a full read won't fit in memory, set this to an integer cap
and accept that sampled specs can't be trusted for ``NOT NULL``."""
DEFAULT_CHUNK_ROWS = 2_000_000
DEFAULT_CHUNK_ROWS = 100_000
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
fewer COPY round-trips and lower per-row overhead but more peak memory per
chunk; smaller values are gentler on memory.
The chunk size can be overridden at runtime via the
``GENERIC_LOADER_CHUNK_ROWS`` environment variable (read inside
:func:`iter_sas_chunks`), so ``.env``-driven overrides work without code
changes. Explicit ``chunksize=`` kwargs still win over both."""
fewer COPY round-trips but more peak memory per chunk; smaller values are
gentler on memory."""
VALID_IF_EXISTS = ("fail", "replace", "append")
@ -336,8 +290,6 @@ class LoaderConfig:
partition_by: List[str] = field(default_factory=list)
max_partitions: int = 10_000
indexes: List[str] = field(default_factory=list)
column_types: Dict[str, str] = field(default_factory=dict)
all_nullable: bool = False
@dataclass
@ -548,48 +500,6 @@ def load_config(path: Path) -> LoaderConfig:
f"{missing_in_include}"
)
# -- column_types -------------------------------------------------------
# Optional ``{column_name: pg_type}`` escape hatch that bypasses
# automatic type inference for specific columns. Useful when
# pyreadstat reports a column as NUM but the downstream consumer
# expects TEXT (e.g. phone-id columns), or when a column has drifted
# between CHAR and NUM across file versions and you want to pin
# TEXT up front. See also :func:`infer_schema`.
raw_ct = raw.get("column_types")
column_types: Dict[str, str] = {}
if raw_ct is not None:
if not isinstance(raw_ct, dict):
raise ValueError(
f"Config {path}: 'column_types' must be a mapping of "
f"{{column_name: postgres_type}}."
)
for k, v in raw_ct.items():
key = str(k).strip()
if not key:
raise ValueError(
f"Config {path}: 'column_types' contains an empty "
f"column name."
)
if not isinstance(v, str) or not v.strip():
raise ValueError(
f"Config {path}: 'column_types[{key}]' must be a "
f"non-empty Postgres type string (got {v!r})."
)
column_types[key] = v.strip()
# -- all_nullable -------------------------------------------------------
# When inference wrongly stamps a column NOT NULL (sampled rows happened
# to be dense; later rows carry nulls) downstream COPYs fail mid-stream.
# Set ``all_nullable: true`` in the YAML to stamp every column nullable
# up front. The CLI flag ``--all-nullable`` overrides this to ``true``
# if set.
raw_an = raw.get("all_nullable", False)
if not isinstance(raw_an, bool):
raise ValueError(
f"Config {path}: 'all_nullable' must be a boolean (got {raw_an!r})."
)
all_nullable = bool(raw_an)
return LoaderConfig(
filename=filename,
schemaname=schemaname,
@ -600,8 +510,6 @@ def load_config(path: Path) -> LoaderConfig:
partition_by=partition_by,
max_partitions=max_partitions,
indexes=indexes,
column_types=column_types,
all_nullable=all_nullable,
)
@ -655,46 +563,16 @@ def read_sas_preview(
return reader(str(Path(path)), row_limit=row_limit, **kwargs)
def read_sas_metadata(path: Path) -> Any:
"""Read only the metadata (no rows) from a SAS file.
Uses pyreadstat's ``metadataonly=True`` fast path: the reader decodes
the file header (column names, formats, total row count, etc.) and
returns without touching the data pages. Orders of magnitude faster
than :func:`read_sas_preview` when all you need is
``meta.number_rows`` - typically a few ms per sas7bdat file, which
makes it cheap to pre-scan a whole folder to populate a global
progress bar.
"""
reader, kwargs = _sas_reader(path)
_, meta = reader(str(Path(path)), metadataonly=True, **kwargs)
return meta
def iter_sas_chunks(
path: Path,
*,
chunksize: Optional[int] = None,
chunksize: int = DEFAULT_CHUNK_ROWS,
):
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
underlying reader by extension and threads through our encoding defaults.
When ``chunksize`` is ``None`` (the default), the effective value comes
from the ``GENERIC_LOADER_CHUNK_ROWS`` environment variable if set and
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
always wins.
"""
if chunksize is None:
raw = os.environ.get("GENERIC_LOADER_CHUNK_ROWS")
if raw is not None:
try:
chunksize = int(raw)
except ValueError:
chunksize = DEFAULT_CHUNK_ROWS
else:
chunksize = DEFAULT_CHUNK_ROWS
reader, kwargs = _sas_reader(path)
yield from pyreadstat.read_file_in_chunks(
reader, str(Path(path)), chunksize=chunksize, **kwargs
@ -712,13 +590,7 @@ def apply_column_filter(
exclude: Optional[List[str]],
) -> pd.DataFrame:
"""Restrict ``df`` to the requested columns. Names missing from the frame
raise a clear error rather than silently dropping.
Returns the input frame (or a column-sliced view / drop result) without
an extra ``.copy()`` downstream (:func:`_prepare_for_copy`) reads the
frame into a freshly built output and never mutates its input, so the
copies were pure overhead on every streamed chunk.
"""
raise a clear error rather than silently dropping."""
if include is not None and exclude is not None:
raise ValueError("include and exclude are mutually exclusive.")
@ -726,15 +598,15 @@ def apply_column_filter(
missing = [c for c in include if c not in df.columns]
if missing:
raise ValueError(f"include references unknown columns: {missing}")
return df.loc[:, list(include)]
return df.loc[:, list(include)].copy()
if exclude is not None:
missing = [c for c in exclude if c not in df.columns]
if missing:
raise ValueError(f"exclude references unknown columns: {missing}")
return df.drop(columns=list(exclude))
return df.drop(columns=list(exclude)).copy()
return df
return df.copy()
# ---------------------------------------------------------------------------
@ -762,126 +634,6 @@ def _format_driven_type(sas_format: Optional[str]) -> Optional[str]:
return None
_DECIMAL_FORMAT_RE = re.compile(r"\.(\d+)")
def _format_hints_decimal(sas_format: Optional[str]) -> bool:
"""True if a numeric SAS format string explicitly carries decimal places.
SAS numeric formats are ``NAMEw.d``; ``d > 0`` means the variable was
intended to render with ``d`` decimal digits (COMMA10.2, F8.3, ...).
A bare width like ``BEST12.`` or ``F8.`` has no digits after the dot
and is treated as integer-presenting. Used by
:func:`union_column_types` to pick BIGINT vs DOUBLE PRECISION when a
column is numeric in every file of a cluster.
"""
if not sas_format:
return False
m = _DECIMAL_FORMAT_RE.search(sas_format)
if not m:
return False
try:
return int(m.group(1)) > 0
except ValueError:
return False
def extract_union_metadata(
meta: Any,
) -> Dict[str, Tuple[str, Optional[str]]]:
"""Pull the (readstat_type, sas_format) pair for every column in ``meta``.
Returns a plain dict that's safe to pass between processes and to
:func:`union_column_types`. ``readstat_type`` is the simplified type
reported by pyreadstat: ``"string"`` for SAS CHAR, ``"double"`` for
SAS NUM. ``sas_format`` comes from ``meta.original_variable_types``
and drives date/datetime detection during union.
"""
var_types = dict(getattr(meta, "variable_types", None) or {})
formats = dict(getattr(meta, "original_variable_types", None) or {})
names = list(
getattr(meta, "column_names", None)
or list(var_types.keys())
or list(formats.keys())
)
out: Dict[str, Tuple[str, Optional[str]]] = {}
for col in names:
rtype = str(var_types.get(col, "")) if var_types else ""
fmt = formats.get(col)
out[col] = (rtype, fmt if fmt else None)
return out
def union_column_types(
per_file_metas: Iterable[Dict[str, Tuple[str, Optional[str]]]],
) -> Dict[str, str]:
"""Derive one Postgres type per column that's safe across every file.
``per_file_metas`` is an iterable (one entry per file in a cluster) of
``{column_name: (readstat_type, sas_format)}`` dicts as produced by
:func:`extract_union_metadata`.
Rules, evaluated per column:
* **CHAR/NUM drift wins TEXT.** If any file stores the column as CHAR
(``readstat_type != "double"``) the union is ``TEXT``. This covers
the phone-id case where some years stored ``RESP_PH_PREFIX_ID`` as
CHAR and others as NUM.
* **All NUM, format hints DATETIME TIMESTAMP.** Any file whose
format resolves to ``TIMESTAMP`` (via :func:`_format_driven_type`)
pins the column to ``TIMESTAMP`` even if other files left the
format blank.
* **All NUM, format hints DATE DATE.** Same idea for date-only
formats.
* **All NUM, any decimal hint DOUBLE PRECISION.** A ``w.d`` format
with ``d > 0`` in any file implies fractional values somewhere.
* **All NUM, no useful hint DOUBLE PRECISION.** SAS numeric
formats are *display* formats, not storage constraints - a
``BEST12.`` / ``F8.`` / blank-format column can still hold floats,
and pyreadstat hands back plain ``float64`` regardless. Defaulting
to ``DOUBLE PRECISION`` here costs the same 8 bytes as ``BIGINT``
but can't fail on real data. For columns that truly are
integer-only and you want ``BIGINT`` semantics in queries, pin
them via a ``column_types`` override.
Columns missing from a given file are simply skipped for that file;
the union is computed over whichever files *did* supply the column.
Columns that never appear anywhere are omitted from the result.
"""
per_col: Dict[str, List[Tuple[str, Optional[str]]]] = {}
for meta in per_file_metas:
for col, pair in meta.items():
per_col.setdefault(col, []).append(pair)
result: Dict[str, str] = {}
for col, entries in per_col.items():
any_char = any(
rtype and rtype.lower() != "double" for rtype, _ in entries
)
if any_char:
result[col] = "TEXT"
continue
formats = [fmt for _, fmt in entries if fmt]
driven = [_format_driven_type(f) for f in formats]
if "TIMESTAMP" in driven:
result[col] = "TIMESTAMP"
elif "DATE" in driven:
result[col] = "DATE"
else:
# Safe default: DOUBLE PRECISION. The BIGINT default we tried
# first failed the moment a file contained a fractional
# value in a column whose format didn't carry a decimal
# hint (very common: SAS ``BEST12.`` / ``F8.`` are display
# formats, not storage constraints, so the underlying
# 8-byte float can hold any value). Same storage cost as
# BIGINT, handles both integer- and float-valued data, and
# keeps loads from failing mid-cluster. Use a
# ``column_types`` override to pin specific columns to
# ``BIGINT`` when you want integer semantics in queries.
result[col] = "DOUBLE PRECISION"
return result
def _all_null(series: pd.Series) -> bool:
if pd.api.types.is_object_dtype(series):
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
@ -1007,8 +759,6 @@ def infer_schema(
*,
coerce_chars: bool = COERCE_CHAR_COLUMNS,
total_rows: Optional[int] = None,
column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
) -> Dict[str, ColumnSpec]:
"""Infer a Postgres column spec for each column in ``df``.
@ -1024,30 +774,11 @@ def infer_schema(
``total_rows`` lets callers who already sampled the frame (e.g. via
:func:`read_sas_preview`) report the real file size in the per-column
"inferred from first N of M rows" note. Falls back to ``len(df)``.
``column_types`` is an optional map ``{column_name: pg_type_str}``
whose entries bypass inference entirely - the caller has already
decided the type (e.g. via :func:`union_column_types` across a
cluster, or a YAML ``column_types`` override). Nullability is still
computed from the data. Columns in ``column_types`` that don't exist
in ``df`` are ignored so a shared override dict can apply to clusters
with different column sets.
``force_nullable=True`` stamps every column nullable regardless of
what the data sample shows. Escape hatch for when inference marks a
column ``NOT NULL`` because the sampled rows happened to be dense but
downstream files carry nulls in that column - common with cluster
loads where one file's preview can't speak for the rest. Cheaper than
trying to sharpen the sampler: widen the column and move on.
"""
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
# When ``TYPE_INFERENCE_SAMPLE_ROWS`` is an integer cap, row-walking type
# probes run on the head slice for speed; nullability and the all-null
# check still walk every row of ``df``. That's only honest when the
# caller handed us the full file - with the default cap of ``None`` the
# CLI does exactly that. Callers who pass a partial preview and a tight
# integer cap accept that ``NOT NULL`` can be wrong for rare-null columns.
# Row-walking type probes run on a bounded head slice; nullability and the
# all-null check still see every row so NOT NULL declarations stay honest.
df_rows = len(df)
effective_total = total_rows if total_rows is not None else df_rows
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
@ -1058,8 +789,6 @@ def infer_schema(
sample_size = df_rows
sampled = sample_size < effective_total
overrides: Dict[str, str] = dict(column_types or {})
# Temporarily flip the module-level flag if the caller asked us to.
global COERCE_CHAR_COLUMNS
saved = COERCE_CHAR_COLUMNS
@ -1072,27 +801,6 @@ def infer_schema(
sas_format = original_formats.get(col)
notes: List[str] = []
if col in overrides:
pg_type = overrides[col]
notes.append(
f"type forced to {pg_type} via column_types override"
)
if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series)
out[col] = ColumnSpec(
name=col,
postgres_type=pg_type,
nullable=nullable,
sas_format=sas_format,
source_dtype=str(series.dtype),
notes=notes,
sampled=sampled,
)
continue
pg_type = _format_driven_type(sas_format)
if pg_type is None:
@ -1123,11 +831,7 @@ def infer_schema(
f"{effective_total:,} rows"
)
if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series)
nullable = _is_nullable(series)
out[col] = ColumnSpec(
name=col,
@ -1260,30 +964,6 @@ def _normalize_type(pg_type: str) -> str:
return _TYPE_NORMALIZATION.get(stripped, stripped.lower())
# Widening pairs: (inferred_from_source, existing_in_target). When the
# incoming spec is narrower than the target we accept it - the value is
# guaranteed to fit, and ``_prepare_for_copy`` already emits ``COPY``
# payloads that Postgres silently promotes to the wider column type. The
# INVERSE direction stays a hard failure: a BIGINT value does not fit in
# an INTEGER column, so we must not let a cluster whose first file had
# only small ints accept a later file with a value past int32. Comes up
# most often on cluster loads where file 1 pushed the target to BIGINT
# (a single value > 2_147_483_647) and file N happens to sit entirely
# within int32 range - strict equality would reject file N even though
# the copy is trivially safe.
_WIDENING_COMPATIBLE: set = {
("smallint", "integer"),
("smallint", "bigint"),
("integer", "bigint"),
("real", "double precision"),
# INTEGER / BIGINT into DOUBLE PRECISION is lossless for int32 and
# exact up to 2**53 for int64, which covers every value pandas could
# have carried through as Int64 without wrapping anyway.
("integer", "double precision"),
("bigint", "double precision"),
}
def _assert_schema_compatible(
conn, schema: str, table: str, columns: Dict[str, ColumnSpec]
) -> None:
@ -1310,22 +990,11 @@ def _assert_schema_compatible(
inferred_norm = _normalize_type(spec.postgres_type)
target_norm = _normalize_type(target_type)
if inferred_norm != target_norm:
if (inferred_norm, target_norm) in _WIDENING_COMPATIBLE:
# Narrower inferred type fits inside the wider target.
# Accept silently-but-noisily so the operator knows the
# file came in with a smaller range than the cluster's
# target was sized for.
warnings.append(
f"column {name!r}: inferred {spec.postgres_type} "
f"(narrower than target {target_type}); accepting - "
f"values fit in the wider target type"
)
else:
mismatches.append(
f"column {name!r}: inferred {spec.postgres_type} "
f"(normalized {inferred_norm!r}) but target is {target_type} "
f"(normalized {target_norm!r})"
)
mismatches.append(
f"column {name!r}: inferred {spec.postgres_type} "
f"(normalized {inferred_norm!r}) but target is {target_type} "
f"(normalized {target_norm!r})"
)
target_is_notnull = (target_nullable == "NO")
if spec.nullable and target_is_notnull:
warnings.append(
@ -1992,151 +1661,9 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]:
return dt.time(h, m, s)
# Safe outer bound for the numeric->datetime conversion below. The true
# ceiling is ``pd.Timestamp.max`` (2262-04-11), which in seconds since 1960
# is ~9.52e9. We pick a much tighter bound - year ~2200, ~7.6e9 seconds,
# ~87600 days - because (a) any real SAS data past ~2100 is garbage anyway,
# and (b) staying well inside the float64 + datetime64[ns] windows gives
# pandas' internals zero room to trip the ``over="raise"`` they wrap
# around the ns-multiply. ``7.5e9 * 1e9 = 7.5e18``, comfortably under both
# ``int64.max`` (~9.22e18) and float64 overflow (~1.8e308).
_SAS_DATETIME_SAFE_S = 7_500_000_000
_SAS_DATETIME_SAFE_D = 87_000
def _safe_numeric_to_datetime(
series: pd.Series,
*,
unit: str,
column_name: str,
target_type: str,
) -> pd.Series:
"""Convert a numeric SAS-epoch series to ``datetime64[ns]`` without letting
one stray cell take down the worker.
Failure modes seen in production:
* ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value
sentinels, divide-by-zero in the source, uninitialized cells).
* Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9``
overflows float64.
* Values between ``pd.Timestamp.max`` and float64 safety (~9.5e9 to 1e308
seconds) where the nanosecond multiply silently produces garbage or
overflows int64.
All of these trigger ``FloatingPointError: overflow encountered in multiply``
inside ``pd.to_datetime`` because pandas wraps the multiply in
``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never
gets a chance to turn the bad value into ``NaT``.
Strategy, belt + suspenders + airbag:
1. Coerce to float64 up front. Object-dtype branches hand us mixed
int/float/str; ``pd.to_numeric(errors="coerce")`` parses what it can
and NaNs the rest, so we hit the rest of this function with a
pristine float series.
2. Mask non-finite values and anything outside the safe epoch window to
NaN *before* ``pd.to_datetime`` sees them.
3. Run the conversion under a permissive ``errstate``.
4. If that still raises (some pandas version internally re-enables
``over="raise"`` in a way ``errstate`` can't override), catch it
and return all-NaT for the column with a loud warning. Better a
NULL column in one chunk than a dead worker + no diagnostics.
Emits one stderr line per chunk per affected column so silent data
loss doesn't sneak by.
"""
if not pd.api.types.is_float_dtype(series):
series = pd.to_numeric(series, errors="coerce").astype("float64")
arr = series.to_numpy(dtype="float64", copy=False, na_value=np.nan)
if unit == "s":
bound = _SAS_DATETIME_SAFE_S
elif unit == "D":
bound = _SAS_DATETIME_SAFE_D
else:
bound = _SAS_DATETIME_SAFE_S
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
finite_mask = np.isfinite(arr)
# ``np.abs(inf) -> inf``, ``np.abs(nan) -> nan``; both compare False
# to ``bound``, so ``in_range_mask`` already excludes non-finite
# values. The explicit ``finite_mask &`` below is belt-and-suspenders
# in case a future numpy changes that semantic.
in_range_mask = np.abs(arr) < bound
keep_mask = finite_mask & in_range_mask
was_present = ~np.isnan(arr)
coerced = int(((~keep_mask) & was_present).sum())
if coerced:
tqdm.write(
f"[warn] {target_type} column {column_name!r}: {coerced:,} "
f"row(s) had non-representable values (Inf/NaN/out-of-range), "
f"coerced to NULL",
file=sys.stderr,
)
cleaned_arr = np.where(keep_mask, arr, np.nan)
cleaned = pd.Series(cleaned_arr, index=series.index)
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(
cleaned, unit=unit, origin="1960-01-01", errors="coerce",
)
except (FloatingPointError, OverflowError, ValueError) as exc:
tqdm.write(
f"[error] {target_type} column {column_name!r}: "
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
f"returning NaT for the entire chunk. This usually means one "
f"or more values slipped past the pre-mask (bound={bound}). "
f"Consider setting the column to TEXT via column_types if this "
f"recurs.",
file=sys.stderr,
)
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
def _safe_object_to_datetime(
series: pd.Series,
*,
column_name: str,
target_type: str,
) -> pd.Series:
"""Object-dtype to datetime. Shares the safety net (errstate +
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
actually numeric-flavored (e.g. SAS wrote numbers into an object
column), route to the numeric path; otherwise parse with ``to_datetime``
on the object itself.
"""
coerced = series.replace({"": None})
numeric = pd.to_numeric(coerced, errors="coerce")
all_numeric = numeric.notna().sum() == coerced.notna().sum()
if all_numeric and coerced.notna().any():
return _safe_numeric_to_datetime(
numeric, unit="s", column_name=column_name, target_type=target_type,
)
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(coerced, errors="coerce")
except (FloatingPointError, OverflowError, ValueError) as exc:
tqdm.write(
f"[error] {target_type} column {column_name!r}: "
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
f"returning NaT for the entire chunk.",
file=sys.stderr,
)
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
"""Materialize a copy of ``df`` with each column in the right shape for
``to_csv`` so the CSV lands as valid input for the target Postgres type.
Per-column conversions are vectorized (``.astype`` / ``pd.to_datetime`` /
``.mask`` / ``.fillna``) instead of the element-wise ``.map(func)``
loops this function used to run. That was the single largest per-chunk
CPU cost on text-heavy loads - a 40-column × 100k-row chunk was issuing
~4M Python-level function calls just to cast strings. TIME columns are
still the ``.map`` path because SAS TIME8 is stored as seconds and the
clamp-to-24h logic doesn't fit cleanly in vector form; they're also
rare in practice.
"""
out = pd.DataFrame(index=df.index)
for name, spec in columns.items():
@ -2159,86 +1686,59 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
if pd.api.types.is_datetime64_any_dtype(series):
out[name] = series.dt.date
elif pd.api.types.is_object_dtype(series):
# Vectorized parse: empty strings / None / unparseable -> NaT,
# then .dt.date yields date objects or NaT. NaT serializes as
# an empty CSV field (matching ``NULL ''`` in COPY). Routed
# through ``_safe_object_to_datetime`` so an object column
# that actually contains SAS-epoch numerics (seen when one
# file of a cluster stores the column as NUM and another as
# CHAR + the union flipped it to TEXT-then-DATE) can't trip
# the overflow-in-multiply bug.
parsed = _safe_object_to_datetime(
series, column_name=name, target_type="DATE",
)
out[name] = parsed.dt.date
elif pd.api.types.is_numeric_dtype(series):
# pyreadstat couldn't decode the SAS format (some
# ``DATEw.``/``YYMMDDw.`` variants and all custom formats slip
# through) so the column came back as float64: days since
# 1960-01-01, the SAS epoch. Without this branch the raw
# number would hit COPY and Postgres rejects it with
# ``invalid input syntax for type date``.
parsed = _safe_numeric_to_datetime(
series, unit="D", column_name=name, target_type="DATE",
)
out[name] = parsed.dt.date
def _to_date(v: Any) -> Optional[dt.date]:
if v is None or (isinstance(v, float) and pd.isna(v)):
return None
if isinstance(v, dt.datetime):
return v.date()
if isinstance(v, dt.date):
return v
if isinstance(v, str):
if v == "":
return None
try:
return dt.date.fromisoformat(v)
except ValueError:
return None
return None
out[name] = series.map(_to_date)
else:
out[name] = series
elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"):
if pd.api.types.is_datetime64_any_dtype(series):
out[name] = series
elif pd.api.types.is_object_dtype(series):
# Same rationale as the DATE object branch above: route
# through the safety net so numeric-flavored object columns
# can't blow us up during the ns multiply.
out[name] = _safe_object_to_datetime(
series, column_name=name, target_type="TIMESTAMP",
)
elif pd.api.types.is_numeric_dtype(series):
# Same story as the DATE branch above, but SAS datetimes are
# *seconds* since 1960-01-01 (fractional seconds for
# ``DATETIMEw.d``). Example caught in the wild:
# ``1915465463.615`` -> 2020-09-13 05:44:23.615.
out[name] = _safe_numeric_to_datetime(
series, unit="s", column_name=name, target_type="TIMESTAMP",
)
def _to_dt(v: Any) -> Optional[dt.datetime]:
if v is None or (isinstance(v, float) and pd.isna(v)):
return None
if isinstance(v, dt.datetime):
return v
if isinstance(v, dt.date):
return dt.datetime(v.year, v.month, v.day)
if isinstance(v, pd.Timestamp):
return v.to_pydatetime() if not pd.isna(v) else None
if isinstance(v, str):
if v == "":
return None
try:
return dt.datetime.fromisoformat(v)
except ValueError:
return None
return None
out[name] = series.map(_to_dt)
else:
out[name] = series
elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"):
out[name] = series.map(_seconds_to_time)
elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"):
# Render every cell as a string and blank out nulls. ``NULL ''``
# in the COPY statement turns the blanks back into SQL NULL.
# astype(str) stringifies NaN/None to the literal "nan"/"None",
# so we mask those after the fact rather than branching per cell.
na_mask = series.isna()
if pd.api.types.is_numeric_dtype(series):
# Hit when a column was auto-unioned to TEXT because at
# least one file of the cluster stored it as CHAR but this
# particular file stored it as NUM (typical of SAS phone-id
# columns). Default float formatting would emit "123.0" -
# which doesn't match the plain "123" coming from the CHAR
# files. When the whole chunk is integer-valued, round to
# int before stringifying; when any fractional value is
# present we leave float formatting alone so we don't
# silently drop precision.
nonnull = series.dropna()
int_like = False
if not nonnull.empty:
try:
int_like = bool(((nonnull % 1) == 0).all())
except TypeError:
int_like = False
if int_like:
# ``Int64`` preserves NA; ``.astype(str)`` renders NA
# as '<NA>', which we then mask out alongside original
# NaNs.
as_str = series.astype("Int64").astype(str)
out[name] = as_str.mask(na_mask, "")
else:
out[name] = series.astype(str).mask(na_mask, "")
else:
out[name] = series.astype(str).mask(na_mask, "")
# Leave empty strings as "" so `NULL ''` in COPY turns them into NULL.
def _to_str(v: Any) -> Any:
if v is None:
return ""
if isinstance(v, float) and pd.isna(v):
return ""
return str(v)
out[name] = series.map(_to_str)
elif pg == "BOOLEAN":
out[name] = series.astype("boolean") if series.dtype != object else series
else:
@ -2246,25 +1746,6 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
return out
def _serialize_chunk_csv(prepared: pd.DataFrame) -> io.BytesIO:
"""Serialize a prepared frame into a CSV buffer for ``COPY FROM STDIN``.
Uses ``pyarrow.csv.write_csv`` (typically 5-10× faster than pandas'
pure-Python ``to_csv`` on wide/text-heavy frames). Null cells serialize
as empty strings and date/timestamp values land in ISO 8601 form, both
of which Postgres accepts under ``FORMAT csv, NULL ''``.
"""
table = pa.Table.from_pandas(prepared, preserve_index=False)
buf = io.BytesIO()
pa_csv.write_csv(
table,
buf,
write_options=pa_csv.WriteOptions(include_header=False),
)
buf.seek(0)
return buf
def copy_dataframes(
conn,
schema_name: str,
@ -2287,43 +1768,23 @@ def copy_dataframes(
)
total = 0
# Pull chunks one at a time so each ``df`` is unreferenced before the
# generator reads the next one. Without this the loop-variable binding
# of a ``for df in dfs:`` keeps the previous chunk alive during the
# next pyreadstat read, pushing peak memory to 5-6× chunk size per
# worker (old df + incoming df + prepared + pyarrow table + CSV buf).
# With explicit drops we cap peak at ~2× chunk size: ``df`` goes away
# once ``prepared`` exists, ``prepared`` once ``buf`` exists, ``buf``
# once COPY has consumed it. Matters most in parallel mode where
# 32 × per-worker peak can exhaust a 128 GB host.
dfs_iter = iter(dfs)
with conn.cursor() as cur:
while True:
try:
df = next(dfs_iter)
except StopIteration:
break
for df in dfs:
if df.empty:
del df
continue
prepared = _prepare_for_copy(df, columns)
del df
n = len(prepared)
buf = _serialize_chunk_csv(prepared)
del prepared
buf = io.StringIO()
prepared.to_csv(
buf,
index=False,
header=False,
na_rep="",
date_format="%Y-%m-%d %H:%M:%S",
)
buf.seek(0)
cur.copy_expert(sql, buf)
del buf
conn.commit()
total += n
# Hand pyarrow's pool memory back between chunks. Without this,
# arrow's internal buffer pool keeps the high-water bytes
# reserved across the worker's lifetime - inside long-running
# workers this presents as steadily climbing RSS even with the
# ``del``s above. Cheap (microseconds); call it every chunk.
try:
pa.default_memory_pool().release_unused()
except Exception:
pass
total += len(prepared)
return total
@ -2438,16 +1899,6 @@ def _build_argparser() -> argparse.ArgumentParser:
"PGUSER / PGPASSWORD from the environment or .env file."
),
)
p.add_argument(
"--all-nullable",
action="store_true",
help=(
"Stamp every column nullable in the generated schema, bypassing "
"NOT NULL inference. Use when sampled rows wrongly suggest a "
"column has no nulls. Overrides ``all_nullable`` in the YAML "
"config when set."
),
)
return p
@ -2470,23 +1921,13 @@ def main(argv: Optional[List[str]] = None) -> int:
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
return 2
# Schema inference reads the whole file so type + nullability are
# computed against every row. That's what the target host has the
# resources for and is the only way to honestly emit ``NOT NULL`` -
# a bounded preview routinely missed the ~0.2% of rows with nulls on
# otherwise-dense keys (e.g. MAFID). If you're on a box that can't
# fit the file in memory, override ``TYPE_INFERENCE_SAMPLE_ROWS`` to
# an integer cap and know that sampled specs may stamp ``NOT NULL``
# on columns whose nulls live past the window.
# Schema inference uses a bounded preview read so we never load a
# hundreds-of-millions-of-rows file into memory just to pick types.
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows
# returned, not the file's total, so we don't trust it here.
preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
force_nullable = args.all_nullable or cfg.all_nullable
columns = infer_schema(
preview_df,
meta,
column_types=cfg.column_types,
force_nullable=force_nullable,
)
columns = infer_schema(preview_df, meta)
# Validate partition columns exist in the schema after filtering.
if cfg.partition_by:
@ -2577,24 +2018,13 @@ def main(argv: Optional[List[str]] = None) -> int:
# it while we're holding a Postgres transaction open.
del preview_df
total_rows = getattr(meta, "number_rows", None)
def _filtered_chunks():
pbar = tqdm(
total=total_rows,
unit="row",
unit_scale=True,
desc=f" {cfg.filename.name}",
file=sys.stderr,
dynamic_ncols=True,
)
try:
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
pbar.update(len(chunk_df))
yield chunk_df
finally:
pbar.close()
seen = 0
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
seen += len(chunk_df)
print(f" streaming... {seen:,} rows", file=sys.stderr)
yield chunk_df
db_user = db_password = None
if args.dbcreds:

View File

@ -38,24 +38,3 @@ if_exists: append
# indexes:
# - state
# - zip
# column_types: Explicit {column_name: postgres_type} overrides that
# bypass automatic type inference for the listed columns. Useful when
# pyreadstat reports a column as NUM but you want it stored as TEXT
# (phone/ID columns that are conceptually strings), or when a column's
# inferred type is off for any other reason. Columns not listed here
# fall through to the normal inference path. Nullability is always
# computed from the data.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# all_nullable: If true, every column is stamped nullable in the generated
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
# rare-null data downstream) and COPY blows up mid-load on the first null
# it hits. Off by default. The CLI flag --all-nullable overrides this to
# true when set.
#
# all_nullable: false

View File

@ -61,52 +61,15 @@ auto_detect: true
# - state
# - zip
# Folder-level column_types: Explicit {column_name: postgres_type} map that
# bypasses automatic type inference for the listed columns. Applied to
# every cluster unless a cluster supplies its own column_types, which are
# merged on top (cluster entries win on conflict).
#
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
# BIGINT). Entries in column_types here win over that auto-union - use
# them when the auto result is wrong or when --no-prescan disables the
# auto-union and you still need to pin a column.
#
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
# don't exist in a given file are simply ignored for that file.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# RESP_PH_SUFFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# Folder-level all_nullable: If true, every column of every cluster is
# stamped nullable in the generated schema; NOT NULL inference is skipped
# entirely. Use this when the sampler wrongly concludes a column has no
# nulls (sampled rows happened to be dense, but later files in the cluster
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
# unless a cluster supplies its own all_nullable. The CLI flag
# --all-nullable overrides both this and any per-cluster setting when
# passed. Off by default.
#
# all_nullable: false
# Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly.
#
# `tablename` is required. `if_exists`, `include`, `exclude`, and
# `column_types` are optional per-cluster overrides of the folder-level
# defaults above. Cluster-level column_types entries win over folder-
# level entries for the same column.
# `tablename` is required. `if_exists`, `include`, and `exclude` are
# optional per-cluster overrides of the folder-level defaults above.
clusters:
- pattern: '^group_a\d+\.xpt$'
tablename: group_a
# column_types:
# INTCOL: TEXT
# all_nullable: true # per-cluster override of the folder-level default
# Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace":

View File

@ -1,10 +1,7 @@
pandas>=2.0,<3.0
pyreadstat>=1.2,<2.0
numpy>=2.1,<3.0
pyarrow>=22.0,<24.0
pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0
python-dotenv>=1.0,<2.0
boto3>=1.28,<2.0
openpyxl>=3.1,<4.0
tqdm>=4.66,<5.0

File diff suppressed because it is too large Load Diff