Compare commits
23 Commits
e48038f3c6
...
64e7ff0b0a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64e7ff0b0a | ||
|
|
eff82c73ce | ||
|
|
c283b42876 | ||
|
|
a46f0518f6 | ||
|
|
969a442775 | ||
|
|
212218fb67 | ||
|
|
ae65140390 | ||
|
|
0c5e6e31f0 | ||
|
|
9afb52aecb | ||
|
|
eac75cbb26 | ||
|
|
1265489276 | ||
|
|
2dd247b067 | ||
|
|
052fb0e087 | ||
|
|
fe7dc4d5a1 | ||
|
|
96f2d6fe79 | ||
|
|
7beb44ac4d | ||
|
|
5e347f50ef | ||
|
|
f84e127796 | ||
|
|
a94ab68f4d | ||
|
|
4fc85081c8 | ||
|
|
5449a25b44 | ||
|
|
b3b968edf2 | ||
|
|
f1af1136dc |
File diff suppressed because it is too large
Load Diff
@ -183,15 +183,17 @@ Priority order used by :func:`infer_schema`:
|
||||
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
|
||||
``DOUBLE PRECISION``.
|
||||
|
||||
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for
|
||||
performance on large files. The CLI enforces this at read time via
|
||||
:func:`read_sas_preview`, so the whole file is never materialized just to pick
|
||||
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual
|
||||
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32,
|
||||
or a column had no nulls in the preview but does later in the file, ``COPY``
|
||||
will fail mid-stream and the whole transaction rolls back. Set
|
||||
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing
|
||||
matters more than speed.
|
||||
Type inference scans the whole file by default (``TYPE_INFERENCE_SAMPLE_ROWS
|
||||
= None``) so type + nullability are both computed against every row. The CLI
|
||||
materializes the file once for schema inference, then re-streams it chunk by
|
||||
chunk into ``COPY``; peak memory is roughly one full dataframe. Override
|
||||
``TYPE_INFERENCE_SAMPLE_ROWS`` to an integer cap if you're on a host that
|
||||
can't hold the file in memory - but know that sampled specs carry the usual
|
||||
risks: a later row may exceed the inferred integer range, or a column that
|
||||
had no nulls in the preview may carry nulls later in the file (which then
|
||||
detonates ``COPY`` because the sampled spec stamped it ``NOT NULL``). Seen
|
||||
in production on a 2.5M-row file with ~6k null MAFIDs past the 10k-row
|
||||
preview - the entire load aborted mid-stream.
|
||||
|
||||
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
|
||||
commit each chunk as it is copied so an interrupted load retains the rows
|
||||
@ -225,16 +227,48 @@ import math
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import warnings
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
import pyarrow as pa
|
||||
import pyarrow.csv as pa_csv
|
||||
import pyreadstat
|
||||
import yaml
|
||||
from dotenv import load_dotenv
|
||||
from pandas.errors import PerformanceWarning
|
||||
from tqdm import tqdm
|
||||
|
||||
# ``_prepare_for_copy`` builds its output frame one column at a time with
|
||||
# ``out[name] = ...``. On wide SAS files (~100+ columns) pandas prints a
|
||||
# ``PerformanceWarning: DataFrame is highly fragmented`` once per chunk to
|
||||
# nudge callers toward ``pd.concat(axis=1, ...)``. The fragmentation only
|
||||
# matters for row-oriented ops or in-place ``.copy()``; we hand the frame
|
||||
# straight to ``pyarrow.Table.from_pandas`` which reads columns
|
||||
# independently, so the warning is pure noise for our pipeline. Filter it
|
||||
# at import time - narrow category match so nothing else is suppressed.
|
||||
warnings.filterwarnings("ignore", category=PerformanceWarning)
|
||||
|
||||
# Turn numpy's "raise on float overflow" (and friends) into silent inf/nan
|
||||
# production, module-wide. Pandas ships with ``np.errstate(over="raise")``
|
||||
# wrapped around several internal ops (most painfully, the multiply inside
|
||||
# ``pd.to_datetime(unit="s")`` that converts SAS epoch -> nanoseconds).
|
||||
# Our data routinely carries ``inf`` / huge sentinels, which trip that
|
||||
# ``raise`` and blow up an entire worker before ``errors="coerce"`` gets
|
||||
# a chance to turn them into NaT. Even with ``_safe_numeric_to_datetime``
|
||||
# pre-masking the obvious cases, other code paths (pandas object-dtype
|
||||
# datetime parsing, pyarrow type promotion, pyreadstat) can also trigger.
|
||||
# Setting a process-wide ``seterr`` is a heavier hammer than an
|
||||
# ``errstate`` block but survives library internals that don't explicitly
|
||||
# rewrap it. Downside: a real overflow bug in new code would now silently
|
||||
# produce inf/nan instead of raising - acceptable for a bulk loader where
|
||||
# "don't crash on bad rows, null them and move on" is the whole point.
|
||||
np.seterr(over="ignore", invalid="ignore", divide="ignore")
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -255,17 +289,29 @@ values; too small a sample is easy to mis-infer."""
|
||||
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
|
||||
"""INTEGER bounds; anything outside becomes BIGINT."""
|
||||
|
||||
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000
|
||||
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = None
|
||||
"""Cap on rows inspected during per-column type inference. Also governs how
|
||||
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
|
||||
schema-inference flows. Set to ``None`` to scan every row (and read the whole
|
||||
file into memory for the preview step - don't do this on multi-hundred-million
|
||||
row files)."""
|
||||
schema-inference flows.
|
||||
|
||||
DEFAULT_CHUNK_ROWS = 100_000
|
||||
Default is ``None`` (scan every row, reading the whole file into memory for
|
||||
the schema-inference step). That's the only honest setting for nullability:
|
||||
any integer cap lets a column look ``NOT NULL`` across the first N rows
|
||||
while the file actually holds rare nulls past the window, which then
|
||||
detonates ``COPY`` mid-stream (seen in production on a 2.5M-row file where
|
||||
~6k MAFIDs were null past the 10k-row preview). If you're loading a file
|
||||
so large that a full read won't fit in memory, set this to an integer cap
|
||||
and accept that sampled specs can't be trusted for ``NOT NULL``."""
|
||||
|
||||
DEFAULT_CHUNK_ROWS = 2_000_000
|
||||
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
|
||||
fewer COPY round-trips but more peak memory per chunk; smaller values are
|
||||
gentler on memory."""
|
||||
fewer COPY round-trips and lower per-row overhead but more peak memory per
|
||||
chunk; smaller values are gentler on memory.
|
||||
|
||||
The chunk size can be overridden at runtime via the
|
||||
``GENERIC_LOADER_CHUNK_ROWS`` environment variable (read inside
|
||||
:func:`iter_sas_chunks`), so ``.env``-driven overrides work without code
|
||||
changes. Explicit ``chunksize=`` kwargs still win over both."""
|
||||
|
||||
|
||||
VALID_IF_EXISTS = ("fail", "replace", "append")
|
||||
@ -290,6 +336,8 @@ class LoaderConfig:
|
||||
partition_by: List[str] = field(default_factory=list)
|
||||
max_partitions: int = 10_000
|
||||
indexes: List[str] = field(default_factory=list)
|
||||
column_types: Dict[str, str] = field(default_factory=dict)
|
||||
all_nullable: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -500,6 +548,48 @@ def load_config(path: Path) -> LoaderConfig:
|
||||
f"{missing_in_include}"
|
||||
)
|
||||
|
||||
# -- column_types -------------------------------------------------------
|
||||
# Optional ``{column_name: pg_type}`` escape hatch that bypasses
|
||||
# automatic type inference for specific columns. Useful when
|
||||
# pyreadstat reports a column as NUM but the downstream consumer
|
||||
# expects TEXT (e.g. phone-id columns), or when a column has drifted
|
||||
# between CHAR and NUM across file versions and you want to pin
|
||||
# TEXT up front. See also :func:`infer_schema`.
|
||||
raw_ct = raw.get("column_types")
|
||||
column_types: Dict[str, str] = {}
|
||||
if raw_ct is not None:
|
||||
if not isinstance(raw_ct, dict):
|
||||
raise ValueError(
|
||||
f"Config {path}: 'column_types' must be a mapping of "
|
||||
f"{{column_name: postgres_type}}."
|
||||
)
|
||||
for k, v in raw_ct.items():
|
||||
key = str(k).strip()
|
||||
if not key:
|
||||
raise ValueError(
|
||||
f"Config {path}: 'column_types' contains an empty "
|
||||
f"column name."
|
||||
)
|
||||
if not isinstance(v, str) or not v.strip():
|
||||
raise ValueError(
|
||||
f"Config {path}: 'column_types[{key}]' must be a "
|
||||
f"non-empty Postgres type string (got {v!r})."
|
||||
)
|
||||
column_types[key] = v.strip()
|
||||
|
||||
# -- all_nullable -------------------------------------------------------
|
||||
# When inference wrongly stamps a column NOT NULL (sampled rows happened
|
||||
# to be dense; later rows carry nulls) downstream COPYs fail mid-stream.
|
||||
# Set ``all_nullable: true`` in the YAML to stamp every column nullable
|
||||
# up front. The CLI flag ``--all-nullable`` overrides this to ``true``
|
||||
# if set.
|
||||
raw_an = raw.get("all_nullable", False)
|
||||
if not isinstance(raw_an, bool):
|
||||
raise ValueError(
|
||||
f"Config {path}: 'all_nullable' must be a boolean (got {raw_an!r})."
|
||||
)
|
||||
all_nullable = bool(raw_an)
|
||||
|
||||
return LoaderConfig(
|
||||
filename=filename,
|
||||
schemaname=schemaname,
|
||||
@ -510,6 +600,8 @@ def load_config(path: Path) -> LoaderConfig:
|
||||
partition_by=partition_by,
|
||||
max_partitions=max_partitions,
|
||||
indexes=indexes,
|
||||
column_types=column_types,
|
||||
all_nullable=all_nullable,
|
||||
)
|
||||
|
||||
|
||||
@ -563,16 +655,46 @@ def read_sas_preview(
|
||||
return reader(str(Path(path)), row_limit=row_limit, **kwargs)
|
||||
|
||||
|
||||
def read_sas_metadata(path: Path) -> Any:
|
||||
"""Read only the metadata (no rows) from a SAS file.
|
||||
|
||||
Uses pyreadstat's ``metadataonly=True`` fast path: the reader decodes
|
||||
the file header (column names, formats, total row count, etc.) and
|
||||
returns without touching the data pages. Orders of magnitude faster
|
||||
than :func:`read_sas_preview` when all you need is
|
||||
``meta.number_rows`` - typically a few ms per sas7bdat file, which
|
||||
makes it cheap to pre-scan a whole folder to populate a global
|
||||
progress bar.
|
||||
"""
|
||||
reader, kwargs = _sas_reader(path)
|
||||
_, meta = reader(str(Path(path)), metadataonly=True, **kwargs)
|
||||
return meta
|
||||
|
||||
|
||||
def iter_sas_chunks(
|
||||
path: Path,
|
||||
*,
|
||||
chunksize: int = DEFAULT_CHUNK_ROWS,
|
||||
chunksize: Optional[int] = None,
|
||||
):
|
||||
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
|
||||
|
||||
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
|
||||
underlying reader by extension and threads through our encoding defaults.
|
||||
|
||||
When ``chunksize`` is ``None`` (the default), the effective value comes
|
||||
from the ``GENERIC_LOADER_CHUNK_ROWS`` environment variable if set and
|
||||
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
|
||||
always wins.
|
||||
"""
|
||||
if chunksize is None:
|
||||
raw = os.environ.get("GENERIC_LOADER_CHUNK_ROWS")
|
||||
if raw is not None:
|
||||
try:
|
||||
chunksize = int(raw)
|
||||
except ValueError:
|
||||
chunksize = DEFAULT_CHUNK_ROWS
|
||||
else:
|
||||
chunksize = DEFAULT_CHUNK_ROWS
|
||||
reader, kwargs = _sas_reader(path)
|
||||
yield from pyreadstat.read_file_in_chunks(
|
||||
reader, str(Path(path)), chunksize=chunksize, **kwargs
|
||||
@ -590,7 +712,13 @@ def apply_column_filter(
|
||||
exclude: Optional[List[str]],
|
||||
) -> pd.DataFrame:
|
||||
"""Restrict ``df`` to the requested columns. Names missing from the frame
|
||||
raise a clear error rather than silently dropping."""
|
||||
raise a clear error rather than silently dropping.
|
||||
|
||||
Returns the input frame (or a column-sliced view / drop result) without
|
||||
an extra ``.copy()`` — downstream (:func:`_prepare_for_copy`) reads the
|
||||
frame into a freshly built output and never mutates its input, so the
|
||||
copies were pure overhead on every streamed chunk.
|
||||
"""
|
||||
if include is not None and exclude is not None:
|
||||
raise ValueError("include and exclude are mutually exclusive.")
|
||||
|
||||
@ -598,15 +726,15 @@ def apply_column_filter(
|
||||
missing = [c for c in include if c not in df.columns]
|
||||
if missing:
|
||||
raise ValueError(f"include references unknown columns: {missing}")
|
||||
return df.loc[:, list(include)].copy()
|
||||
return df.loc[:, list(include)]
|
||||
|
||||
if exclude is not None:
|
||||
missing = [c for c in exclude if c not in df.columns]
|
||||
if missing:
|
||||
raise ValueError(f"exclude references unknown columns: {missing}")
|
||||
return df.drop(columns=list(exclude)).copy()
|
||||
return df.drop(columns=list(exclude))
|
||||
|
||||
return df.copy()
|
||||
return df
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -634,6 +762,126 @@ def _format_driven_type(sas_format: Optional[str]) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
_DECIMAL_FORMAT_RE = re.compile(r"\.(\d+)")
|
||||
|
||||
|
||||
def _format_hints_decimal(sas_format: Optional[str]) -> bool:
|
||||
"""True if a numeric SAS format string explicitly carries decimal places.
|
||||
|
||||
SAS numeric formats are ``NAMEw.d``; ``d > 0`` means the variable was
|
||||
intended to render with ``d`` decimal digits (COMMA10.2, F8.3, ...).
|
||||
A bare width like ``BEST12.`` or ``F8.`` has no digits after the dot
|
||||
and is treated as integer-presenting. Used by
|
||||
:func:`union_column_types` to pick BIGINT vs DOUBLE PRECISION when a
|
||||
column is numeric in every file of a cluster.
|
||||
"""
|
||||
if not sas_format:
|
||||
return False
|
||||
m = _DECIMAL_FORMAT_RE.search(sas_format)
|
||||
if not m:
|
||||
return False
|
||||
try:
|
||||
return int(m.group(1)) > 0
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def extract_union_metadata(
|
||||
meta: Any,
|
||||
) -> Dict[str, Tuple[str, Optional[str]]]:
|
||||
"""Pull the (readstat_type, sas_format) pair for every column in ``meta``.
|
||||
|
||||
Returns a plain dict that's safe to pass between processes and to
|
||||
:func:`union_column_types`. ``readstat_type`` is the simplified type
|
||||
reported by pyreadstat: ``"string"`` for SAS CHAR, ``"double"`` for
|
||||
SAS NUM. ``sas_format`` comes from ``meta.original_variable_types``
|
||||
and drives date/datetime detection during union.
|
||||
"""
|
||||
var_types = dict(getattr(meta, "variable_types", None) or {})
|
||||
formats = dict(getattr(meta, "original_variable_types", None) or {})
|
||||
names = list(
|
||||
getattr(meta, "column_names", None)
|
||||
or list(var_types.keys())
|
||||
or list(formats.keys())
|
||||
)
|
||||
out: Dict[str, Tuple[str, Optional[str]]] = {}
|
||||
for col in names:
|
||||
rtype = str(var_types.get(col, "")) if var_types else ""
|
||||
fmt = formats.get(col)
|
||||
out[col] = (rtype, fmt if fmt else None)
|
||||
return out
|
||||
|
||||
|
||||
def union_column_types(
|
||||
per_file_metas: Iterable[Dict[str, Tuple[str, Optional[str]]]],
|
||||
) -> Dict[str, str]:
|
||||
"""Derive one Postgres type per column that's safe across every file.
|
||||
|
||||
``per_file_metas`` is an iterable (one entry per file in a cluster) of
|
||||
``{column_name: (readstat_type, sas_format)}`` dicts as produced by
|
||||
:func:`extract_union_metadata`.
|
||||
|
||||
Rules, evaluated per column:
|
||||
|
||||
* **CHAR/NUM drift wins TEXT.** If any file stores the column as CHAR
|
||||
(``readstat_type != "double"``) the union is ``TEXT``. This covers
|
||||
the phone-id case where some years stored ``RESP_PH_PREFIX_ID`` as
|
||||
CHAR and others as NUM.
|
||||
* **All NUM, format hints DATETIME → TIMESTAMP.** Any file whose
|
||||
format resolves to ``TIMESTAMP`` (via :func:`_format_driven_type`)
|
||||
pins the column to ``TIMESTAMP`` even if other files left the
|
||||
format blank.
|
||||
* **All NUM, format hints DATE → DATE.** Same idea for date-only
|
||||
formats.
|
||||
* **All NUM, any decimal hint → DOUBLE PRECISION.** A ``w.d`` format
|
||||
with ``d > 0`` in any file implies fractional values somewhere.
|
||||
* **All NUM, no useful hint → DOUBLE PRECISION.** SAS numeric
|
||||
formats are *display* formats, not storage constraints - a
|
||||
``BEST12.`` / ``F8.`` / blank-format column can still hold floats,
|
||||
and pyreadstat hands back plain ``float64`` regardless. Defaulting
|
||||
to ``DOUBLE PRECISION`` here costs the same 8 bytes as ``BIGINT``
|
||||
but can't fail on real data. For columns that truly are
|
||||
integer-only and you want ``BIGINT`` semantics in queries, pin
|
||||
them via a ``column_types`` override.
|
||||
|
||||
Columns missing from a given file are simply skipped for that file;
|
||||
the union is computed over whichever files *did* supply the column.
|
||||
Columns that never appear anywhere are omitted from the result.
|
||||
"""
|
||||
per_col: Dict[str, List[Tuple[str, Optional[str]]]] = {}
|
||||
for meta in per_file_metas:
|
||||
for col, pair in meta.items():
|
||||
per_col.setdefault(col, []).append(pair)
|
||||
|
||||
result: Dict[str, str] = {}
|
||||
for col, entries in per_col.items():
|
||||
any_char = any(
|
||||
rtype and rtype.lower() != "double" for rtype, _ in entries
|
||||
)
|
||||
if any_char:
|
||||
result[col] = "TEXT"
|
||||
continue
|
||||
formats = [fmt for _, fmt in entries if fmt]
|
||||
driven = [_format_driven_type(f) for f in formats]
|
||||
if "TIMESTAMP" in driven:
|
||||
result[col] = "TIMESTAMP"
|
||||
elif "DATE" in driven:
|
||||
result[col] = "DATE"
|
||||
else:
|
||||
# Safe default: DOUBLE PRECISION. The BIGINT default we tried
|
||||
# first failed the moment a file contained a fractional
|
||||
# value in a column whose format didn't carry a decimal
|
||||
# hint (very common: SAS ``BEST12.`` / ``F8.`` are display
|
||||
# formats, not storage constraints, so the underlying
|
||||
# 8-byte float can hold any value). Same storage cost as
|
||||
# BIGINT, handles both integer- and float-valued data, and
|
||||
# keeps loads from failing mid-cluster. Use a
|
||||
# ``column_types`` override to pin specific columns to
|
||||
# ``BIGINT`` when you want integer semantics in queries.
|
||||
result[col] = "DOUBLE PRECISION"
|
||||
return result
|
||||
|
||||
|
||||
def _all_null(series: pd.Series) -> bool:
|
||||
if pd.api.types.is_object_dtype(series):
|
||||
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
|
||||
@ -759,6 +1007,8 @@ def infer_schema(
|
||||
*,
|
||||
coerce_chars: bool = COERCE_CHAR_COLUMNS,
|
||||
total_rows: Optional[int] = None,
|
||||
column_types: Optional[Dict[str, str]] = None,
|
||||
force_nullable: bool = False,
|
||||
) -> Dict[str, ColumnSpec]:
|
||||
"""Infer a Postgres column spec for each column in ``df``.
|
||||
|
||||
@ -774,11 +1024,30 @@ def infer_schema(
|
||||
``total_rows`` lets callers who already sampled the frame (e.g. via
|
||||
:func:`read_sas_preview`) report the real file size in the per-column
|
||||
"inferred from first N of M rows" note. Falls back to ``len(df)``.
|
||||
|
||||
``column_types`` is an optional map ``{column_name: pg_type_str}``
|
||||
whose entries bypass inference entirely - the caller has already
|
||||
decided the type (e.g. via :func:`union_column_types` across a
|
||||
cluster, or a YAML ``column_types`` override). Nullability is still
|
||||
computed from the data. Columns in ``column_types`` that don't exist
|
||||
in ``df`` are ignored so a shared override dict can apply to clusters
|
||||
with different column sets.
|
||||
|
||||
``force_nullable=True`` stamps every column nullable regardless of
|
||||
what the data sample shows. Escape hatch for when inference marks a
|
||||
column ``NOT NULL`` because the sampled rows happened to be dense but
|
||||
downstream files carry nulls in that column - common with cluster
|
||||
loads where one file's preview can't speak for the rest. Cheaper than
|
||||
trying to sharpen the sampler: widen the column and move on.
|
||||
"""
|
||||
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
|
||||
|
||||
# Row-walking type probes run on a bounded head slice; nullability and the
|
||||
# all-null check still see every row so NOT NULL declarations stay honest.
|
||||
# When ``TYPE_INFERENCE_SAMPLE_ROWS`` is an integer cap, row-walking type
|
||||
# probes run on the head slice for speed; nullability and the all-null
|
||||
# check still walk every row of ``df``. That's only honest when the
|
||||
# caller handed us the full file - with the default cap of ``None`` the
|
||||
# CLI does exactly that. Callers who pass a partial preview and a tight
|
||||
# integer cap accept that ``NOT NULL`` can be wrong for rare-null columns.
|
||||
df_rows = len(df)
|
||||
effective_total = total_rows if total_rows is not None else df_rows
|
||||
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
|
||||
@ -789,6 +1058,8 @@ def infer_schema(
|
||||
sample_size = df_rows
|
||||
sampled = sample_size < effective_total
|
||||
|
||||
overrides: Dict[str, str] = dict(column_types or {})
|
||||
|
||||
# Temporarily flip the module-level flag if the caller asked us to.
|
||||
global COERCE_CHAR_COLUMNS
|
||||
saved = COERCE_CHAR_COLUMNS
|
||||
@ -801,6 +1072,27 @@ def infer_schema(
|
||||
sas_format = original_formats.get(col)
|
||||
notes: List[str] = []
|
||||
|
||||
if col in overrides:
|
||||
pg_type = overrides[col]
|
||||
notes.append(
|
||||
f"type forced to {pg_type} via column_types override"
|
||||
)
|
||||
if force_nullable:
|
||||
nullable = True
|
||||
notes.append("nullable forced via --all-nullable")
|
||||
else:
|
||||
nullable = _is_nullable(series)
|
||||
out[col] = ColumnSpec(
|
||||
name=col,
|
||||
postgres_type=pg_type,
|
||||
nullable=nullable,
|
||||
sas_format=sas_format,
|
||||
source_dtype=str(series.dtype),
|
||||
notes=notes,
|
||||
sampled=sampled,
|
||||
)
|
||||
continue
|
||||
|
||||
pg_type = _format_driven_type(sas_format)
|
||||
|
||||
if pg_type is None:
|
||||
@ -831,7 +1123,11 @@ def infer_schema(
|
||||
f"{effective_total:,} rows"
|
||||
)
|
||||
|
||||
nullable = _is_nullable(series)
|
||||
if force_nullable:
|
||||
nullable = True
|
||||
notes.append("nullable forced via --all-nullable")
|
||||
else:
|
||||
nullable = _is_nullable(series)
|
||||
|
||||
out[col] = ColumnSpec(
|
||||
name=col,
|
||||
@ -964,6 +1260,30 @@ def _normalize_type(pg_type: str) -> str:
|
||||
return _TYPE_NORMALIZATION.get(stripped, stripped.lower())
|
||||
|
||||
|
||||
# Widening pairs: (inferred_from_source, existing_in_target). When the
|
||||
# incoming spec is narrower than the target we accept it - the value is
|
||||
# guaranteed to fit, and ``_prepare_for_copy`` already emits ``COPY``
|
||||
# payloads that Postgres silently promotes to the wider column type. The
|
||||
# INVERSE direction stays a hard failure: a BIGINT value does not fit in
|
||||
# an INTEGER column, so we must not let a cluster whose first file had
|
||||
# only small ints accept a later file with a value past int32. Comes up
|
||||
# most often on cluster loads where file 1 pushed the target to BIGINT
|
||||
# (a single value > 2_147_483_647) and file N happens to sit entirely
|
||||
# within int32 range - strict equality would reject file N even though
|
||||
# the copy is trivially safe.
|
||||
_WIDENING_COMPATIBLE: set = {
|
||||
("smallint", "integer"),
|
||||
("smallint", "bigint"),
|
||||
("integer", "bigint"),
|
||||
("real", "double precision"),
|
||||
# INTEGER / BIGINT into DOUBLE PRECISION is lossless for int32 and
|
||||
# exact up to 2**53 for int64, which covers every value pandas could
|
||||
# have carried through as Int64 without wrapping anyway.
|
||||
("integer", "double precision"),
|
||||
("bigint", "double precision"),
|
||||
}
|
||||
|
||||
|
||||
def _assert_schema_compatible(
|
||||
conn, schema: str, table: str, columns: Dict[str, ColumnSpec]
|
||||
) -> None:
|
||||
@ -990,11 +1310,22 @@ def _assert_schema_compatible(
|
||||
inferred_norm = _normalize_type(spec.postgres_type)
|
||||
target_norm = _normalize_type(target_type)
|
||||
if inferred_norm != target_norm:
|
||||
mismatches.append(
|
||||
f"column {name!r}: inferred {spec.postgres_type} "
|
||||
f"(normalized {inferred_norm!r}) but target is {target_type} "
|
||||
f"(normalized {target_norm!r})"
|
||||
)
|
||||
if (inferred_norm, target_norm) in _WIDENING_COMPATIBLE:
|
||||
# Narrower inferred type fits inside the wider target.
|
||||
# Accept silently-but-noisily so the operator knows the
|
||||
# file came in with a smaller range than the cluster's
|
||||
# target was sized for.
|
||||
warnings.append(
|
||||
f"column {name!r}: inferred {spec.postgres_type} "
|
||||
f"(narrower than target {target_type}); accepting - "
|
||||
f"values fit in the wider target type"
|
||||
)
|
||||
else:
|
||||
mismatches.append(
|
||||
f"column {name!r}: inferred {spec.postgres_type} "
|
||||
f"(normalized {inferred_norm!r}) but target is {target_type} "
|
||||
f"(normalized {target_norm!r})"
|
||||
)
|
||||
target_is_notnull = (target_nullable == "NO")
|
||||
if spec.nullable and target_is_notnull:
|
||||
warnings.append(
|
||||
@ -1661,9 +1992,151 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]:
|
||||
return dt.time(h, m, s)
|
||||
|
||||
|
||||
# Safe outer bound for the numeric->datetime conversion below. The true
|
||||
# ceiling is ``pd.Timestamp.max`` (2262-04-11), which in seconds since 1960
|
||||
# is ~9.52e9. We pick a much tighter bound - year ~2200, ~7.6e9 seconds,
|
||||
# ~87600 days - because (a) any real SAS data past ~2100 is garbage anyway,
|
||||
# and (b) staying well inside the float64 + datetime64[ns] windows gives
|
||||
# pandas' internals zero room to trip the ``over="raise"`` they wrap
|
||||
# around the ns-multiply. ``7.5e9 * 1e9 = 7.5e18``, comfortably under both
|
||||
# ``int64.max`` (~9.22e18) and float64 overflow (~1.8e308).
|
||||
_SAS_DATETIME_SAFE_S = 7_500_000_000
|
||||
_SAS_DATETIME_SAFE_D = 87_000
|
||||
|
||||
|
||||
def _safe_numeric_to_datetime(
|
||||
series: pd.Series,
|
||||
*,
|
||||
unit: str,
|
||||
column_name: str,
|
||||
target_type: str,
|
||||
) -> pd.Series:
|
||||
"""Convert a numeric SAS-epoch series to ``datetime64[ns]`` without letting
|
||||
one stray cell take down the worker.
|
||||
|
||||
Failure modes seen in production:
|
||||
|
||||
* ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value
|
||||
sentinels, divide-by-zero in the source, uninitialized cells).
|
||||
* Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9``
|
||||
overflows float64.
|
||||
* Values between ``pd.Timestamp.max`` and float64 safety (~9.5e9 to 1e308
|
||||
seconds) where the nanosecond multiply silently produces garbage or
|
||||
overflows int64.
|
||||
|
||||
All of these trigger ``FloatingPointError: overflow encountered in multiply``
|
||||
inside ``pd.to_datetime`` because pandas wraps the multiply in
|
||||
``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never
|
||||
gets a chance to turn the bad value into ``NaT``.
|
||||
|
||||
Strategy, belt + suspenders + airbag:
|
||||
|
||||
1. Coerce to float64 up front. Object-dtype branches hand us mixed
|
||||
int/float/str; ``pd.to_numeric(errors="coerce")`` parses what it can
|
||||
and NaNs the rest, so we hit the rest of this function with a
|
||||
pristine float series.
|
||||
2. Mask non-finite values and anything outside the safe epoch window to
|
||||
NaN *before* ``pd.to_datetime`` sees them.
|
||||
3. Run the conversion under a permissive ``errstate``.
|
||||
4. If that still raises (some pandas version internally re-enables
|
||||
``over="raise"`` in a way ``errstate`` can't override), catch it
|
||||
and return all-NaT for the column with a loud warning. Better a
|
||||
NULL column in one chunk than a dead worker + no diagnostics.
|
||||
|
||||
Emits one stderr line per chunk per affected column so silent data
|
||||
loss doesn't sneak by.
|
||||
"""
|
||||
if not pd.api.types.is_float_dtype(series):
|
||||
series = pd.to_numeric(series, errors="coerce").astype("float64")
|
||||
|
||||
arr = series.to_numpy(dtype="float64", copy=False, na_value=np.nan)
|
||||
if unit == "s":
|
||||
bound = _SAS_DATETIME_SAFE_S
|
||||
elif unit == "D":
|
||||
bound = _SAS_DATETIME_SAFE_D
|
||||
else:
|
||||
bound = _SAS_DATETIME_SAFE_S
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
finite_mask = np.isfinite(arr)
|
||||
# ``np.abs(inf) -> inf``, ``np.abs(nan) -> nan``; both compare False
|
||||
# to ``bound``, so ``in_range_mask`` already excludes non-finite
|
||||
# values. The explicit ``finite_mask &`` below is belt-and-suspenders
|
||||
# in case a future numpy changes that semantic.
|
||||
in_range_mask = np.abs(arr) < bound
|
||||
keep_mask = finite_mask & in_range_mask
|
||||
was_present = ~np.isnan(arr)
|
||||
coerced = int(((~keep_mask) & was_present).sum())
|
||||
if coerced:
|
||||
tqdm.write(
|
||||
f"[warn] {target_type} column {column_name!r}: {coerced:,} "
|
||||
f"row(s) had non-representable values (Inf/NaN/out-of-range), "
|
||||
f"coerced to NULL",
|
||||
file=sys.stderr,
|
||||
)
|
||||
cleaned_arr = np.where(keep_mask, arr, np.nan)
|
||||
cleaned = pd.Series(cleaned_arr, index=series.index)
|
||||
try:
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
return pd.to_datetime(
|
||||
cleaned, unit=unit, origin="1960-01-01", errors="coerce",
|
||||
)
|
||||
except (FloatingPointError, OverflowError, ValueError) as exc:
|
||||
tqdm.write(
|
||||
f"[error] {target_type} column {column_name!r}: "
|
||||
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
|
||||
f"returning NaT for the entire chunk. This usually means one "
|
||||
f"or more values slipped past the pre-mask (bound={bound}). "
|
||||
f"Consider setting the column to TEXT via column_types if this "
|
||||
f"recurs.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
|
||||
|
||||
|
||||
def _safe_object_to_datetime(
|
||||
series: pd.Series,
|
||||
*,
|
||||
column_name: str,
|
||||
target_type: str,
|
||||
) -> pd.Series:
|
||||
"""Object-dtype to datetime. Shares the safety net (errstate +
|
||||
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
|
||||
actually numeric-flavored (e.g. SAS wrote numbers into an object
|
||||
column), route to the numeric path; otherwise parse with ``to_datetime``
|
||||
on the object itself.
|
||||
"""
|
||||
coerced = series.replace({"": None})
|
||||
numeric = pd.to_numeric(coerced, errors="coerce")
|
||||
all_numeric = numeric.notna().sum() == coerced.notna().sum()
|
||||
if all_numeric and coerced.notna().any():
|
||||
return _safe_numeric_to_datetime(
|
||||
numeric, unit="s", column_name=column_name, target_type=target_type,
|
||||
)
|
||||
try:
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
return pd.to_datetime(coerced, errors="coerce")
|
||||
except (FloatingPointError, OverflowError, ValueError) as exc:
|
||||
tqdm.write(
|
||||
f"[error] {target_type} column {column_name!r}: "
|
||||
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
|
||||
f"returning NaT for the entire chunk.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
|
||||
|
||||
|
||||
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
|
||||
"""Materialize a copy of ``df`` with each column in the right shape for
|
||||
``to_csv`` so the CSV lands as valid input for the target Postgres type.
|
||||
|
||||
Per-column conversions are vectorized (``.astype`` / ``pd.to_datetime`` /
|
||||
``.mask`` / ``.fillna``) instead of the element-wise ``.map(func)``
|
||||
loops this function used to run. That was the single largest per-chunk
|
||||
CPU cost on text-heavy loads - a 40-column × 100k-row chunk was issuing
|
||||
~4M Python-level function calls just to cast strings. TIME columns are
|
||||
still the ``.map`` path because SAS TIME8 is stored as seconds and the
|
||||
clamp-to-24h logic doesn't fit cleanly in vector form; they're also
|
||||
rare in practice.
|
||||
"""
|
||||
out = pd.DataFrame(index=df.index)
|
||||
for name, spec in columns.items():
|
||||
@ -1686,59 +2159,86 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
||||
if pd.api.types.is_datetime64_any_dtype(series):
|
||||
out[name] = series.dt.date
|
||||
elif pd.api.types.is_object_dtype(series):
|
||||
def _to_date(v: Any) -> Optional[dt.date]:
|
||||
if v is None or (isinstance(v, float) and pd.isna(v)):
|
||||
return None
|
||||
if isinstance(v, dt.datetime):
|
||||
return v.date()
|
||||
if isinstance(v, dt.date):
|
||||
return v
|
||||
if isinstance(v, str):
|
||||
if v == "":
|
||||
return None
|
||||
try:
|
||||
return dt.date.fromisoformat(v)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
out[name] = series.map(_to_date)
|
||||
# Vectorized parse: empty strings / None / unparseable -> NaT,
|
||||
# then .dt.date yields date objects or NaT. NaT serializes as
|
||||
# an empty CSV field (matching ``NULL ''`` in COPY). Routed
|
||||
# through ``_safe_object_to_datetime`` so an object column
|
||||
# that actually contains SAS-epoch numerics (seen when one
|
||||
# file of a cluster stores the column as NUM and another as
|
||||
# CHAR + the union flipped it to TEXT-then-DATE) can't trip
|
||||
# the overflow-in-multiply bug.
|
||||
parsed = _safe_object_to_datetime(
|
||||
series, column_name=name, target_type="DATE",
|
||||
)
|
||||
out[name] = parsed.dt.date
|
||||
elif pd.api.types.is_numeric_dtype(series):
|
||||
# pyreadstat couldn't decode the SAS format (some
|
||||
# ``DATEw.``/``YYMMDDw.`` variants and all custom formats slip
|
||||
# through) so the column came back as float64: days since
|
||||
# 1960-01-01, the SAS epoch. Without this branch the raw
|
||||
# number would hit COPY and Postgres rejects it with
|
||||
# ``invalid input syntax for type date``.
|
||||
parsed = _safe_numeric_to_datetime(
|
||||
series, unit="D", column_name=name, target_type="DATE",
|
||||
)
|
||||
out[name] = parsed.dt.date
|
||||
else:
|
||||
out[name] = series
|
||||
elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"):
|
||||
if pd.api.types.is_datetime64_any_dtype(series):
|
||||
out[name] = series
|
||||
elif pd.api.types.is_object_dtype(series):
|
||||
def _to_dt(v: Any) -> Optional[dt.datetime]:
|
||||
if v is None or (isinstance(v, float) and pd.isna(v)):
|
||||
return None
|
||||
if isinstance(v, dt.datetime):
|
||||
return v
|
||||
if isinstance(v, dt.date):
|
||||
return dt.datetime(v.year, v.month, v.day)
|
||||
if isinstance(v, pd.Timestamp):
|
||||
return v.to_pydatetime() if not pd.isna(v) else None
|
||||
if isinstance(v, str):
|
||||
if v == "":
|
||||
return None
|
||||
try:
|
||||
return dt.datetime.fromisoformat(v)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
out[name] = series.map(_to_dt)
|
||||
# Same rationale as the DATE object branch above: route
|
||||
# through the safety net so numeric-flavored object columns
|
||||
# can't blow us up during the ns multiply.
|
||||
out[name] = _safe_object_to_datetime(
|
||||
series, column_name=name, target_type="TIMESTAMP",
|
||||
)
|
||||
elif pd.api.types.is_numeric_dtype(series):
|
||||
# Same story as the DATE branch above, but SAS datetimes are
|
||||
# *seconds* since 1960-01-01 (fractional seconds for
|
||||
# ``DATETIMEw.d``). Example caught in the wild:
|
||||
# ``1915465463.615`` -> 2020-09-13 05:44:23.615.
|
||||
out[name] = _safe_numeric_to_datetime(
|
||||
series, unit="s", column_name=name, target_type="TIMESTAMP",
|
||||
)
|
||||
else:
|
||||
out[name] = series
|
||||
elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"):
|
||||
out[name] = series.map(_seconds_to_time)
|
||||
elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"):
|
||||
# Leave empty strings as "" so `NULL ''` in COPY turns them into NULL.
|
||||
def _to_str(v: Any) -> Any:
|
||||
if v is None:
|
||||
return ""
|
||||
if isinstance(v, float) and pd.isna(v):
|
||||
return ""
|
||||
return str(v)
|
||||
out[name] = series.map(_to_str)
|
||||
# Render every cell as a string and blank out nulls. ``NULL ''``
|
||||
# in the COPY statement turns the blanks back into SQL NULL.
|
||||
# astype(str) stringifies NaN/None to the literal "nan"/"None",
|
||||
# so we mask those after the fact rather than branching per cell.
|
||||
na_mask = series.isna()
|
||||
if pd.api.types.is_numeric_dtype(series):
|
||||
# Hit when a column was auto-unioned to TEXT because at
|
||||
# least one file of the cluster stored it as CHAR but this
|
||||
# particular file stored it as NUM (typical of SAS phone-id
|
||||
# columns). Default float formatting would emit "123.0" -
|
||||
# which doesn't match the plain "123" coming from the CHAR
|
||||
# files. When the whole chunk is integer-valued, round to
|
||||
# int before stringifying; when any fractional value is
|
||||
# present we leave float formatting alone so we don't
|
||||
# silently drop precision.
|
||||
nonnull = series.dropna()
|
||||
int_like = False
|
||||
if not nonnull.empty:
|
||||
try:
|
||||
int_like = bool(((nonnull % 1) == 0).all())
|
||||
except TypeError:
|
||||
int_like = False
|
||||
if int_like:
|
||||
# ``Int64`` preserves NA; ``.astype(str)`` renders NA
|
||||
# as '<NA>', which we then mask out alongside original
|
||||
# NaNs.
|
||||
as_str = series.astype("Int64").astype(str)
|
||||
out[name] = as_str.mask(na_mask, "")
|
||||
else:
|
||||
out[name] = series.astype(str).mask(na_mask, "")
|
||||
else:
|
||||
out[name] = series.astype(str).mask(na_mask, "")
|
||||
elif pg == "BOOLEAN":
|
||||
out[name] = series.astype("boolean") if series.dtype != object else series
|
||||
else:
|
||||
@ -1746,6 +2246,25 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
||||
return out
|
||||
|
||||
|
||||
def _serialize_chunk_csv(prepared: pd.DataFrame) -> io.BytesIO:
|
||||
"""Serialize a prepared frame into a CSV buffer for ``COPY FROM STDIN``.
|
||||
|
||||
Uses ``pyarrow.csv.write_csv`` (typically 5-10× faster than pandas'
|
||||
pure-Python ``to_csv`` on wide/text-heavy frames). Null cells serialize
|
||||
as empty strings and date/timestamp values land in ISO 8601 form, both
|
||||
of which Postgres accepts under ``FORMAT csv, NULL ''``.
|
||||
"""
|
||||
table = pa.Table.from_pandas(prepared, preserve_index=False)
|
||||
buf = io.BytesIO()
|
||||
pa_csv.write_csv(
|
||||
table,
|
||||
buf,
|
||||
write_options=pa_csv.WriteOptions(include_header=False),
|
||||
)
|
||||
buf.seek(0)
|
||||
return buf
|
||||
|
||||
|
||||
def copy_dataframes(
|
||||
conn,
|
||||
schema_name: str,
|
||||
@ -1768,23 +2287,43 @@ def copy_dataframes(
|
||||
)
|
||||
|
||||
total = 0
|
||||
# Pull chunks one at a time so each ``df`` is unreferenced before the
|
||||
# generator reads the next one. Without this the loop-variable binding
|
||||
# of a ``for df in dfs:`` keeps the previous chunk alive during the
|
||||
# next pyreadstat read, pushing peak memory to 5-6× chunk size per
|
||||
# worker (old df + incoming df + prepared + pyarrow table + CSV buf).
|
||||
# With explicit drops we cap peak at ~2× chunk size: ``df`` goes away
|
||||
# once ``prepared`` exists, ``prepared`` once ``buf`` exists, ``buf``
|
||||
# once COPY has consumed it. Matters most in parallel mode where
|
||||
# 32 × per-worker peak can exhaust a 128 GB host.
|
||||
dfs_iter = iter(dfs)
|
||||
with conn.cursor() as cur:
|
||||
for df in dfs:
|
||||
while True:
|
||||
try:
|
||||
df = next(dfs_iter)
|
||||
except StopIteration:
|
||||
break
|
||||
if df.empty:
|
||||
del df
|
||||
continue
|
||||
prepared = _prepare_for_copy(df, columns)
|
||||
buf = io.StringIO()
|
||||
prepared.to_csv(
|
||||
buf,
|
||||
index=False,
|
||||
header=False,
|
||||
na_rep="",
|
||||
date_format="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
buf.seek(0)
|
||||
del df
|
||||
n = len(prepared)
|
||||
buf = _serialize_chunk_csv(prepared)
|
||||
del prepared
|
||||
cur.copy_expert(sql, buf)
|
||||
del buf
|
||||
conn.commit()
|
||||
total += len(prepared)
|
||||
total += n
|
||||
# Hand pyarrow's pool memory back between chunks. Without this,
|
||||
# arrow's internal buffer pool keeps the high-water bytes
|
||||
# reserved across the worker's lifetime - inside long-running
|
||||
# workers this presents as steadily climbing RSS even with the
|
||||
# ``del``s above. Cheap (microseconds); call it every chunk.
|
||||
try:
|
||||
pa.default_memory_pool().release_unused()
|
||||
except Exception:
|
||||
pass
|
||||
return total
|
||||
|
||||
|
||||
@ -1899,6 +2438,16 @@ def _build_argparser() -> argparse.ArgumentParser:
|
||||
"PGUSER / PGPASSWORD from the environment or .env file."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--all-nullable",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Stamp every column nullable in the generated schema, bypassing "
|
||||
"NOT NULL inference. Use when sampled rows wrongly suggest a "
|
||||
"column has no nulls. Overrides ``all_nullable`` in the YAML "
|
||||
"config when set."
|
||||
),
|
||||
)
|
||||
return p
|
||||
|
||||
|
||||
@ -1921,13 +2470,23 @@ def main(argv: Optional[List[str]] = None) -> int:
|
||||
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
# Schema inference uses a bounded preview read so we never load a
|
||||
# hundreds-of-millions-of-rows file into memory just to pick types.
|
||||
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows
|
||||
# returned, not the file's total, so we don't trust it here.
|
||||
# Schema inference reads the whole file so type + nullability are
|
||||
# computed against every row. That's what the target host has the
|
||||
# resources for and is the only way to honestly emit ``NOT NULL`` -
|
||||
# a bounded preview routinely missed the ~0.2% of rows with nulls on
|
||||
# otherwise-dense keys (e.g. MAFID). If you're on a box that can't
|
||||
# fit the file in memory, override ``TYPE_INFERENCE_SAMPLE_ROWS`` to
|
||||
# an integer cap and know that sampled specs may stamp ``NOT NULL``
|
||||
# on columns whose nulls live past the window.
|
||||
preview_df, meta = read_sas_preview(cfg.filename)
|
||||
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
|
||||
columns = infer_schema(preview_df, meta)
|
||||
force_nullable = args.all_nullable or cfg.all_nullable
|
||||
columns = infer_schema(
|
||||
preview_df,
|
||||
meta,
|
||||
column_types=cfg.column_types,
|
||||
force_nullable=force_nullable,
|
||||
)
|
||||
|
||||
# Validate partition columns exist in the schema after filtering.
|
||||
if cfg.partition_by:
|
||||
@ -2018,13 +2577,24 @@ def main(argv: Optional[List[str]] = None) -> int:
|
||||
# it while we're holding a Postgres transaction open.
|
||||
del preview_df
|
||||
|
||||
total_rows = getattr(meta, "number_rows", None)
|
||||
|
||||
def _filtered_chunks():
|
||||
seen = 0
|
||||
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
|
||||
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
|
||||
seen += len(chunk_df)
|
||||
print(f" streaming... {seen:,} rows", file=sys.stderr)
|
||||
yield chunk_df
|
||||
pbar = tqdm(
|
||||
total=total_rows,
|
||||
unit="row",
|
||||
unit_scale=True,
|
||||
desc=f" {cfg.filename.name}",
|
||||
file=sys.stderr,
|
||||
dynamic_ncols=True,
|
||||
)
|
||||
try:
|
||||
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
|
||||
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
|
||||
pbar.update(len(chunk_df))
|
||||
yield chunk_df
|
||||
finally:
|
||||
pbar.close()
|
||||
|
||||
db_user = db_password = None
|
||||
if args.dbcreds:
|
||||
|
||||
@ -38,3 +38,24 @@ if_exists: append
|
||||
# indexes:
|
||||
# - state
|
||||
# - zip
|
||||
|
||||
# column_types: Explicit {column_name: postgres_type} overrides that
|
||||
# bypass automatic type inference for the listed columns. Useful when
|
||||
# pyreadstat reports a column as NUM but you want it stored as TEXT
|
||||
# (phone/ID columns that are conceptually strings), or when a column's
|
||||
# inferred type is off for any other reason. Columns not listed here
|
||||
# fall through to the normal inference path. Nullability is always
|
||||
# computed from the data.
|
||||
#
|
||||
# column_types:
|
||||
# RESP_PH_PREFIX_ID: TEXT
|
||||
# SOMELONG_ID: BIGINT
|
||||
|
||||
# all_nullable: If true, every column is stamped nullable in the generated
|
||||
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
|
||||
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
|
||||
# rare-null data downstream) and COPY blows up mid-load on the first null
|
||||
# it hits. Off by default. The CLI flag --all-nullable overrides this to
|
||||
# true when set.
|
||||
#
|
||||
# all_nullable: false
|
||||
|
||||
@ -61,15 +61,52 @@ auto_detect: true
|
||||
# - state
|
||||
# - zip
|
||||
|
||||
# Folder-level column_types: Explicit {column_name: postgres_type} map that
|
||||
# bypasses automatic type inference for the listed columns. Applied to
|
||||
# every cluster unless a cluster supplies its own column_types, which are
|
||||
# merged on top (cluster entries win on conflict).
|
||||
#
|
||||
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
|
||||
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
|
||||
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
|
||||
# BIGINT). Entries in column_types here win over that auto-union - use
|
||||
# them when the auto result is wrong or when --no-prescan disables the
|
||||
# auto-union and you still need to pin a column.
|
||||
#
|
||||
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
|
||||
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
|
||||
# don't exist in a given file are simply ignored for that file.
|
||||
#
|
||||
# column_types:
|
||||
# RESP_PH_PREFIX_ID: TEXT
|
||||
# RESP_PH_SUFFIX_ID: TEXT
|
||||
# SOMELONG_ID: BIGINT
|
||||
|
||||
# Folder-level all_nullable: If true, every column of every cluster is
|
||||
# stamped nullable in the generated schema; NOT NULL inference is skipped
|
||||
# entirely. Use this when the sampler wrongly concludes a column has no
|
||||
# nulls (sampled rows happened to be dense, but later files in the cluster
|
||||
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
|
||||
# unless a cluster supplies its own all_nullable. The CLI flag
|
||||
# --all-nullable overrides both this and any per-cluster setting when
|
||||
# passed. Off by default.
|
||||
#
|
||||
# all_nullable: false
|
||||
|
||||
# Explicit cluster patterns. Each pattern is matched against the file
|
||||
# *basename*. Files matched by a pattern are pulled out of the auto-detect
|
||||
# pool, so explicit and auto clusters compose cleanly.
|
||||
#
|
||||
# `tablename` is required. `if_exists`, `include`, and `exclude` are
|
||||
# optional per-cluster overrides of the folder-level defaults above.
|
||||
# `tablename` is required. `if_exists`, `include`, `exclude`, and
|
||||
# `column_types` are optional per-cluster overrides of the folder-level
|
||||
# defaults above. Cluster-level column_types entries win over folder-
|
||||
# level entries for the same column.
|
||||
clusters:
|
||||
- pattern: '^group_a\d+\.xpt$'
|
||||
tablename: group_a
|
||||
# column_types:
|
||||
# INTCOL: TEXT
|
||||
# all_nullable: true # per-cluster override of the folder-level default
|
||||
|
||||
# Example of an explicit override. Uncomment to force the group_b cluster to
|
||||
# append instead of replace even though the folder default is "replace":
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
pandas>=2.0,<3.0
|
||||
pyreadstat>=1.2,<2.0
|
||||
numpy>=2.1,<3.0
|
||||
pyarrow>=22.0,<24.0
|
||||
pyyaml>=6.0,<7.0
|
||||
psycopg2-binary>=2.9,<3.0
|
||||
python-dotenv>=1.0,<2.0
|
||||
boto3>=1.28,<2.0
|
||||
openpyxl>=3.1,<4.0
|
||||
tqdm>=4.66,<5.0
|
||||
|
||||
1138
utils/sas_profiler.py
Normal file
1138
utils/sas_profiler.py
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user