Compare commits
No commits in common. "64e7ff0b0a479bb7e7ccbdd568d5bc8aecbc4bd1" and "e48038f3c65471aff1074e0f37b7df8937e305b1" have entirely different histories.
64e7ff0b0a
...
e48038f3c6
File diff suppressed because it is too large
Load Diff
@ -183,17 +183,15 @@ Priority order used by :func:`infer_schema`:
|
|||||||
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
|
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
|
||||||
``DOUBLE PRECISION``.
|
``DOUBLE PRECISION``.
|
||||||
|
|
||||||
Type inference scans the whole file by default (``TYPE_INFERENCE_SAMPLE_ROWS
|
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for
|
||||||
= None``) so type + nullability are both computed against every row. The CLI
|
performance on large files. The CLI enforces this at read time via
|
||||||
materializes the file once for schema inference, then re-streams it chunk by
|
:func:`read_sas_preview`, so the whole file is never materialized just to pick
|
||||||
chunk into ``COPY``; peak memory is roughly one full dataframe. Override
|
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual
|
||||||
``TYPE_INFERENCE_SAMPLE_ROWS`` to an integer cap if you're on a host that
|
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32,
|
||||||
can't hold the file in memory - but know that sampled specs carry the usual
|
or a column had no nulls in the preview but does later in the file, ``COPY``
|
||||||
risks: a later row may exceed the inferred integer range, or a column that
|
will fail mid-stream and the whole transaction rolls back. Set
|
||||||
had no nulls in the preview may carry nulls later in the file (which then
|
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing
|
||||||
detonates ``COPY`` because the sampled spec stamped it ``NOT NULL``). Seen
|
matters more than speed.
|
||||||
in production on a 2.5M-row file with ~6k null MAFIDs past the 10k-row
|
|
||||||
preview - the entire load aborted mid-stream.
|
|
||||||
|
|
||||||
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
|
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
|
||||||
commit each chunk as it is copied so an interrupted load retains the rows
|
commit each chunk as it is copied so an interrupted load retains the rows
|
||||||
@ -227,48 +225,16 @@ import math
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
import warnings
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import psycopg2
|
import psycopg2
|
||||||
import psycopg2.extensions
|
import psycopg2.extensions
|
||||||
import pyarrow as pa
|
|
||||||
import pyarrow.csv as pa_csv
|
|
||||||
import pyreadstat
|
import pyreadstat
|
||||||
import yaml
|
import yaml
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from pandas.errors import PerformanceWarning
|
|
||||||
from tqdm import tqdm
|
|
||||||
|
|
||||||
# ``_prepare_for_copy`` builds its output frame one column at a time with
|
|
||||||
# ``out[name] = ...``. On wide SAS files (~100+ columns) pandas prints a
|
|
||||||
# ``PerformanceWarning: DataFrame is highly fragmented`` once per chunk to
|
|
||||||
# nudge callers toward ``pd.concat(axis=1, ...)``. The fragmentation only
|
|
||||||
# matters for row-oriented ops or in-place ``.copy()``; we hand the frame
|
|
||||||
# straight to ``pyarrow.Table.from_pandas`` which reads columns
|
|
||||||
# independently, so the warning is pure noise for our pipeline. Filter it
|
|
||||||
# at import time - narrow category match so nothing else is suppressed.
|
|
||||||
warnings.filterwarnings("ignore", category=PerformanceWarning)
|
|
||||||
|
|
||||||
# Turn numpy's "raise on float overflow" (and friends) into silent inf/nan
|
|
||||||
# production, module-wide. Pandas ships with ``np.errstate(over="raise")``
|
|
||||||
# wrapped around several internal ops (most painfully, the multiply inside
|
|
||||||
# ``pd.to_datetime(unit="s")`` that converts SAS epoch -> nanoseconds).
|
|
||||||
# Our data routinely carries ``inf`` / huge sentinels, which trip that
|
|
||||||
# ``raise`` and blow up an entire worker before ``errors="coerce"`` gets
|
|
||||||
# a chance to turn them into NaT. Even with ``_safe_numeric_to_datetime``
|
|
||||||
# pre-masking the obvious cases, other code paths (pandas object-dtype
|
|
||||||
# datetime parsing, pyarrow type promotion, pyreadstat) can also trigger.
|
|
||||||
# Setting a process-wide ``seterr`` is a heavier hammer than an
|
|
||||||
# ``errstate`` block but survives library internals that don't explicitly
|
|
||||||
# rewrap it. Downside: a real overflow bug in new code would now silently
|
|
||||||
# produce inf/nan instead of raising - acceptable for a bulk loader where
|
|
||||||
# "don't crash on bad rows, null them and move on" is the whole point.
|
|
||||||
np.seterr(over="ignore", invalid="ignore", divide="ignore")
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -289,29 +255,17 @@ values; too small a sample is easy to mis-infer."""
|
|||||||
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
|
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
|
||||||
"""INTEGER bounds; anything outside becomes BIGINT."""
|
"""INTEGER bounds; anything outside becomes BIGINT."""
|
||||||
|
|
||||||
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = None
|
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000
|
||||||
"""Cap on rows inspected during per-column type inference. Also governs how
|
"""Cap on rows inspected during per-column type inference. Also governs how
|
||||||
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
|
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
|
||||||
schema-inference flows.
|
schema-inference flows. Set to ``None`` to scan every row (and read the whole
|
||||||
|
file into memory for the preview step - don't do this on multi-hundred-million
|
||||||
|
row files)."""
|
||||||
|
|
||||||
Default is ``None`` (scan every row, reading the whole file into memory for
|
DEFAULT_CHUNK_ROWS = 100_000
|
||||||
the schema-inference step). That's the only honest setting for nullability:
|
|
||||||
any integer cap lets a column look ``NOT NULL`` across the first N rows
|
|
||||||
while the file actually holds rare nulls past the window, which then
|
|
||||||
detonates ``COPY`` mid-stream (seen in production on a 2.5M-row file where
|
|
||||||
~6k MAFIDs were null past the 10k-row preview). If you're loading a file
|
|
||||||
so large that a full read won't fit in memory, set this to an integer cap
|
|
||||||
and accept that sampled specs can't be trusted for ``NOT NULL``."""
|
|
||||||
|
|
||||||
DEFAULT_CHUNK_ROWS = 2_000_000
|
|
||||||
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
|
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
|
||||||
fewer COPY round-trips and lower per-row overhead but more peak memory per
|
fewer COPY round-trips but more peak memory per chunk; smaller values are
|
||||||
chunk; smaller values are gentler on memory.
|
gentler on memory."""
|
||||||
|
|
||||||
The chunk size can be overridden at runtime via the
|
|
||||||
``GENERIC_LOADER_CHUNK_ROWS`` environment variable (read inside
|
|
||||||
:func:`iter_sas_chunks`), so ``.env``-driven overrides work without code
|
|
||||||
changes. Explicit ``chunksize=`` kwargs still win over both."""
|
|
||||||
|
|
||||||
|
|
||||||
VALID_IF_EXISTS = ("fail", "replace", "append")
|
VALID_IF_EXISTS = ("fail", "replace", "append")
|
||||||
@ -336,8 +290,6 @@ class LoaderConfig:
|
|||||||
partition_by: List[str] = field(default_factory=list)
|
partition_by: List[str] = field(default_factory=list)
|
||||||
max_partitions: int = 10_000
|
max_partitions: int = 10_000
|
||||||
indexes: List[str] = field(default_factory=list)
|
indexes: List[str] = field(default_factory=list)
|
||||||
column_types: Dict[str, str] = field(default_factory=dict)
|
|
||||||
all_nullable: bool = False
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -548,48 +500,6 @@ def load_config(path: Path) -> LoaderConfig:
|
|||||||
f"{missing_in_include}"
|
f"{missing_in_include}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# -- column_types -------------------------------------------------------
|
|
||||||
# Optional ``{column_name: pg_type}`` escape hatch that bypasses
|
|
||||||
# automatic type inference for specific columns. Useful when
|
|
||||||
# pyreadstat reports a column as NUM but the downstream consumer
|
|
||||||
# expects TEXT (e.g. phone-id columns), or when a column has drifted
|
|
||||||
# between CHAR and NUM across file versions and you want to pin
|
|
||||||
# TEXT up front. See also :func:`infer_schema`.
|
|
||||||
raw_ct = raw.get("column_types")
|
|
||||||
column_types: Dict[str, str] = {}
|
|
||||||
if raw_ct is not None:
|
|
||||||
if not isinstance(raw_ct, dict):
|
|
||||||
raise ValueError(
|
|
||||||
f"Config {path}: 'column_types' must be a mapping of "
|
|
||||||
f"{{column_name: postgres_type}}."
|
|
||||||
)
|
|
||||||
for k, v in raw_ct.items():
|
|
||||||
key = str(k).strip()
|
|
||||||
if not key:
|
|
||||||
raise ValueError(
|
|
||||||
f"Config {path}: 'column_types' contains an empty "
|
|
||||||
f"column name."
|
|
||||||
)
|
|
||||||
if not isinstance(v, str) or not v.strip():
|
|
||||||
raise ValueError(
|
|
||||||
f"Config {path}: 'column_types[{key}]' must be a "
|
|
||||||
f"non-empty Postgres type string (got {v!r})."
|
|
||||||
)
|
|
||||||
column_types[key] = v.strip()
|
|
||||||
|
|
||||||
# -- all_nullable -------------------------------------------------------
|
|
||||||
# When inference wrongly stamps a column NOT NULL (sampled rows happened
|
|
||||||
# to be dense; later rows carry nulls) downstream COPYs fail mid-stream.
|
|
||||||
# Set ``all_nullable: true`` in the YAML to stamp every column nullable
|
|
||||||
# up front. The CLI flag ``--all-nullable`` overrides this to ``true``
|
|
||||||
# if set.
|
|
||||||
raw_an = raw.get("all_nullable", False)
|
|
||||||
if not isinstance(raw_an, bool):
|
|
||||||
raise ValueError(
|
|
||||||
f"Config {path}: 'all_nullable' must be a boolean (got {raw_an!r})."
|
|
||||||
)
|
|
||||||
all_nullable = bool(raw_an)
|
|
||||||
|
|
||||||
return LoaderConfig(
|
return LoaderConfig(
|
||||||
filename=filename,
|
filename=filename,
|
||||||
schemaname=schemaname,
|
schemaname=schemaname,
|
||||||
@ -600,8 +510,6 @@ def load_config(path: Path) -> LoaderConfig:
|
|||||||
partition_by=partition_by,
|
partition_by=partition_by,
|
||||||
max_partitions=max_partitions,
|
max_partitions=max_partitions,
|
||||||
indexes=indexes,
|
indexes=indexes,
|
||||||
column_types=column_types,
|
|
||||||
all_nullable=all_nullable,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -655,46 +563,16 @@ def read_sas_preview(
|
|||||||
return reader(str(Path(path)), row_limit=row_limit, **kwargs)
|
return reader(str(Path(path)), row_limit=row_limit, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def read_sas_metadata(path: Path) -> Any:
|
|
||||||
"""Read only the metadata (no rows) from a SAS file.
|
|
||||||
|
|
||||||
Uses pyreadstat's ``metadataonly=True`` fast path: the reader decodes
|
|
||||||
the file header (column names, formats, total row count, etc.) and
|
|
||||||
returns without touching the data pages. Orders of magnitude faster
|
|
||||||
than :func:`read_sas_preview` when all you need is
|
|
||||||
``meta.number_rows`` - typically a few ms per sas7bdat file, which
|
|
||||||
makes it cheap to pre-scan a whole folder to populate a global
|
|
||||||
progress bar.
|
|
||||||
"""
|
|
||||||
reader, kwargs = _sas_reader(path)
|
|
||||||
_, meta = reader(str(Path(path)), metadataonly=True, **kwargs)
|
|
||||||
return meta
|
|
||||||
|
|
||||||
|
|
||||||
def iter_sas_chunks(
|
def iter_sas_chunks(
|
||||||
path: Path,
|
path: Path,
|
||||||
*,
|
*,
|
||||||
chunksize: Optional[int] = None,
|
chunksize: int = DEFAULT_CHUNK_ROWS,
|
||||||
):
|
):
|
||||||
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
|
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
|
||||||
|
|
||||||
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
|
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
|
||||||
underlying reader by extension and threads through our encoding defaults.
|
underlying reader by extension and threads through our encoding defaults.
|
||||||
|
|
||||||
When ``chunksize`` is ``None`` (the default), the effective value comes
|
|
||||||
from the ``GENERIC_LOADER_CHUNK_ROWS`` environment variable if set and
|
|
||||||
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
|
|
||||||
always wins.
|
|
||||||
"""
|
"""
|
||||||
if chunksize is None:
|
|
||||||
raw = os.environ.get("GENERIC_LOADER_CHUNK_ROWS")
|
|
||||||
if raw is not None:
|
|
||||||
try:
|
|
||||||
chunksize = int(raw)
|
|
||||||
except ValueError:
|
|
||||||
chunksize = DEFAULT_CHUNK_ROWS
|
|
||||||
else:
|
|
||||||
chunksize = DEFAULT_CHUNK_ROWS
|
|
||||||
reader, kwargs = _sas_reader(path)
|
reader, kwargs = _sas_reader(path)
|
||||||
yield from pyreadstat.read_file_in_chunks(
|
yield from pyreadstat.read_file_in_chunks(
|
||||||
reader, str(Path(path)), chunksize=chunksize, **kwargs
|
reader, str(Path(path)), chunksize=chunksize, **kwargs
|
||||||
@ -712,13 +590,7 @@ def apply_column_filter(
|
|||||||
exclude: Optional[List[str]],
|
exclude: Optional[List[str]],
|
||||||
) -> pd.DataFrame:
|
) -> pd.DataFrame:
|
||||||
"""Restrict ``df`` to the requested columns. Names missing from the frame
|
"""Restrict ``df`` to the requested columns. Names missing from the frame
|
||||||
raise a clear error rather than silently dropping.
|
raise a clear error rather than silently dropping."""
|
||||||
|
|
||||||
Returns the input frame (or a column-sliced view / drop result) without
|
|
||||||
an extra ``.copy()`` — downstream (:func:`_prepare_for_copy`) reads the
|
|
||||||
frame into a freshly built output and never mutates its input, so the
|
|
||||||
copies were pure overhead on every streamed chunk.
|
|
||||||
"""
|
|
||||||
if include is not None and exclude is not None:
|
if include is not None and exclude is not None:
|
||||||
raise ValueError("include and exclude are mutually exclusive.")
|
raise ValueError("include and exclude are mutually exclusive.")
|
||||||
|
|
||||||
@ -726,15 +598,15 @@ def apply_column_filter(
|
|||||||
missing = [c for c in include if c not in df.columns]
|
missing = [c for c in include if c not in df.columns]
|
||||||
if missing:
|
if missing:
|
||||||
raise ValueError(f"include references unknown columns: {missing}")
|
raise ValueError(f"include references unknown columns: {missing}")
|
||||||
return df.loc[:, list(include)]
|
return df.loc[:, list(include)].copy()
|
||||||
|
|
||||||
if exclude is not None:
|
if exclude is not None:
|
||||||
missing = [c for c in exclude if c not in df.columns]
|
missing = [c for c in exclude if c not in df.columns]
|
||||||
if missing:
|
if missing:
|
||||||
raise ValueError(f"exclude references unknown columns: {missing}")
|
raise ValueError(f"exclude references unknown columns: {missing}")
|
||||||
return df.drop(columns=list(exclude))
|
return df.drop(columns=list(exclude)).copy()
|
||||||
|
|
||||||
return df
|
return df.copy()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -762,126 +634,6 @@ def _format_driven_type(sas_format: Optional[str]) -> Optional[str]:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
_DECIMAL_FORMAT_RE = re.compile(r"\.(\d+)")
|
|
||||||
|
|
||||||
|
|
||||||
def _format_hints_decimal(sas_format: Optional[str]) -> bool:
|
|
||||||
"""True if a numeric SAS format string explicitly carries decimal places.
|
|
||||||
|
|
||||||
SAS numeric formats are ``NAMEw.d``; ``d > 0`` means the variable was
|
|
||||||
intended to render with ``d`` decimal digits (COMMA10.2, F8.3, ...).
|
|
||||||
A bare width like ``BEST12.`` or ``F8.`` has no digits after the dot
|
|
||||||
and is treated as integer-presenting. Used by
|
|
||||||
:func:`union_column_types` to pick BIGINT vs DOUBLE PRECISION when a
|
|
||||||
column is numeric in every file of a cluster.
|
|
||||||
"""
|
|
||||||
if not sas_format:
|
|
||||||
return False
|
|
||||||
m = _DECIMAL_FORMAT_RE.search(sas_format)
|
|
||||||
if not m:
|
|
||||||
return False
|
|
||||||
try:
|
|
||||||
return int(m.group(1)) > 0
|
|
||||||
except ValueError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def extract_union_metadata(
|
|
||||||
meta: Any,
|
|
||||||
) -> Dict[str, Tuple[str, Optional[str]]]:
|
|
||||||
"""Pull the (readstat_type, sas_format) pair for every column in ``meta``.
|
|
||||||
|
|
||||||
Returns a plain dict that's safe to pass between processes and to
|
|
||||||
:func:`union_column_types`. ``readstat_type`` is the simplified type
|
|
||||||
reported by pyreadstat: ``"string"`` for SAS CHAR, ``"double"`` for
|
|
||||||
SAS NUM. ``sas_format`` comes from ``meta.original_variable_types``
|
|
||||||
and drives date/datetime detection during union.
|
|
||||||
"""
|
|
||||||
var_types = dict(getattr(meta, "variable_types", None) or {})
|
|
||||||
formats = dict(getattr(meta, "original_variable_types", None) or {})
|
|
||||||
names = list(
|
|
||||||
getattr(meta, "column_names", None)
|
|
||||||
or list(var_types.keys())
|
|
||||||
or list(formats.keys())
|
|
||||||
)
|
|
||||||
out: Dict[str, Tuple[str, Optional[str]]] = {}
|
|
||||||
for col in names:
|
|
||||||
rtype = str(var_types.get(col, "")) if var_types else ""
|
|
||||||
fmt = formats.get(col)
|
|
||||||
out[col] = (rtype, fmt if fmt else None)
|
|
||||||
return out
|
|
||||||
|
|
||||||
|
|
||||||
def union_column_types(
|
|
||||||
per_file_metas: Iterable[Dict[str, Tuple[str, Optional[str]]]],
|
|
||||||
) -> Dict[str, str]:
|
|
||||||
"""Derive one Postgres type per column that's safe across every file.
|
|
||||||
|
|
||||||
``per_file_metas`` is an iterable (one entry per file in a cluster) of
|
|
||||||
``{column_name: (readstat_type, sas_format)}`` dicts as produced by
|
|
||||||
:func:`extract_union_metadata`.
|
|
||||||
|
|
||||||
Rules, evaluated per column:
|
|
||||||
|
|
||||||
* **CHAR/NUM drift wins TEXT.** If any file stores the column as CHAR
|
|
||||||
(``readstat_type != "double"``) the union is ``TEXT``. This covers
|
|
||||||
the phone-id case where some years stored ``RESP_PH_PREFIX_ID`` as
|
|
||||||
CHAR and others as NUM.
|
|
||||||
* **All NUM, format hints DATETIME → TIMESTAMP.** Any file whose
|
|
||||||
format resolves to ``TIMESTAMP`` (via :func:`_format_driven_type`)
|
|
||||||
pins the column to ``TIMESTAMP`` even if other files left the
|
|
||||||
format blank.
|
|
||||||
* **All NUM, format hints DATE → DATE.** Same idea for date-only
|
|
||||||
formats.
|
|
||||||
* **All NUM, any decimal hint → DOUBLE PRECISION.** A ``w.d`` format
|
|
||||||
with ``d > 0`` in any file implies fractional values somewhere.
|
|
||||||
* **All NUM, no useful hint → DOUBLE PRECISION.** SAS numeric
|
|
||||||
formats are *display* formats, not storage constraints - a
|
|
||||||
``BEST12.`` / ``F8.`` / blank-format column can still hold floats,
|
|
||||||
and pyreadstat hands back plain ``float64`` regardless. Defaulting
|
|
||||||
to ``DOUBLE PRECISION`` here costs the same 8 bytes as ``BIGINT``
|
|
||||||
but can't fail on real data. For columns that truly are
|
|
||||||
integer-only and you want ``BIGINT`` semantics in queries, pin
|
|
||||||
them via a ``column_types`` override.
|
|
||||||
|
|
||||||
Columns missing from a given file are simply skipped for that file;
|
|
||||||
the union is computed over whichever files *did* supply the column.
|
|
||||||
Columns that never appear anywhere are omitted from the result.
|
|
||||||
"""
|
|
||||||
per_col: Dict[str, List[Tuple[str, Optional[str]]]] = {}
|
|
||||||
for meta in per_file_metas:
|
|
||||||
for col, pair in meta.items():
|
|
||||||
per_col.setdefault(col, []).append(pair)
|
|
||||||
|
|
||||||
result: Dict[str, str] = {}
|
|
||||||
for col, entries in per_col.items():
|
|
||||||
any_char = any(
|
|
||||||
rtype and rtype.lower() != "double" for rtype, _ in entries
|
|
||||||
)
|
|
||||||
if any_char:
|
|
||||||
result[col] = "TEXT"
|
|
||||||
continue
|
|
||||||
formats = [fmt for _, fmt in entries if fmt]
|
|
||||||
driven = [_format_driven_type(f) for f in formats]
|
|
||||||
if "TIMESTAMP" in driven:
|
|
||||||
result[col] = "TIMESTAMP"
|
|
||||||
elif "DATE" in driven:
|
|
||||||
result[col] = "DATE"
|
|
||||||
else:
|
|
||||||
# Safe default: DOUBLE PRECISION. The BIGINT default we tried
|
|
||||||
# first failed the moment a file contained a fractional
|
|
||||||
# value in a column whose format didn't carry a decimal
|
|
||||||
# hint (very common: SAS ``BEST12.`` / ``F8.`` are display
|
|
||||||
# formats, not storage constraints, so the underlying
|
|
||||||
# 8-byte float can hold any value). Same storage cost as
|
|
||||||
# BIGINT, handles both integer- and float-valued data, and
|
|
||||||
# keeps loads from failing mid-cluster. Use a
|
|
||||||
# ``column_types`` override to pin specific columns to
|
|
||||||
# ``BIGINT`` when you want integer semantics in queries.
|
|
||||||
result[col] = "DOUBLE PRECISION"
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def _all_null(series: pd.Series) -> bool:
|
def _all_null(series: pd.Series) -> bool:
|
||||||
if pd.api.types.is_object_dtype(series):
|
if pd.api.types.is_object_dtype(series):
|
||||||
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
|
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
|
||||||
@ -1007,8 +759,6 @@ def infer_schema(
|
|||||||
*,
|
*,
|
||||||
coerce_chars: bool = COERCE_CHAR_COLUMNS,
|
coerce_chars: bool = COERCE_CHAR_COLUMNS,
|
||||||
total_rows: Optional[int] = None,
|
total_rows: Optional[int] = None,
|
||||||
column_types: Optional[Dict[str, str]] = None,
|
|
||||||
force_nullable: bool = False,
|
|
||||||
) -> Dict[str, ColumnSpec]:
|
) -> Dict[str, ColumnSpec]:
|
||||||
"""Infer a Postgres column spec for each column in ``df``.
|
"""Infer a Postgres column spec for each column in ``df``.
|
||||||
|
|
||||||
@ -1024,30 +774,11 @@ def infer_schema(
|
|||||||
``total_rows`` lets callers who already sampled the frame (e.g. via
|
``total_rows`` lets callers who already sampled the frame (e.g. via
|
||||||
:func:`read_sas_preview`) report the real file size in the per-column
|
:func:`read_sas_preview`) report the real file size in the per-column
|
||||||
"inferred from first N of M rows" note. Falls back to ``len(df)``.
|
"inferred from first N of M rows" note. Falls back to ``len(df)``.
|
||||||
|
|
||||||
``column_types`` is an optional map ``{column_name: pg_type_str}``
|
|
||||||
whose entries bypass inference entirely - the caller has already
|
|
||||||
decided the type (e.g. via :func:`union_column_types` across a
|
|
||||||
cluster, or a YAML ``column_types`` override). Nullability is still
|
|
||||||
computed from the data. Columns in ``column_types`` that don't exist
|
|
||||||
in ``df`` are ignored so a shared override dict can apply to clusters
|
|
||||||
with different column sets.
|
|
||||||
|
|
||||||
``force_nullable=True`` stamps every column nullable regardless of
|
|
||||||
what the data sample shows. Escape hatch for when inference marks a
|
|
||||||
column ``NOT NULL`` because the sampled rows happened to be dense but
|
|
||||||
downstream files carry nulls in that column - common with cluster
|
|
||||||
loads where one file's preview can't speak for the rest. Cheaper than
|
|
||||||
trying to sharpen the sampler: widen the column and move on.
|
|
||||||
"""
|
"""
|
||||||
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
|
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
|
||||||
|
|
||||||
# When ``TYPE_INFERENCE_SAMPLE_ROWS`` is an integer cap, row-walking type
|
# Row-walking type probes run on a bounded head slice; nullability and the
|
||||||
# probes run on the head slice for speed; nullability and the all-null
|
# all-null check still see every row so NOT NULL declarations stay honest.
|
||||||
# check still walk every row of ``df``. That's only honest when the
|
|
||||||
# caller handed us the full file - with the default cap of ``None`` the
|
|
||||||
# CLI does exactly that. Callers who pass a partial preview and a tight
|
|
||||||
# integer cap accept that ``NOT NULL`` can be wrong for rare-null columns.
|
|
||||||
df_rows = len(df)
|
df_rows = len(df)
|
||||||
effective_total = total_rows if total_rows is not None else df_rows
|
effective_total = total_rows if total_rows is not None else df_rows
|
||||||
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
|
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
|
||||||
@ -1058,8 +789,6 @@ def infer_schema(
|
|||||||
sample_size = df_rows
|
sample_size = df_rows
|
||||||
sampled = sample_size < effective_total
|
sampled = sample_size < effective_total
|
||||||
|
|
||||||
overrides: Dict[str, str] = dict(column_types or {})
|
|
||||||
|
|
||||||
# Temporarily flip the module-level flag if the caller asked us to.
|
# Temporarily flip the module-level flag if the caller asked us to.
|
||||||
global COERCE_CHAR_COLUMNS
|
global COERCE_CHAR_COLUMNS
|
||||||
saved = COERCE_CHAR_COLUMNS
|
saved = COERCE_CHAR_COLUMNS
|
||||||
@ -1072,27 +801,6 @@ def infer_schema(
|
|||||||
sas_format = original_formats.get(col)
|
sas_format = original_formats.get(col)
|
||||||
notes: List[str] = []
|
notes: List[str] = []
|
||||||
|
|
||||||
if col in overrides:
|
|
||||||
pg_type = overrides[col]
|
|
||||||
notes.append(
|
|
||||||
f"type forced to {pg_type} via column_types override"
|
|
||||||
)
|
|
||||||
if force_nullable:
|
|
||||||
nullable = True
|
|
||||||
notes.append("nullable forced via --all-nullable")
|
|
||||||
else:
|
|
||||||
nullable = _is_nullable(series)
|
|
||||||
out[col] = ColumnSpec(
|
|
||||||
name=col,
|
|
||||||
postgres_type=pg_type,
|
|
||||||
nullable=nullable,
|
|
||||||
sas_format=sas_format,
|
|
||||||
source_dtype=str(series.dtype),
|
|
||||||
notes=notes,
|
|
||||||
sampled=sampled,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
pg_type = _format_driven_type(sas_format)
|
pg_type = _format_driven_type(sas_format)
|
||||||
|
|
||||||
if pg_type is None:
|
if pg_type is None:
|
||||||
@ -1123,10 +831,6 @@ def infer_schema(
|
|||||||
f"{effective_total:,} rows"
|
f"{effective_total:,} rows"
|
||||||
)
|
)
|
||||||
|
|
||||||
if force_nullable:
|
|
||||||
nullable = True
|
|
||||||
notes.append("nullable forced via --all-nullable")
|
|
||||||
else:
|
|
||||||
nullable = _is_nullable(series)
|
nullable = _is_nullable(series)
|
||||||
|
|
||||||
out[col] = ColumnSpec(
|
out[col] = ColumnSpec(
|
||||||
@ -1260,30 +964,6 @@ def _normalize_type(pg_type: str) -> str:
|
|||||||
return _TYPE_NORMALIZATION.get(stripped, stripped.lower())
|
return _TYPE_NORMALIZATION.get(stripped, stripped.lower())
|
||||||
|
|
||||||
|
|
||||||
# Widening pairs: (inferred_from_source, existing_in_target). When the
|
|
||||||
# incoming spec is narrower than the target we accept it - the value is
|
|
||||||
# guaranteed to fit, and ``_prepare_for_copy`` already emits ``COPY``
|
|
||||||
# payloads that Postgres silently promotes to the wider column type. The
|
|
||||||
# INVERSE direction stays a hard failure: a BIGINT value does not fit in
|
|
||||||
# an INTEGER column, so we must not let a cluster whose first file had
|
|
||||||
# only small ints accept a later file with a value past int32. Comes up
|
|
||||||
# most often on cluster loads where file 1 pushed the target to BIGINT
|
|
||||||
# (a single value > 2_147_483_647) and file N happens to sit entirely
|
|
||||||
# within int32 range - strict equality would reject file N even though
|
|
||||||
# the copy is trivially safe.
|
|
||||||
_WIDENING_COMPATIBLE: set = {
|
|
||||||
("smallint", "integer"),
|
|
||||||
("smallint", "bigint"),
|
|
||||||
("integer", "bigint"),
|
|
||||||
("real", "double precision"),
|
|
||||||
# INTEGER / BIGINT into DOUBLE PRECISION is lossless for int32 and
|
|
||||||
# exact up to 2**53 for int64, which covers every value pandas could
|
|
||||||
# have carried through as Int64 without wrapping anyway.
|
|
||||||
("integer", "double precision"),
|
|
||||||
("bigint", "double precision"),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _assert_schema_compatible(
|
def _assert_schema_compatible(
|
||||||
conn, schema: str, table: str, columns: Dict[str, ColumnSpec]
|
conn, schema: str, table: str, columns: Dict[str, ColumnSpec]
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -1310,17 +990,6 @@ def _assert_schema_compatible(
|
|||||||
inferred_norm = _normalize_type(spec.postgres_type)
|
inferred_norm = _normalize_type(spec.postgres_type)
|
||||||
target_norm = _normalize_type(target_type)
|
target_norm = _normalize_type(target_type)
|
||||||
if inferred_norm != target_norm:
|
if inferred_norm != target_norm:
|
||||||
if (inferred_norm, target_norm) in _WIDENING_COMPATIBLE:
|
|
||||||
# Narrower inferred type fits inside the wider target.
|
|
||||||
# Accept silently-but-noisily so the operator knows the
|
|
||||||
# file came in with a smaller range than the cluster's
|
|
||||||
# target was sized for.
|
|
||||||
warnings.append(
|
|
||||||
f"column {name!r}: inferred {spec.postgres_type} "
|
|
||||||
f"(narrower than target {target_type}); accepting - "
|
|
||||||
f"values fit in the wider target type"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
mismatches.append(
|
mismatches.append(
|
||||||
f"column {name!r}: inferred {spec.postgres_type} "
|
f"column {name!r}: inferred {spec.postgres_type} "
|
||||||
f"(normalized {inferred_norm!r}) but target is {target_type} "
|
f"(normalized {inferred_norm!r}) but target is {target_type} "
|
||||||
@ -1992,151 +1661,9 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]:
|
|||||||
return dt.time(h, m, s)
|
return dt.time(h, m, s)
|
||||||
|
|
||||||
|
|
||||||
# Safe outer bound for the numeric->datetime conversion below. The true
|
|
||||||
# ceiling is ``pd.Timestamp.max`` (2262-04-11), which in seconds since 1960
|
|
||||||
# is ~9.52e9. We pick a much tighter bound - year ~2200, ~7.6e9 seconds,
|
|
||||||
# ~87600 days - because (a) any real SAS data past ~2100 is garbage anyway,
|
|
||||||
# and (b) staying well inside the float64 + datetime64[ns] windows gives
|
|
||||||
# pandas' internals zero room to trip the ``over="raise"`` they wrap
|
|
||||||
# around the ns-multiply. ``7.5e9 * 1e9 = 7.5e18``, comfortably under both
|
|
||||||
# ``int64.max`` (~9.22e18) and float64 overflow (~1.8e308).
|
|
||||||
_SAS_DATETIME_SAFE_S = 7_500_000_000
|
|
||||||
_SAS_DATETIME_SAFE_D = 87_000
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_numeric_to_datetime(
|
|
||||||
series: pd.Series,
|
|
||||||
*,
|
|
||||||
unit: str,
|
|
||||||
column_name: str,
|
|
||||||
target_type: str,
|
|
||||||
) -> pd.Series:
|
|
||||||
"""Convert a numeric SAS-epoch series to ``datetime64[ns]`` without letting
|
|
||||||
one stray cell take down the worker.
|
|
||||||
|
|
||||||
Failure modes seen in production:
|
|
||||||
|
|
||||||
* ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value
|
|
||||||
sentinels, divide-by-zero in the source, uninitialized cells).
|
|
||||||
* Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9``
|
|
||||||
overflows float64.
|
|
||||||
* Values between ``pd.Timestamp.max`` and float64 safety (~9.5e9 to 1e308
|
|
||||||
seconds) where the nanosecond multiply silently produces garbage or
|
|
||||||
overflows int64.
|
|
||||||
|
|
||||||
All of these trigger ``FloatingPointError: overflow encountered in multiply``
|
|
||||||
inside ``pd.to_datetime`` because pandas wraps the multiply in
|
|
||||||
``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never
|
|
||||||
gets a chance to turn the bad value into ``NaT``.
|
|
||||||
|
|
||||||
Strategy, belt + suspenders + airbag:
|
|
||||||
|
|
||||||
1. Coerce to float64 up front. Object-dtype branches hand us mixed
|
|
||||||
int/float/str; ``pd.to_numeric(errors="coerce")`` parses what it can
|
|
||||||
and NaNs the rest, so we hit the rest of this function with a
|
|
||||||
pristine float series.
|
|
||||||
2. Mask non-finite values and anything outside the safe epoch window to
|
|
||||||
NaN *before* ``pd.to_datetime`` sees them.
|
|
||||||
3. Run the conversion under a permissive ``errstate``.
|
|
||||||
4. If that still raises (some pandas version internally re-enables
|
|
||||||
``over="raise"`` in a way ``errstate`` can't override), catch it
|
|
||||||
and return all-NaT for the column with a loud warning. Better a
|
|
||||||
NULL column in one chunk than a dead worker + no diagnostics.
|
|
||||||
|
|
||||||
Emits one stderr line per chunk per affected column so silent data
|
|
||||||
loss doesn't sneak by.
|
|
||||||
"""
|
|
||||||
if not pd.api.types.is_float_dtype(series):
|
|
||||||
series = pd.to_numeric(series, errors="coerce").astype("float64")
|
|
||||||
|
|
||||||
arr = series.to_numpy(dtype="float64", copy=False, na_value=np.nan)
|
|
||||||
if unit == "s":
|
|
||||||
bound = _SAS_DATETIME_SAFE_S
|
|
||||||
elif unit == "D":
|
|
||||||
bound = _SAS_DATETIME_SAFE_D
|
|
||||||
else:
|
|
||||||
bound = _SAS_DATETIME_SAFE_S
|
|
||||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
|
||||||
finite_mask = np.isfinite(arr)
|
|
||||||
# ``np.abs(inf) -> inf``, ``np.abs(nan) -> nan``; both compare False
|
|
||||||
# to ``bound``, so ``in_range_mask`` already excludes non-finite
|
|
||||||
# values. The explicit ``finite_mask &`` below is belt-and-suspenders
|
|
||||||
# in case a future numpy changes that semantic.
|
|
||||||
in_range_mask = np.abs(arr) < bound
|
|
||||||
keep_mask = finite_mask & in_range_mask
|
|
||||||
was_present = ~np.isnan(arr)
|
|
||||||
coerced = int(((~keep_mask) & was_present).sum())
|
|
||||||
if coerced:
|
|
||||||
tqdm.write(
|
|
||||||
f"[warn] {target_type} column {column_name!r}: {coerced:,} "
|
|
||||||
f"row(s) had non-representable values (Inf/NaN/out-of-range), "
|
|
||||||
f"coerced to NULL",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
cleaned_arr = np.where(keep_mask, arr, np.nan)
|
|
||||||
cleaned = pd.Series(cleaned_arr, index=series.index)
|
|
||||||
try:
|
|
||||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
|
||||||
return pd.to_datetime(
|
|
||||||
cleaned, unit=unit, origin="1960-01-01", errors="coerce",
|
|
||||||
)
|
|
||||||
except (FloatingPointError, OverflowError, ValueError) as exc:
|
|
||||||
tqdm.write(
|
|
||||||
f"[error] {target_type} column {column_name!r}: "
|
|
||||||
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
|
|
||||||
f"returning NaT for the entire chunk. This usually means one "
|
|
||||||
f"or more values slipped past the pre-mask (bound={bound}). "
|
|
||||||
f"Consider setting the column to TEXT via column_types if this "
|
|
||||||
f"recurs.",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_object_to_datetime(
|
|
||||||
series: pd.Series,
|
|
||||||
*,
|
|
||||||
column_name: str,
|
|
||||||
target_type: str,
|
|
||||||
) -> pd.Series:
|
|
||||||
"""Object-dtype to datetime. Shares the safety net (errstate +
|
|
||||||
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
|
|
||||||
actually numeric-flavored (e.g. SAS wrote numbers into an object
|
|
||||||
column), route to the numeric path; otherwise parse with ``to_datetime``
|
|
||||||
on the object itself.
|
|
||||||
"""
|
|
||||||
coerced = series.replace({"": None})
|
|
||||||
numeric = pd.to_numeric(coerced, errors="coerce")
|
|
||||||
all_numeric = numeric.notna().sum() == coerced.notna().sum()
|
|
||||||
if all_numeric and coerced.notna().any():
|
|
||||||
return _safe_numeric_to_datetime(
|
|
||||||
numeric, unit="s", column_name=column_name, target_type=target_type,
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
|
||||||
return pd.to_datetime(coerced, errors="coerce")
|
|
||||||
except (FloatingPointError, OverflowError, ValueError) as exc:
|
|
||||||
tqdm.write(
|
|
||||||
f"[error] {target_type} column {column_name!r}: "
|
|
||||||
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
|
|
||||||
f"returning NaT for the entire chunk.",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
|
|
||||||
|
|
||||||
|
|
||||||
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
|
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
|
||||||
"""Materialize a copy of ``df`` with each column in the right shape for
|
"""Materialize a copy of ``df`` with each column in the right shape for
|
||||||
``to_csv`` so the CSV lands as valid input for the target Postgres type.
|
``to_csv`` so the CSV lands as valid input for the target Postgres type.
|
||||||
|
|
||||||
Per-column conversions are vectorized (``.astype`` / ``pd.to_datetime`` /
|
|
||||||
``.mask`` / ``.fillna``) instead of the element-wise ``.map(func)``
|
|
||||||
loops this function used to run. That was the single largest per-chunk
|
|
||||||
CPU cost on text-heavy loads - a 40-column × 100k-row chunk was issuing
|
|
||||||
~4M Python-level function calls just to cast strings. TIME columns are
|
|
||||||
still the ``.map`` path because SAS TIME8 is stored as seconds and the
|
|
||||||
clamp-to-24h logic doesn't fit cleanly in vector form; they're also
|
|
||||||
rare in practice.
|
|
||||||
"""
|
"""
|
||||||
out = pd.DataFrame(index=df.index)
|
out = pd.DataFrame(index=df.index)
|
||||||
for name, spec in columns.items():
|
for name, spec in columns.items():
|
||||||
@ -2159,86 +1686,59 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
|||||||
if pd.api.types.is_datetime64_any_dtype(series):
|
if pd.api.types.is_datetime64_any_dtype(series):
|
||||||
out[name] = series.dt.date
|
out[name] = series.dt.date
|
||||||
elif pd.api.types.is_object_dtype(series):
|
elif pd.api.types.is_object_dtype(series):
|
||||||
# Vectorized parse: empty strings / None / unparseable -> NaT,
|
def _to_date(v: Any) -> Optional[dt.date]:
|
||||||
# then .dt.date yields date objects or NaT. NaT serializes as
|
if v is None or (isinstance(v, float) and pd.isna(v)):
|
||||||
# an empty CSV field (matching ``NULL ''`` in COPY). Routed
|
return None
|
||||||
# through ``_safe_object_to_datetime`` so an object column
|
if isinstance(v, dt.datetime):
|
||||||
# that actually contains SAS-epoch numerics (seen when one
|
return v.date()
|
||||||
# file of a cluster stores the column as NUM and another as
|
if isinstance(v, dt.date):
|
||||||
# CHAR + the union flipped it to TEXT-then-DATE) can't trip
|
return v
|
||||||
# the overflow-in-multiply bug.
|
if isinstance(v, str):
|
||||||
parsed = _safe_object_to_datetime(
|
if v == "":
|
||||||
series, column_name=name, target_type="DATE",
|
return None
|
||||||
)
|
try:
|
||||||
out[name] = parsed.dt.date
|
return dt.date.fromisoformat(v)
|
||||||
elif pd.api.types.is_numeric_dtype(series):
|
except ValueError:
|
||||||
# pyreadstat couldn't decode the SAS format (some
|
return None
|
||||||
# ``DATEw.``/``YYMMDDw.`` variants and all custom formats slip
|
return None
|
||||||
# through) so the column came back as float64: days since
|
out[name] = series.map(_to_date)
|
||||||
# 1960-01-01, the SAS epoch. Without this branch the raw
|
|
||||||
# number would hit COPY and Postgres rejects it with
|
|
||||||
# ``invalid input syntax for type date``.
|
|
||||||
parsed = _safe_numeric_to_datetime(
|
|
||||||
series, unit="D", column_name=name, target_type="DATE",
|
|
||||||
)
|
|
||||||
out[name] = parsed.dt.date
|
|
||||||
else:
|
else:
|
||||||
out[name] = series
|
out[name] = series
|
||||||
elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"):
|
elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"):
|
||||||
if pd.api.types.is_datetime64_any_dtype(series):
|
if pd.api.types.is_datetime64_any_dtype(series):
|
||||||
out[name] = series
|
out[name] = series
|
||||||
elif pd.api.types.is_object_dtype(series):
|
elif pd.api.types.is_object_dtype(series):
|
||||||
# Same rationale as the DATE object branch above: route
|
def _to_dt(v: Any) -> Optional[dt.datetime]:
|
||||||
# through the safety net so numeric-flavored object columns
|
if v is None or (isinstance(v, float) and pd.isna(v)):
|
||||||
# can't blow us up during the ns multiply.
|
return None
|
||||||
out[name] = _safe_object_to_datetime(
|
if isinstance(v, dt.datetime):
|
||||||
series, column_name=name, target_type="TIMESTAMP",
|
return v
|
||||||
)
|
if isinstance(v, dt.date):
|
||||||
elif pd.api.types.is_numeric_dtype(series):
|
return dt.datetime(v.year, v.month, v.day)
|
||||||
# Same story as the DATE branch above, but SAS datetimes are
|
if isinstance(v, pd.Timestamp):
|
||||||
# *seconds* since 1960-01-01 (fractional seconds for
|
return v.to_pydatetime() if not pd.isna(v) else None
|
||||||
# ``DATETIMEw.d``). Example caught in the wild:
|
if isinstance(v, str):
|
||||||
# ``1915465463.615`` -> 2020-09-13 05:44:23.615.
|
if v == "":
|
||||||
out[name] = _safe_numeric_to_datetime(
|
return None
|
||||||
series, unit="s", column_name=name, target_type="TIMESTAMP",
|
try:
|
||||||
)
|
return dt.datetime.fromisoformat(v)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
out[name] = series.map(_to_dt)
|
||||||
else:
|
else:
|
||||||
out[name] = series
|
out[name] = series
|
||||||
elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"):
|
elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"):
|
||||||
out[name] = series.map(_seconds_to_time)
|
out[name] = series.map(_seconds_to_time)
|
||||||
elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"):
|
elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"):
|
||||||
# Render every cell as a string and blank out nulls. ``NULL ''``
|
# Leave empty strings as "" so `NULL ''` in COPY turns them into NULL.
|
||||||
# in the COPY statement turns the blanks back into SQL NULL.
|
def _to_str(v: Any) -> Any:
|
||||||
# astype(str) stringifies NaN/None to the literal "nan"/"None",
|
if v is None:
|
||||||
# so we mask those after the fact rather than branching per cell.
|
return ""
|
||||||
na_mask = series.isna()
|
if isinstance(v, float) and pd.isna(v):
|
||||||
if pd.api.types.is_numeric_dtype(series):
|
return ""
|
||||||
# Hit when a column was auto-unioned to TEXT because at
|
return str(v)
|
||||||
# least one file of the cluster stored it as CHAR but this
|
out[name] = series.map(_to_str)
|
||||||
# particular file stored it as NUM (typical of SAS phone-id
|
|
||||||
# columns). Default float formatting would emit "123.0" -
|
|
||||||
# which doesn't match the plain "123" coming from the CHAR
|
|
||||||
# files. When the whole chunk is integer-valued, round to
|
|
||||||
# int before stringifying; when any fractional value is
|
|
||||||
# present we leave float formatting alone so we don't
|
|
||||||
# silently drop precision.
|
|
||||||
nonnull = series.dropna()
|
|
||||||
int_like = False
|
|
||||||
if not nonnull.empty:
|
|
||||||
try:
|
|
||||||
int_like = bool(((nonnull % 1) == 0).all())
|
|
||||||
except TypeError:
|
|
||||||
int_like = False
|
|
||||||
if int_like:
|
|
||||||
# ``Int64`` preserves NA; ``.astype(str)`` renders NA
|
|
||||||
# as '<NA>', which we then mask out alongside original
|
|
||||||
# NaNs.
|
|
||||||
as_str = series.astype("Int64").astype(str)
|
|
||||||
out[name] = as_str.mask(na_mask, "")
|
|
||||||
else:
|
|
||||||
out[name] = series.astype(str).mask(na_mask, "")
|
|
||||||
else:
|
|
||||||
out[name] = series.astype(str).mask(na_mask, "")
|
|
||||||
elif pg == "BOOLEAN":
|
elif pg == "BOOLEAN":
|
||||||
out[name] = series.astype("boolean") if series.dtype != object else series
|
out[name] = series.astype("boolean") if series.dtype != object else series
|
||||||
else:
|
else:
|
||||||
@ -2246,25 +1746,6 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
def _serialize_chunk_csv(prepared: pd.DataFrame) -> io.BytesIO:
    """Render ``prepared`` as header-less CSV bytes for ``COPY FROM STDIN``.

    The frame is converted to a pyarrow table (index dropped) and written
    with ``pyarrow.csv.write_csv`` rather than pandas' ``to_csv``; the
    original author measured this as typically 5-10x faster on
    wide/text-heavy frames. The intent is that null cells come out as empty
    fields and date/timestamp values in ISO 8601 form, which Postgres
    accepts under ``FORMAT csv, NULL ''``.

    NOTE(review): confirm how pyarrow's default quoting renders empty
    *strings* - a quoted empty field is not NULL under ``NULL ''``.

    Returns:
        An in-memory buffer positioned at offset 0, ready to be read.
    """
    sink = io.BytesIO()
    headerless = pa_csv.WriteOptions(include_header=False)
    arrow_table = pa.Table.from_pandas(prepared, preserve_index=False)
    pa_csv.write_csv(arrow_table, sink, write_options=headerless)
    # Rewind so the consumer reads from the first byte.
    sink.seek(0)
    return sink
|
|
||||||
|
|
||||||
|
|
||||||
def copy_dataframes(
|
def copy_dataframes(
|
||||||
conn,
|
conn,
|
||||||
schema_name: str,
|
schema_name: str,
|
||||||
@ -2287,43 +1768,23 @@ def copy_dataframes(
|
|||||||
)
|
)
|
||||||
|
|
||||||
total = 0
|
total = 0
|
||||||
# Pull chunks one at a time so each ``df`` is unreferenced before the
|
|
||||||
# generator reads the next one. Without this the loop-variable binding
|
|
||||||
# of a ``for df in dfs:`` keeps the previous chunk alive during the
|
|
||||||
# next pyreadstat read, pushing peak memory to 5-6× chunk size per
|
|
||||||
# worker (old df + incoming df + prepared + pyarrow table + CSV buf).
|
|
||||||
# With explicit drops we cap peak at ~2× chunk size: ``df`` goes away
|
|
||||||
# once ``prepared`` exists, ``prepared`` once ``buf`` exists, ``buf``
|
|
||||||
# once COPY has consumed it. Matters most in parallel mode where
|
|
||||||
# 32 × per-worker peak can exhaust a 128 GB host.
|
|
||||||
dfs_iter = iter(dfs)
|
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
while True:
|
for df in dfs:
|
||||||
try:
|
|
||||||
df = next(dfs_iter)
|
|
||||||
except StopIteration:
|
|
||||||
break
|
|
||||||
if df.empty:
|
if df.empty:
|
||||||
del df
|
|
||||||
continue
|
continue
|
||||||
prepared = _prepare_for_copy(df, columns)
|
prepared = _prepare_for_copy(df, columns)
|
||||||
del df
|
buf = io.StringIO()
|
||||||
n = len(prepared)
|
prepared.to_csv(
|
||||||
buf = _serialize_chunk_csv(prepared)
|
buf,
|
||||||
del prepared
|
index=False,
|
||||||
|
header=False,
|
||||||
|
na_rep="",
|
||||||
|
date_format="%Y-%m-%d %H:%M:%S",
|
||||||
|
)
|
||||||
|
buf.seek(0)
|
||||||
cur.copy_expert(sql, buf)
|
cur.copy_expert(sql, buf)
|
||||||
del buf
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
total += n
|
total += len(prepared)
|
||||||
# Hand pyarrow's pool memory back between chunks. Without this,
|
|
||||||
# arrow's internal buffer pool keeps the high-water bytes
|
|
||||||
# reserved across the worker's lifetime - inside long-running
|
|
||||||
# workers this presents as steadily climbing RSS even with the
|
|
||||||
# ``del``s above. Cheap (microseconds); call it every chunk.
|
|
||||||
try:
|
|
||||||
pa.default_memory_pool().release_unused()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
return total
|
return total
|
||||||
|
|
||||||
|
|
||||||
@ -2438,16 +1899,6 @@ def _build_argparser() -> argparse.ArgumentParser:
|
|||||||
"PGUSER / PGPASSWORD from the environment or .env file."
|
"PGUSER / PGPASSWORD from the environment or .env file."
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
p.add_argument(
|
|
||||||
"--all-nullable",
|
|
||||||
action="store_true",
|
|
||||||
help=(
|
|
||||||
"Stamp every column nullable in the generated schema, bypassing "
|
|
||||||
"NOT NULL inference. Use when sampled rows wrongly suggest a "
|
|
||||||
"column has no nulls. Overrides ``all_nullable`` in the YAML "
|
|
||||||
"config when set."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
return p
|
return p
|
||||||
|
|
||||||
|
|
||||||
@ -2470,23 +1921,13 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
|
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
|
||||||
return 2
|
return 2
|
||||||
|
|
||||||
# Schema inference reads the whole file so type + nullability are
|
# Schema inference uses a bounded preview read so we never load a
|
||||||
# computed against every row. That's what the target host has the
|
# hundreds-of-millions-of-rows file into memory just to pick types.
|
||||||
# resources for and is the only way to honestly emit ``NOT NULL`` -
|
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows
|
||||||
# a bounded preview routinely missed the ~0.2% of rows with nulls on
|
# returned, not the file's total, so we don't trust it here.
|
||||||
# otherwise-dense keys (e.g. MAFID). If you're on a box that can't
|
|
||||||
# fit the file in memory, override ``TYPE_INFERENCE_SAMPLE_ROWS`` to
|
|
||||||
# an integer cap and know that sampled specs may stamp ``NOT NULL``
|
|
||||||
# on columns whose nulls live past the window.
|
|
||||||
preview_df, meta = read_sas_preview(cfg.filename)
|
preview_df, meta = read_sas_preview(cfg.filename)
|
||||||
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
|
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
|
||||||
force_nullable = args.all_nullable or cfg.all_nullable
|
columns = infer_schema(preview_df, meta)
|
||||||
columns = infer_schema(
|
|
||||||
preview_df,
|
|
||||||
meta,
|
|
||||||
column_types=cfg.column_types,
|
|
||||||
force_nullable=force_nullable,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Validate partition columns exist in the schema after filtering.
|
# Validate partition columns exist in the schema after filtering.
|
||||||
if cfg.partition_by:
|
if cfg.partition_by:
|
||||||
@ -2577,24 +2018,13 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
# it while we're holding a Postgres transaction open.
|
# it while we're holding a Postgres transaction open.
|
||||||
del preview_df
|
del preview_df
|
||||||
|
|
||||||
total_rows = getattr(meta, "number_rows", None)
|
|
||||||
|
|
||||||
def _filtered_chunks():
|
def _filtered_chunks():
|
||||||
pbar = tqdm(
|
seen = 0
|
||||||
total=total_rows,
|
|
||||||
unit="row",
|
|
||||||
unit_scale=True,
|
|
||||||
desc=f" {cfg.filename.name}",
|
|
||||||
file=sys.stderr,
|
|
||||||
dynamic_ncols=True,
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
|
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
|
||||||
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
|
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
|
||||||
pbar.update(len(chunk_df))
|
seen += len(chunk_df)
|
||||||
|
print(f" streaming... {seen:,} rows", file=sys.stderr)
|
||||||
yield chunk_df
|
yield chunk_df
|
||||||
finally:
|
|
||||||
pbar.close()
|
|
||||||
|
|
||||||
db_user = db_password = None
|
db_user = db_password = None
|
||||||
if args.dbcreds:
|
if args.dbcreds:
|
||||||
|
|||||||
@ -38,24 +38,3 @@ if_exists: append
|
|||||||
# indexes:
|
# indexes:
|
||||||
# - state
|
# - state
|
||||||
# - zip
|
# - zip
|
||||||
|
|
||||||
# column_types: Explicit {column_name: postgres_type} overrides that
|
|
||||||
# bypass automatic type inference for the listed columns. Useful when
|
|
||||||
# pyreadstat reports a column as NUM but you want it stored as TEXT
|
|
||||||
# (phone/ID columns that are conceptually strings), or when a column's
|
|
||||||
# inferred type is off for any other reason. Columns not listed here
|
|
||||||
# fall through to the normal inference path. Nullability is always
|
|
||||||
# computed from the data.
|
|
||||||
#
|
|
||||||
# column_types:
|
|
||||||
# RESP_PH_PREFIX_ID: TEXT
|
|
||||||
# SOMELONG_ID: BIGINT
|
|
||||||
|
|
||||||
# all_nullable: If true, every column is stamped nullable in the generated
|
|
||||||
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
|
|
||||||
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
|
|
||||||
# rare-null data downstream) and COPY blows up mid-load on the first null
|
|
||||||
# it hits. Off by default. The CLI flag --all-nullable overrides this to
|
|
||||||
# true when set.
|
|
||||||
#
|
|
||||||
# all_nullable: false
|
|
||||||
|
|||||||
@ -61,52 +61,15 @@ auto_detect: true
|
|||||||
# - state
|
# - state
|
||||||
# - zip
|
# - zip
|
||||||
|
|
||||||
# Folder-level column_types: Explicit {column_name: postgres_type} map that
|
|
||||||
# bypasses automatic type inference for the listed columns. Applied to
|
|
||||||
# every cluster unless a cluster supplies its own column_types, which are
|
|
||||||
# merged on top (cluster entries win on conflict).
|
|
||||||
#
|
|
||||||
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
|
|
||||||
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
|
|
||||||
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
|
|
||||||
# BIGINT). Entries in column_types here win over that auto-union - use
|
|
||||||
# them when the auto result is wrong or when --no-prescan disables the
|
|
||||||
# auto-union and you still need to pin a column.
|
|
||||||
#
|
|
||||||
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
|
|
||||||
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
|
|
||||||
# don't exist in a given file are simply ignored for that file.
|
|
||||||
#
|
|
||||||
# column_types:
|
|
||||||
# RESP_PH_PREFIX_ID: TEXT
|
|
||||||
# RESP_PH_SUFFIX_ID: TEXT
|
|
||||||
# SOMELONG_ID: BIGINT
|
|
||||||
|
|
||||||
# Folder-level all_nullable: If true, every column of every cluster is
|
|
||||||
# stamped nullable in the generated schema; NOT NULL inference is skipped
|
|
||||||
# entirely. Use this when the sampler wrongly concludes a column has no
|
|
||||||
# nulls (sampled rows happened to be dense, but later files in the cluster
|
|
||||||
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
|
|
||||||
# unless a cluster supplies its own all_nullable. The CLI flag
|
|
||||||
# --all-nullable overrides both this and any per-cluster setting when
|
|
||||||
# passed. Off by default.
|
|
||||||
#
|
|
||||||
# all_nullable: false
|
|
||||||
|
|
||||||
# Explicit cluster patterns. Each pattern is matched against the file
|
# Explicit cluster patterns. Each pattern is matched against the file
|
||||||
# *basename*. Files matched by a pattern are pulled out of the auto-detect
|
# *basename*. Files matched by a pattern are pulled out of the auto-detect
|
||||||
# pool, so explicit and auto clusters compose cleanly.
|
# pool, so explicit and auto clusters compose cleanly.
|
||||||
#
|
#
|
||||||
# `tablename` is required. `if_exists`, `include`, `exclude`, and
|
# `tablename` is required. `if_exists`, `include`, and `exclude` are
|
||||||
# `column_types` are optional per-cluster overrides of the folder-level
|
# optional per-cluster overrides of the folder-level defaults above.
|
||||||
# defaults above. Cluster-level column_types entries win over folder-
|
|
||||||
# level entries for the same column.
|
|
||||||
clusters:
|
clusters:
|
||||||
- pattern: '^group_a\d+\.xpt$'
|
- pattern: '^group_a\d+\.xpt$'
|
||||||
tablename: group_a
|
tablename: group_a
|
||||||
# column_types:
|
|
||||||
# INTCOL: TEXT
|
|
||||||
# all_nullable: true # per-cluster override of the folder-level default
|
|
||||||
|
|
||||||
# Example of an explicit override. Uncomment to force the group_b cluster to
|
# Example of an explicit override. Uncomment to force the group_b cluster to
|
||||||
# append instead of replace even though the folder default is "replace":
|
# append instead of replace even though the folder default is "replace":
|
||||||
|
|||||||
@ -1,10 +1,7 @@
|
|||||||
pandas>=2.0,<3.0
|
pandas>=2.0,<3.0
|
||||||
pyreadstat>=1.2,<2.0
|
pyreadstat>=1.2,<2.0
|
||||||
numpy>=2.1,<3.0
|
numpy>=2.1,<3.0
|
||||||
pyarrow>=22.0,<24.0
|
|
||||||
pyyaml>=6.0,<7.0
|
pyyaml>=6.0,<7.0
|
||||||
psycopg2-binary>=2.9,<3.0
|
psycopg2-binary>=2.9,<3.0
|
||||||
python-dotenv>=1.0,<2.0
|
python-dotenv>=1.0,<2.0
|
||||||
boto3>=1.28,<2.0
|
boto3>=1.28,<2.0
|
||||||
openpyxl>=3.1,<4.0
|
|
||||||
tqdm>=4.66,<5.0
|
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user