Compare commits

..

23 Commits

Author SHA1 Message Date
David Peterson
64e7ff0b0a Enhance error reporting in load_folder.py and load_sas.py for better debugging
Updated error handling in the _worker_load_append_file function to include full tracebacks in exception messages, improving context for failures during file loading. Additionally, modified the _safe_numeric_to_datetime function to provide detailed warnings when conversion errors occur, ensuring users are informed of potential data issues. These changes aim to facilitate easier debugging and enhance the robustness of the data loading process.
2026-04-21 16:56:27 -05:00
David Peterson
eff82c73ce Add all_nullable configuration option in load_folder.py and load_sas.py for flexible schema management
Introduced an `all_nullable` boolean option in both `load_folder.py` and `load_sas.py`, allowing users to specify whether all columns should be treated as nullable during schema inference. This feature addresses scenarios where the data sampling may incorrectly suggest that columns are non-nullable, preventing potential errors during data loading. Updated YAML configuration files to include examples of this new option, enhancing usability and providing clearer documentation for users.
2026-04-21 16:48:37 -05:00
David Peterson
c283b42876 Add safe numeric to datetime conversion in load_sas.py to handle edge cases
Implemented the _safe_numeric_to_datetime function to convert numeric SAS-epoch series to datetime64[ns] while managing potential overflow and non-finite values. This enhancement improves error handling during data processing by masking invalid entries before conversion, ensuring robust handling of SAS date formats in the _prepare_for_copy function.
2026-04-21 15:55:25 -05:00
David Peterson
a46f0518f6 Suppress PerformanceWarning in load_sas.py to reduce noise during processing of wide SAS files. This change filters out warnings related to DataFrame fragmentation, which are irrelevant for our pipeline as we directly convert DataFrames to pyarrow tables. 2026-04-21 13:40:38 -05:00
David Peterson
969a442775 Refactor numeric column type inference in load_sas.py for improved data handling
Updated the logic for determining column types in the union_column_types function. Changed the default type from BIGINT to DOUBLE PRECISION for numeric columns without explicit format hints, ensuring better handling of both integer and float values. This adjustment prevents loading failures due to format discrepancies and maintains consistent data processing across various SAS formats.
2026-04-21 13:17:01 -05:00
David Peterson
212218fb67 Enhance error handling and abort functionality in load_folder.py for parallel file loading
Implemented an `--abort-on-first-failure` option in the `_load_remaining_files_parallel` function, allowing users to cancel all pending tasks immediately upon the first worker failure. This change improves user experience by providing real-time feedback on errors through stderr, ensuring that users are promptly informed of issues without waiting for all tasks to complete. Additionally, refined error reporting to maintain accurate summaries of successes and failures, even during interruptions.
2026-04-21 12:54:05 -05:00
David Peterson
ae65140390 Add column type overrides in load_folder.py and load_sas.py for enhanced schema control
Implemented a new feature allowing users to specify explicit column type mappings via a `column_types` configuration in both `load_folder.py` and `load_sas.py`. This addition enables users to bypass automatic type inference for specific columns, ensuring correct data types are used when loading datasets. Updated the YAML configuration files to include examples of the new `column_types` option, enhancing usability and flexibility in handling varying data formats across files.
2026-04-21 12:14:44 -05:00
David Peterson
0c5e6e31f0 Enhance memory management in load_folder.py and load_sas.py for improved performance
Added memory management optimizations in the _worker_load_append_file function to release unused memory from pyarrow's pool and trigger Python's garbage collection. Implemented explicit memory trimming using glibc's malloc_trim to ensure efficient memory usage during long-running processes. Updated the copy_dataframes function in load_sas.py to release pyarrow's memory pool between chunks, preventing high memory usage in long-lived workers. These changes aim to reduce memory footprint and improve overall performance during large dataset processing.
2026-04-21 10:46:54 -05:00
David Peterson
9afb52aecb Add --chunk-rows option to load_folder.py for customizable memory management
Introduced a new command-line argument, --chunk-rows, allowing users to specify the number of rows per chunk for pyreadstat streaming and COPY operations. This option overrides the GENERIC_LOADER_CHUNK_ROWS environment variable and auto-scaling behavior when using multiple workers. Enhanced memory management by providing detailed information on peak memory usage based on the specified chunk size, improving performance and usability during large dataset processing.
2026-04-21 10:05:21 -05:00
David Peterson
eac75cbb26 Refactor load_cluster function in load_folder.py for improved parallel file loading
Updated the load_cluster function to enhance parallel processing by committing the table creation before dispatching all files to worker processes. This change allows for more efficient handling of large datasets by reducing the serial workload and ensuring schema compatibility checks can access the committed table. The logic for streaming files has been clarified, maintaining progress tracking throughout the loading process.
2026-04-21 08:31:48 -05:00
David Peterson
1265489276 Enhance date and timestamp handling in _prepare_for_copy function in load_sas.py
Added support for numeric date and datetime conversions from SAS formats. Implemented logic to handle float64 representations of dates (days since 1960-01-01) and datetimes (seconds since 1960-01-01), ensuring proper parsing and preventing errors during data copying to Postgres. This enhancement improves compatibility with various SAS date formats.
2026-04-21 08:16:17 -05:00
David Peterson
2dd247b067 Add --no-prescan option to load_folder.py for skipping metadata scan
Introduced a new command-line argument, --no-prescan, allowing users to bypass the per-file metadata scan during the loading process. This enhancement is particularly useful for large folders where the pre-scan may be time-consuming. The progress bar will still display rows loaded, rate, and elapsed time, but without an estimated time of arrival (ETA) for completion. Updated the main function to handle this new option and adjusted the progress tracking accordingly.
2026-04-21 08:12:39 -05:00
David Peterson
052fb0e087 Refactor pre-scan process in load_folder.py to utilize ThreadPoolExecutor for improved performance
Updated the main function to replace sequential file processing with a threaded approach using ThreadPoolExecutor. This change enhances the efficiency of reading row counts from SAS files, particularly for large datasets, by allowing concurrent I/O operations. Added progress tracking with tqdm for better user feedback during the pre-scan phase.
2026-04-20 22:43:02 -05:00
David Peterson
fe7dc4d5a1 Enhance load_cluster function for parallel processing and progress tracking
Refactored the load_cluster function in load_folder.py to support parallel file loading using ProcessPoolExecutor, improving performance during the append phase. Added workers parameter for controlling parallelism and integrated a progress_queue for real-time progress updates. Introduced read_sas_metadata function in load_sas.py to efficiently read metadata from SAS files, optimizing the pre-scan process for global progress tracking.
2026-04-20 22:02:55 -05:00
David Peterson
96f2d6fe79 Update requirements and enhance SAS file processing with progress tracking
Updated the pyarrow version in requirements.txt to improve compatibility. Enhanced the _infer_cluster_schema and _stream_file functions in load_folder.py and load_sas.py to return total row counts for better progress tracking during data streaming. Integrated tqdm for visual feedback on row processing, improving user experience during large data loads.
2026-04-20 21:44:49 -05:00
David Peterson
7beb44ac4d Add pyarrow dependency and optimize DataFrame serialization in load_sas.py
Included pyarrow as a new dependency in requirements.txt for improved CSV serialization performance. Refactored the _prepare_for_copy function to utilize vectorized operations for date and timestamp conversions, reducing CPU overhead. Introduced a new _serialize_chunk_csv function leveraging pyarrow for faster CSV writing, enhancing efficiency during data copying to Postgres.
2026-04-20 21:32:56 -05:00
David Peterson
5e347f50ef Add widening compatibility checks in load_sas.py for type inference
Introduced a new set of widening compatible type pairs to allow for accepting narrower inferred types when they fit within wider target types during schema compatibility checks. This change enhances the type inference process by preventing unnecessary mismatches and improving handling of varying integer ranges in cluster loads. Updated warning messages to inform users of accepted type adjustments.
2026-04-20 21:08:13 -05:00
David Peterson
f84e127796 Update type inference behavior in load_sas.py to scan entire files by default
Changed the default setting for TYPE_INFERENCE_SAMPLE_ROWS to None, allowing type and nullability inference to consider all rows in a SAS file. This adjustment ensures accurate handling of null values and integer ranges, addressing issues observed in production with large datasets. Updated documentation to reflect the implications of this change and the risks associated with using an integer cap for sampling.
2026-04-20 20:43:27 -05:00
David Peterson
a94ab68f4d Refine partition name patterns in sas_profiler.py
Updated the regular expression for partition name patterns to improve matching accuracy for state-related columns. The new pattern captures variations like `state`, `state_code`, and `statecode` while avoiding false positives from unrelated terms. This change enhances the precision of partition candidate selection.
2026-04-20 19:27:01 -05:00
David Peterson
4fc85081c8 Enhance SAS profiling performance in sas_profiler.py
Added a new constant for profiling chunk size to optimize memory usage during profiling operations. Refactored the update method in the _ColumnStats class to improve efficiency in handling missing values and calculating statistics for numeric and string data types. This update includes vectorized operations for better performance and clarity in the implementation.
2026-04-20 19:03:40 -05:00
David Peterson
5449a25b44 Refactor partition candidate logic in sas_profiler.py
Updated the partition candidate selection process to restrict candidates to columns matching specific name patterns, improving accuracy and reducing noise. Removed outdated distinct value constraints and clarified documentation for partitioning behavior. Enhanced handling of pre-sharded columns and refined the classification logic for better performance.
2026-04-20 18:49:23 -05:00
David Peterson
b3b968edf2 Add openpyxl dependency to requirements.txt for Excel file handling 2026-04-20 18:38:24 -05:00
David Peterson
f1af1136dc Add standalone SAS profiling utility
Introduced a new script `sas_profiler.py` that profiles local SAS files and generates an Excel report with recommendations for drops, partitions, and indexes, along with type-inference warnings. The utility supports command-line overrides for configuration and is compatible with Python 3.10+. This addition enhances the existing tools for SAS file management.
2026-04-20 18:38:01 -05:00
6 changed files with 2756 additions and 129 deletions

File diff suppressed because it is too large Load Diff

View File

@ -183,15 +183,17 @@ Priority order used by :func:`infer_schema`:
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
``DOUBLE PRECISION``. ``DOUBLE PRECISION``.
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for Type inference scans the whole file by default (``TYPE_INFERENCE_SAMPLE_ROWS
performance on large files. The CLI enforces this at read time via = None``) so type + nullability are both computed against every row. The CLI
:func:`read_sas_preview`, so the whole file is never materialized just to pick materializes the file once for schema inference, then re-streams it chunk by
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual chunk into ``COPY``; peak memory is roughly one full dataframe. Override
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32, ``TYPE_INFERENCE_SAMPLE_ROWS`` to an integer cap if you're on a host that
or a column had no nulls in the preview but does later in the file, ``COPY`` can't hold the file in memory - but know that sampled specs carry the usual
will fail mid-stream and the whole transaction rolls back. Set risks: a later row may exceed the inferred integer range, or a column that
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing had no nulls in the preview may carry nulls later in the file (which then
matters more than speed. detonates ``COPY`` because the sampled spec stamped it ``NOT NULL``). Seen
in production on a 2.5M-row file with ~6k null MAFIDs past the 10k-row
preview - the entire load aborted mid-stream.
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
commit each chunk as it is copied so an interrupted load retains the rows commit each chunk as it is copied so an interrupted load retains the rows
@ -225,16 +227,48 @@ import math
import os import os
import re import re
import sys import sys
import warnings
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
import pandas as pd import pandas as pd
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
import pyarrow as pa
import pyarrow.csv as pa_csv
import pyreadstat import pyreadstat
import yaml import yaml
from dotenv import load_dotenv from dotenv import load_dotenv
from pandas.errors import PerformanceWarning
from tqdm import tqdm
# ``_prepare_for_copy`` builds its output frame one column at a time with
# ``out[name] = ...``. On wide SAS files (~100+ columns) pandas prints a
# ``PerformanceWarning: DataFrame is highly fragmented`` once per chunk to
# nudge callers toward ``pd.concat(axis=1, ...)``. The fragmentation only
# matters for row-oriented ops or in-place ``.copy()``; we hand the frame
# straight to ``pyarrow.Table.from_pandas`` which reads columns
# independently, so the warning is pure noise for our pipeline. Filter it
# at import time - narrow category match so nothing else is suppressed.
warnings.filterwarnings("ignore", category=PerformanceWarning)
# Turn numpy's "raise on float overflow" (and friends) into silent inf/nan
# production, module-wide. Pandas ships with ``np.errstate(over="raise")``
# wrapped around several internal ops (most painfully, the multiply inside
# ``pd.to_datetime(unit="s")`` that converts SAS epoch -> nanoseconds).
# Our data routinely carries ``inf`` / huge sentinels, which trip that
# ``raise`` and blow up an entire worker before ``errors="coerce"`` gets
# a chance to turn them into NaT. Even with ``_safe_numeric_to_datetime``
# pre-masking the obvious cases, other code paths (pandas object-dtype
# datetime parsing, pyarrow type promotion, pyreadstat) can also trigger.
# Setting a process-wide ``seterr`` is a heavier hammer than an
# ``errstate`` block but survives library internals that don't explicitly
# rewrap it. Downside: a real overflow bug in new code would now silently
# produce inf/nan instead of raising - acceptable for a bulk loader where
# "don't crash on bad rows, null them and move on" is the whole point.
np.seterr(over="ignore", invalid="ignore", divide="ignore")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -255,17 +289,29 @@ values; too small a sample is easy to mis-infer."""
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647) NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
"""INTEGER bounds; anything outside becomes BIGINT.""" """INTEGER bounds; anything outside becomes BIGINT."""
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000 TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = None
"""Cap on rows inspected during per-column type inference. Also governs how """Cap on rows inspected during per-column type inference. Also governs how
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate / many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
schema-inference flows. Set to ``None`` to scan every row (and read the whole schema-inference flows.
file into memory for the preview step - don't do this on multi-hundred-million
row files)."""
DEFAULT_CHUNK_ROWS = 100_000 Default is ``None`` (scan every row, reading the whole file into memory for
the schema-inference step). That's the only honest setting for nullability:
any integer cap lets a column look ``NOT NULL`` across the first N rows
while the file actually holds rare nulls past the window, which then
detonates ``COPY`` mid-stream (seen in production on a 2.5M-row file where
~6k MAFIDs were null past the 10k-row preview). If you're loading a file
so large that a full read won't fit in memory, set this to an integer cap
and accept that sampled specs can't be trusted for ``NOT NULL``."""
DEFAULT_CHUNK_ROWS = 2_000_000
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean """Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
fewer COPY round-trips but more peak memory per chunk; smaller values are fewer COPY round-trips and lower per-row overhead but more peak memory per
gentler on memory.""" chunk; smaller values are gentler on memory.
The chunk size can be overridden at runtime via the
``GENERIC_LOADER_CHUNK_ROWS`` environment variable (read inside
:func:`iter_sas_chunks`), so ``.env``-driven overrides work without code
changes. Explicit ``chunksize=`` kwargs still win over both."""
VALID_IF_EXISTS = ("fail", "replace", "append") VALID_IF_EXISTS = ("fail", "replace", "append")
@ -290,6 +336,8 @@ class LoaderConfig:
partition_by: List[str] = field(default_factory=list) partition_by: List[str] = field(default_factory=list)
max_partitions: int = 10_000 max_partitions: int = 10_000
indexes: List[str] = field(default_factory=list) indexes: List[str] = field(default_factory=list)
column_types: Dict[str, str] = field(default_factory=dict)
all_nullable: bool = False
@dataclass @dataclass
@ -500,6 +548,48 @@ def load_config(path: Path) -> LoaderConfig:
f"{missing_in_include}" f"{missing_in_include}"
) )
# -- column_types -------------------------------------------------------
# Optional ``{column_name: pg_type}`` escape hatch that bypasses
# automatic type inference for specific columns. Useful when
# pyreadstat reports a column as NUM but the downstream consumer
# expects TEXT (e.g. phone-id columns), or when a column has drifted
# between CHAR and NUM across file versions and you want to pin
# TEXT up front. See also :func:`infer_schema`.
raw_ct = raw.get("column_types")
column_types: Dict[str, str] = {}
if raw_ct is not None:
if not isinstance(raw_ct, dict):
raise ValueError(
f"Config {path}: 'column_types' must be a mapping of "
f"{{column_name: postgres_type}}."
)
for k, v in raw_ct.items():
key = str(k).strip()
if not key:
raise ValueError(
f"Config {path}: 'column_types' contains an empty "
f"column name."
)
if not isinstance(v, str) or not v.strip():
raise ValueError(
f"Config {path}: 'column_types[{key}]' must be a "
f"non-empty Postgres type string (got {v!r})."
)
column_types[key] = v.strip()
# -- all_nullable -------------------------------------------------------
# When inference wrongly stamps a column NOT NULL (sampled rows happened
# to be dense; later rows carry nulls) downstream COPYs fail mid-stream.
# Set ``all_nullable: true`` in the YAML to stamp every column nullable
# up front. The CLI flag ``--all-nullable`` overrides this to ``true``
# if set.
raw_an = raw.get("all_nullable", False)
if not isinstance(raw_an, bool):
raise ValueError(
f"Config {path}: 'all_nullable' must be a boolean (got {raw_an!r})."
)
all_nullable = bool(raw_an)
return LoaderConfig( return LoaderConfig(
filename=filename, filename=filename,
schemaname=schemaname, schemaname=schemaname,
@ -510,6 +600,8 @@ def load_config(path: Path) -> LoaderConfig:
partition_by=partition_by, partition_by=partition_by,
max_partitions=max_partitions, max_partitions=max_partitions,
indexes=indexes, indexes=indexes,
column_types=column_types,
all_nullable=all_nullable,
) )
@ -563,16 +655,46 @@ def read_sas_preview(
return reader(str(Path(path)), row_limit=row_limit, **kwargs) return reader(str(Path(path)), row_limit=row_limit, **kwargs)
def read_sas_metadata(path: Path) -> Any:
"""Read only the metadata (no rows) from a SAS file.
Uses pyreadstat's ``metadataonly=True`` fast path: the reader decodes
the file header (column names, formats, total row count, etc.) and
returns without touching the data pages. Orders of magnitude faster
than :func:`read_sas_preview` when all you need is
``meta.number_rows`` - typically a few ms per sas7bdat file, which
makes it cheap to pre-scan a whole folder to populate a global
progress bar.
"""
reader, kwargs = _sas_reader(path)
_, meta = reader(str(Path(path)), metadataonly=True, **kwargs)
return meta
def iter_sas_chunks( def iter_sas_chunks(
path: Path, path: Path,
*, *,
chunksize: int = DEFAULT_CHUNK_ROWS, chunksize: Optional[int] = None,
): ):
"""Yield ``(df_chunk, meta)`` tuples for streaming loads. """Yield ``(df_chunk, meta)`` tuples for streaming loads.
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
underlying reader by extension and threads through our encoding defaults. underlying reader by extension and threads through our encoding defaults.
When ``chunksize`` is ``None`` (the default), the effective value comes
from the ``GENERIC_LOADER_CHUNK_ROWS`` environment variable if set and
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
always wins.
""" """
if chunksize is None:
raw = os.environ.get("GENERIC_LOADER_CHUNK_ROWS")
if raw is not None:
try:
chunksize = int(raw)
except ValueError:
chunksize = DEFAULT_CHUNK_ROWS
else:
chunksize = DEFAULT_CHUNK_ROWS
reader, kwargs = _sas_reader(path) reader, kwargs = _sas_reader(path)
yield from pyreadstat.read_file_in_chunks( yield from pyreadstat.read_file_in_chunks(
reader, str(Path(path)), chunksize=chunksize, **kwargs reader, str(Path(path)), chunksize=chunksize, **kwargs
@ -590,7 +712,13 @@ def apply_column_filter(
exclude: Optional[List[str]], exclude: Optional[List[str]],
) -> pd.DataFrame: ) -> pd.DataFrame:
"""Restrict ``df`` to the requested columns. Names missing from the frame """Restrict ``df`` to the requested columns. Names missing from the frame
raise a clear error rather than silently dropping.""" raise a clear error rather than silently dropping.
Returns the input frame (or a column-sliced view / drop result) without
an extra ``.copy()`` downstream (:func:`_prepare_for_copy`) reads the
frame into a freshly built output and never mutates its input, so the
copies were pure overhead on every streamed chunk.
"""
if include is not None and exclude is not None: if include is not None and exclude is not None:
raise ValueError("include and exclude are mutually exclusive.") raise ValueError("include and exclude are mutually exclusive.")
@ -598,15 +726,15 @@ def apply_column_filter(
missing = [c for c in include if c not in df.columns] missing = [c for c in include if c not in df.columns]
if missing: if missing:
raise ValueError(f"include references unknown columns: {missing}") raise ValueError(f"include references unknown columns: {missing}")
return df.loc[:, list(include)].copy() return df.loc[:, list(include)]
if exclude is not None: if exclude is not None:
missing = [c for c in exclude if c not in df.columns] missing = [c for c in exclude if c not in df.columns]
if missing: if missing:
raise ValueError(f"exclude references unknown columns: {missing}") raise ValueError(f"exclude references unknown columns: {missing}")
return df.drop(columns=list(exclude)).copy() return df.drop(columns=list(exclude))
return df.copy() return df
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -634,6 +762,126 @@ def _format_driven_type(sas_format: Optional[str]) -> Optional[str]:
return None return None
_DECIMAL_FORMAT_RE = re.compile(r"\.(\d+)")
def _format_hints_decimal(sas_format: Optional[str]) -> bool:
"""True if a numeric SAS format string explicitly carries decimal places.
SAS numeric formats are ``NAMEw.d``; ``d > 0`` means the variable was
intended to render with ``d`` decimal digits (COMMA10.2, F8.3, ...).
A bare width like ``BEST12.`` or ``F8.`` has no digits after the dot
and is treated as integer-presenting. Used by
:func:`union_column_types` to pick BIGINT vs DOUBLE PRECISION when a
column is numeric in every file of a cluster.
"""
if not sas_format:
return False
m = _DECIMAL_FORMAT_RE.search(sas_format)
if not m:
return False
try:
return int(m.group(1)) > 0
except ValueError:
return False
def extract_union_metadata(
meta: Any,
) -> Dict[str, Tuple[str, Optional[str]]]:
"""Pull the (readstat_type, sas_format) pair for every column in ``meta``.
Returns a plain dict that's safe to pass between processes and to
:func:`union_column_types`. ``readstat_type`` is the simplified type
reported by pyreadstat: ``"string"`` for SAS CHAR, ``"double"`` for
SAS NUM. ``sas_format`` comes from ``meta.original_variable_types``
and drives date/datetime detection during union.
"""
var_types = dict(getattr(meta, "variable_types", None) or {})
formats = dict(getattr(meta, "original_variable_types", None) or {})
names = list(
getattr(meta, "column_names", None)
or list(var_types.keys())
or list(formats.keys())
)
out: Dict[str, Tuple[str, Optional[str]]] = {}
for col in names:
rtype = str(var_types.get(col, "")) if var_types else ""
fmt = formats.get(col)
out[col] = (rtype, fmt if fmt else None)
return out
def union_column_types(
per_file_metas: Iterable[Dict[str, Tuple[str, Optional[str]]]],
) -> Dict[str, str]:
"""Derive one Postgres type per column that's safe across every file.
``per_file_metas`` is an iterable (one entry per file in a cluster) of
``{column_name: (readstat_type, sas_format)}`` dicts as produced by
:func:`extract_union_metadata`.
Rules, evaluated per column:
* **CHAR/NUM drift wins TEXT.** If any file stores the column as CHAR
(``readstat_type != "double"``) the union is ``TEXT``. This covers
the phone-id case where some years stored ``RESP_PH_PREFIX_ID`` as
CHAR and others as NUM.
* **All NUM, format hints DATETIME TIMESTAMP.** Any file whose
format resolves to ``TIMESTAMP`` (via :func:`_format_driven_type`)
pins the column to ``TIMESTAMP`` even if other files left the
format blank.
* **All NUM, format hints DATE DATE.** Same idea for date-only
formats.
* **All NUM, any decimal hint DOUBLE PRECISION.** A ``w.d`` format
with ``d > 0`` in any file implies fractional values somewhere.
* **All NUM, no useful hint DOUBLE PRECISION.** SAS numeric
formats are *display* formats, not storage constraints - a
``BEST12.`` / ``F8.`` / blank-format column can still hold floats,
and pyreadstat hands back plain ``float64`` regardless. Defaulting
to ``DOUBLE PRECISION`` here costs the same 8 bytes as ``BIGINT``
but can't fail on real data. For columns that truly are
integer-only and you want ``BIGINT`` semantics in queries, pin
them via a ``column_types`` override.
Columns missing from a given file are simply skipped for that file;
the union is computed over whichever files *did* supply the column.
Columns that never appear anywhere are omitted from the result.
"""
per_col: Dict[str, List[Tuple[str, Optional[str]]]] = {}
for meta in per_file_metas:
for col, pair in meta.items():
per_col.setdefault(col, []).append(pair)
result: Dict[str, str] = {}
for col, entries in per_col.items():
any_char = any(
rtype and rtype.lower() != "double" for rtype, _ in entries
)
if any_char:
result[col] = "TEXT"
continue
formats = [fmt for _, fmt in entries if fmt]
driven = [_format_driven_type(f) for f in formats]
if "TIMESTAMP" in driven:
result[col] = "TIMESTAMP"
elif "DATE" in driven:
result[col] = "DATE"
else:
# Safe default: DOUBLE PRECISION. The BIGINT default we tried
# first failed the moment a file contained a fractional
# value in a column whose format didn't carry a decimal
# hint (very common: SAS ``BEST12.`` / ``F8.`` are display
# formats, not storage constraints, so the underlying
# 8-byte float can hold any value). Same storage cost as
# BIGINT, handles both integer- and float-valued data, and
# keeps loads from failing mid-cluster. Use a
# ``column_types`` override to pin specific columns to
# ``BIGINT`` when you want integer semantics in queries.
result[col] = "DOUBLE PRECISION"
return result
def _all_null(series: pd.Series) -> bool: def _all_null(series: pd.Series) -> bool:
if pd.api.types.is_object_dtype(series): if pd.api.types.is_object_dtype(series):
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all()) return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
@ -759,6 +1007,8 @@ def infer_schema(
*, *,
coerce_chars: bool = COERCE_CHAR_COLUMNS, coerce_chars: bool = COERCE_CHAR_COLUMNS,
total_rows: Optional[int] = None, total_rows: Optional[int] = None,
column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
) -> Dict[str, ColumnSpec]: ) -> Dict[str, ColumnSpec]:
"""Infer a Postgres column spec for each column in ``df``. """Infer a Postgres column spec for each column in ``df``.
@ -774,11 +1024,30 @@ def infer_schema(
``total_rows`` lets callers who already sampled the frame (e.g. via ``total_rows`` lets callers who already sampled the frame (e.g. via
:func:`read_sas_preview`) report the real file size in the per-column :func:`read_sas_preview`) report the real file size in the per-column
"inferred from first N of M rows" note. Falls back to ``len(df)``. "inferred from first N of M rows" note. Falls back to ``len(df)``.
``column_types`` is an optional map ``{column_name: pg_type_str}``
whose entries bypass inference entirely - the caller has already
decided the type (e.g. via :func:`union_column_types` across a
cluster, or a YAML ``column_types`` override). Nullability is still
computed from the data. Columns in ``column_types`` that don't exist
in ``df`` are ignored so a shared override dict can apply to clusters
with different column sets.
``force_nullable=True`` stamps every column nullable regardless of
what the data sample shows. Escape hatch for when inference marks a
column ``NOT NULL`` because the sampled rows happened to be dense but
downstream files carry nulls in that column - common with cluster
loads where one file's preview can't speak for the rest. Cheaper than
trying to sharpen the sampler: widen the column and move on.
""" """
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {}) original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
# Row-walking type probes run on a bounded head slice; nullability and the # When ``TYPE_INFERENCE_SAMPLE_ROWS`` is an integer cap, row-walking type
# all-null check still see every row so NOT NULL declarations stay honest. # probes run on the head slice for speed; nullability and the all-null
# check still walk every row of ``df``. That's only honest when the
# caller handed us the full file - with the default cap of ``None`` the
# CLI does exactly that. Callers who pass a partial preview and a tight
# integer cap accept that ``NOT NULL`` can be wrong for rare-null columns.
df_rows = len(df) df_rows = len(df)
effective_total = total_rows if total_rows is not None else df_rows effective_total = total_rows if total_rows is not None else df_rows
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS: if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
@ -789,6 +1058,8 @@ def infer_schema(
sample_size = df_rows sample_size = df_rows
sampled = sample_size < effective_total sampled = sample_size < effective_total
overrides: Dict[str, str] = dict(column_types or {})
# Temporarily flip the module-level flag if the caller asked us to. # Temporarily flip the module-level flag if the caller asked us to.
global COERCE_CHAR_COLUMNS global COERCE_CHAR_COLUMNS
saved = COERCE_CHAR_COLUMNS saved = COERCE_CHAR_COLUMNS
@ -801,6 +1072,27 @@ def infer_schema(
sas_format = original_formats.get(col) sas_format = original_formats.get(col)
notes: List[str] = [] notes: List[str] = []
if col in overrides:
pg_type = overrides[col]
notes.append(
f"type forced to {pg_type} via column_types override"
)
if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series)
out[col] = ColumnSpec(
name=col,
postgres_type=pg_type,
nullable=nullable,
sas_format=sas_format,
source_dtype=str(series.dtype),
notes=notes,
sampled=sampled,
)
continue
pg_type = _format_driven_type(sas_format) pg_type = _format_driven_type(sas_format)
if pg_type is None: if pg_type is None:
@ -831,7 +1123,11 @@ def infer_schema(
f"{effective_total:,} rows" f"{effective_total:,} rows"
) )
nullable = _is_nullable(series) if force_nullable:
nullable = True
notes.append("nullable forced via --all-nullable")
else:
nullable = _is_nullable(series)
out[col] = ColumnSpec( out[col] = ColumnSpec(
name=col, name=col,
@ -964,6 +1260,30 @@ def _normalize_type(pg_type: str) -> str:
return _TYPE_NORMALIZATION.get(stripped, stripped.lower()) return _TYPE_NORMALIZATION.get(stripped, stripped.lower())
# Widening pairs: (inferred_from_source, existing_in_target). When the
# incoming spec is narrower than the target we accept it - the value is
# guaranteed to fit, and ``_prepare_for_copy`` already emits ``COPY``
# payloads that Postgres silently promotes to the wider column type. The
# INVERSE direction stays a hard failure: a BIGINT value does not fit in
# an INTEGER column, so we must not let a cluster whose first file had
# only small ints accept a later file with a value past int32. Comes up
# most often on cluster loads where file 1 pushed the target to BIGINT
# (a single value > 2_147_483_647) and file N happens to sit entirely
# within int32 range - strict equality would reject file N even though
# the copy is trivially safe.
_WIDENING_COMPATIBLE: set = {
("smallint", "integer"),
("smallint", "bigint"),
("integer", "bigint"),
("real", "double precision"),
# INTEGER / BIGINT into DOUBLE PRECISION is lossless for int32 and
# exact up to 2**53 for int64, which covers every value pandas could
# have carried through as Int64 without wrapping anyway.
("integer", "double precision"),
("bigint", "double precision"),
}
def _assert_schema_compatible( def _assert_schema_compatible(
conn, schema: str, table: str, columns: Dict[str, ColumnSpec] conn, schema: str, table: str, columns: Dict[str, ColumnSpec]
) -> None: ) -> None:
@ -990,11 +1310,22 @@ def _assert_schema_compatible(
inferred_norm = _normalize_type(spec.postgres_type) inferred_norm = _normalize_type(spec.postgres_type)
target_norm = _normalize_type(target_type) target_norm = _normalize_type(target_type)
if inferred_norm != target_norm: if inferred_norm != target_norm:
mismatches.append( if (inferred_norm, target_norm) in _WIDENING_COMPATIBLE:
f"column {name!r}: inferred {spec.postgres_type} " # Narrower inferred type fits inside the wider target.
f"(normalized {inferred_norm!r}) but target is {target_type} " # Accept silently-but-noisily so the operator knows the
f"(normalized {target_norm!r})" # file came in with a smaller range than the cluster's
) # target was sized for.
warnings.append(
f"column {name!r}: inferred {spec.postgres_type} "
f"(narrower than target {target_type}); accepting - "
f"values fit in the wider target type"
)
else:
mismatches.append(
f"column {name!r}: inferred {spec.postgres_type} "
f"(normalized {inferred_norm!r}) but target is {target_type} "
f"(normalized {target_norm!r})"
)
target_is_notnull = (target_nullable == "NO") target_is_notnull = (target_nullable == "NO")
if spec.nullable and target_is_notnull: if spec.nullable and target_is_notnull:
warnings.append( warnings.append(
@ -1661,9 +1992,151 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]:
return dt.time(h, m, s) return dt.time(h, m, s)
# Safe outer bound for the numeric->datetime conversion below. The true
# ceiling is ``pd.Timestamp.max`` (2262-04-11), which in seconds since 1960
# is ~9.52e9. We pick a much tighter bound - year ~2200, ~7.6e9 seconds,
# ~87600 days - because (a) any real SAS data past ~2100 is garbage anyway,
# and (b) staying well inside the float64 + datetime64[ns] windows gives
# pandas' internals zero room to trip the ``over="raise"`` they wrap
# around the ns-multiply. ``7.5e9 * 1e9 = 7.5e18``, comfortably under both
# ``int64.max`` (~9.22e18) and float64 overflow (~1.8e308).
_SAS_DATETIME_SAFE_S = 7_500_000_000
_SAS_DATETIME_SAFE_D = 87_000
def _safe_numeric_to_datetime(
series: pd.Series,
*,
unit: str,
column_name: str,
target_type: str,
) -> pd.Series:
"""Convert a numeric SAS-epoch series to ``datetime64[ns]`` without letting
one stray cell take down the worker.
Failure modes seen in production:
* ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value
sentinels, divide-by-zero in the source, uninitialized cells).
* Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9``
overflows float64.
* Values between ``pd.Timestamp.max`` and float64 safety (~9.5e9 to 1e308
seconds) where the nanosecond multiply silently produces garbage or
overflows int64.
All of these trigger ``FloatingPointError: overflow encountered in multiply``
inside ``pd.to_datetime`` because pandas wraps the multiply in
``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never
gets a chance to turn the bad value into ``NaT``.
Strategy, belt + suspenders + airbag:
1. Coerce to float64 up front. Object-dtype branches hand us mixed
int/float/str; ``pd.to_numeric(errors="coerce")`` parses what it can
and NaNs the rest, so we hit the rest of this function with a
pristine float series.
2. Mask non-finite values and anything outside the safe epoch window to
NaN *before* ``pd.to_datetime`` sees them.
3. Run the conversion under a permissive ``errstate``.
4. If that still raises (some pandas version internally re-enables
``over="raise"`` in a way ``errstate`` can't override), catch it
and return all-NaT for the column with a loud warning. Better a
NULL column in one chunk than a dead worker + no diagnostics.
Emits one stderr line per chunk per affected column so silent data
loss doesn't sneak by.
"""
if not pd.api.types.is_float_dtype(series):
series = pd.to_numeric(series, errors="coerce").astype("float64")
arr = series.to_numpy(dtype="float64", copy=False, na_value=np.nan)
if unit == "s":
bound = _SAS_DATETIME_SAFE_S
elif unit == "D":
bound = _SAS_DATETIME_SAFE_D
else:
bound = _SAS_DATETIME_SAFE_S
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
finite_mask = np.isfinite(arr)
# ``np.abs(inf) -> inf``, ``np.abs(nan) -> nan``; both compare False
# to ``bound``, so ``in_range_mask`` already excludes non-finite
# values. The explicit ``finite_mask &`` below is belt-and-suspenders
# in case a future numpy changes that semantic.
in_range_mask = np.abs(arr) < bound
keep_mask = finite_mask & in_range_mask
was_present = ~np.isnan(arr)
coerced = int(((~keep_mask) & was_present).sum())
if coerced:
tqdm.write(
f"[warn] {target_type} column {column_name!r}: {coerced:,} "
f"row(s) had non-representable values (Inf/NaN/out-of-range), "
f"coerced to NULL",
file=sys.stderr,
)
cleaned_arr = np.where(keep_mask, arr, np.nan)
cleaned = pd.Series(cleaned_arr, index=series.index)
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(
cleaned, unit=unit, origin="1960-01-01", errors="coerce",
)
except (FloatingPointError, OverflowError, ValueError) as exc:
tqdm.write(
f"[error] {target_type} column {column_name!r}: "
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
f"returning NaT for the entire chunk. This usually means one "
f"or more values slipped past the pre-mask (bound={bound}). "
f"Consider setting the column to TEXT via column_types if this "
f"recurs.",
file=sys.stderr,
)
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
def _safe_object_to_datetime(
series: pd.Series,
*,
column_name: str,
target_type: str,
) -> pd.Series:
"""Object-dtype to datetime. Shares the safety net (errstate +
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
actually numeric-flavored (e.g. SAS wrote numbers into an object
column), route to the numeric path; otherwise parse with ``to_datetime``
on the object itself.
"""
coerced = series.replace({"": None})
numeric = pd.to_numeric(coerced, errors="coerce")
all_numeric = numeric.notna().sum() == coerced.notna().sum()
if all_numeric and coerced.notna().any():
return _safe_numeric_to_datetime(
numeric, unit="s", column_name=column_name, target_type=target_type,
)
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(coerced, errors="coerce")
except (FloatingPointError, OverflowError, ValueError) as exc:
tqdm.write(
f"[error] {target_type} column {column_name!r}: "
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
f"returning NaT for the entire chunk.",
file=sys.stderr,
)
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame: def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
"""Materialize a copy of ``df`` with each column in the right shape for """Materialize a copy of ``df`` with each column in the right shape for
``to_csv`` so the CSV lands as valid input for the target Postgres type. ``to_csv`` so the CSV lands as valid input for the target Postgres type.
Per-column conversions are vectorized (``.astype`` / ``pd.to_datetime`` /
``.mask`` / ``.fillna``) instead of the element-wise ``.map(func)``
loops this function used to run. That was the single largest per-chunk
CPU cost on text-heavy loads - a 40-column × 100k-row chunk was issuing
~4M Python-level function calls just to cast strings. TIME columns are
still the ``.map`` path because SAS TIME8 is stored as seconds and the
clamp-to-24h logic doesn't fit cleanly in vector form; they're also
rare in practice.
""" """
out = pd.DataFrame(index=df.index) out = pd.DataFrame(index=df.index)
for name, spec in columns.items(): for name, spec in columns.items():
@ -1686,59 +2159,86 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
if pd.api.types.is_datetime64_any_dtype(series): if pd.api.types.is_datetime64_any_dtype(series):
out[name] = series.dt.date out[name] = series.dt.date
elif pd.api.types.is_object_dtype(series): elif pd.api.types.is_object_dtype(series):
def _to_date(v: Any) -> Optional[dt.date]: # Vectorized parse: empty strings / None / unparseable -> NaT,
if v is None or (isinstance(v, float) and pd.isna(v)): # then .dt.date yields date objects or NaT. NaT serializes as
return None # an empty CSV field (matching ``NULL ''`` in COPY). Routed
if isinstance(v, dt.datetime): # through ``_safe_object_to_datetime`` so an object column
return v.date() # that actually contains SAS-epoch numerics (seen when one
if isinstance(v, dt.date): # file of a cluster stores the column as NUM and another as
return v # CHAR + the union flipped it to TEXT-then-DATE) can't trip
if isinstance(v, str): # the overflow-in-multiply bug.
if v == "": parsed = _safe_object_to_datetime(
return None series, column_name=name, target_type="DATE",
try: )
return dt.date.fromisoformat(v) out[name] = parsed.dt.date
except ValueError: elif pd.api.types.is_numeric_dtype(series):
return None # pyreadstat couldn't decode the SAS format (some
return None # ``DATEw.``/``YYMMDDw.`` variants and all custom formats slip
out[name] = series.map(_to_date) # through) so the column came back as float64: days since
# 1960-01-01, the SAS epoch. Without this branch the raw
# number would hit COPY and Postgres rejects it with
# ``invalid input syntax for type date``.
parsed = _safe_numeric_to_datetime(
series, unit="D", column_name=name, target_type="DATE",
)
out[name] = parsed.dt.date
else: else:
out[name] = series out[name] = series
elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"): elif pg in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP WITH TIME ZONE"):
if pd.api.types.is_datetime64_any_dtype(series): if pd.api.types.is_datetime64_any_dtype(series):
out[name] = series out[name] = series
elif pd.api.types.is_object_dtype(series): elif pd.api.types.is_object_dtype(series):
def _to_dt(v: Any) -> Optional[dt.datetime]: # Same rationale as the DATE object branch above: route
if v is None or (isinstance(v, float) and pd.isna(v)): # through the safety net so numeric-flavored object columns
return None # can't blow us up during the ns multiply.
if isinstance(v, dt.datetime): out[name] = _safe_object_to_datetime(
return v series, column_name=name, target_type="TIMESTAMP",
if isinstance(v, dt.date): )
return dt.datetime(v.year, v.month, v.day) elif pd.api.types.is_numeric_dtype(series):
if isinstance(v, pd.Timestamp): # Same story as the DATE branch above, but SAS datetimes are
return v.to_pydatetime() if not pd.isna(v) else None # *seconds* since 1960-01-01 (fractional seconds for
if isinstance(v, str): # ``DATETIMEw.d``). Example caught in the wild:
if v == "": # ``1915465463.615`` -> 2020-09-13 05:44:23.615.
return None out[name] = _safe_numeric_to_datetime(
try: series, unit="s", column_name=name, target_type="TIMESTAMP",
return dt.datetime.fromisoformat(v) )
except ValueError:
return None
return None
out[name] = series.map(_to_dt)
else: else:
out[name] = series out[name] = series
elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"): elif pg in ("TIME", "TIME WITHOUT TIME ZONE", "TIME WITH TIME ZONE"):
out[name] = series.map(_seconds_to_time) out[name] = series.map(_seconds_to_time)
elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"): elif pg in ("TEXT", "VARCHAR", "CHARACTER VARYING", "CHAR", "CHARACTER"):
# Leave empty strings as "" so `NULL ''` in COPY turns them into NULL. # Render every cell as a string and blank out nulls. ``NULL ''``
def _to_str(v: Any) -> Any: # in the COPY statement turns the blanks back into SQL NULL.
if v is None: # astype(str) stringifies NaN/None to the literal "nan"/"None",
return "" # so we mask those after the fact rather than branching per cell.
if isinstance(v, float) and pd.isna(v): na_mask = series.isna()
return "" if pd.api.types.is_numeric_dtype(series):
return str(v) # Hit when a column was auto-unioned to TEXT because at
out[name] = series.map(_to_str) # least one file of the cluster stored it as CHAR but this
# particular file stored it as NUM (typical of SAS phone-id
# columns). Default float formatting would emit "123.0" -
# which doesn't match the plain "123" coming from the CHAR
# files. When the whole chunk is integer-valued, round to
# int before stringifying; when any fractional value is
# present we leave float formatting alone so we don't
# silently drop precision.
nonnull = series.dropna()
int_like = False
if not nonnull.empty:
try:
int_like = bool(((nonnull % 1) == 0).all())
except TypeError:
int_like = False
if int_like:
# ``Int64`` preserves NA; ``.astype(str)`` renders NA
# as '<NA>', which we then mask out alongside original
# NaNs.
as_str = series.astype("Int64").astype(str)
out[name] = as_str.mask(na_mask, "")
else:
out[name] = series.astype(str).mask(na_mask, "")
else:
out[name] = series.astype(str).mask(na_mask, "")
elif pg == "BOOLEAN": elif pg == "BOOLEAN":
out[name] = series.astype("boolean") if series.dtype != object else series out[name] = series.astype("boolean") if series.dtype != object else series
else: else:
@ -1746,6 +2246,25 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
return out return out
def _serialize_chunk_csv(prepared: pd.DataFrame) -> io.BytesIO:
"""Serialize a prepared frame into a CSV buffer for ``COPY FROM STDIN``.
Uses ``pyarrow.csv.write_csv`` (typically 5-10× faster than pandas'
pure-Python ``to_csv`` on wide/text-heavy frames). Null cells serialize
as empty strings and date/timestamp values land in ISO 8601 form, both
of which Postgres accepts under ``FORMAT csv, NULL ''``.
"""
table = pa.Table.from_pandas(prepared, preserve_index=False)
buf = io.BytesIO()
pa_csv.write_csv(
table,
buf,
write_options=pa_csv.WriteOptions(include_header=False),
)
buf.seek(0)
return buf
def copy_dataframes( def copy_dataframes(
conn, conn,
schema_name: str, schema_name: str,
@ -1768,23 +2287,43 @@ def copy_dataframes(
) )
total = 0 total = 0
# Pull chunks one at a time so each ``df`` is unreferenced before the
# generator reads the next one. Without this the loop-variable binding
# of a ``for df in dfs:`` keeps the previous chunk alive during the
# next pyreadstat read, pushing peak memory to 5-6× chunk size per
# worker (old df + incoming df + prepared + pyarrow table + CSV buf).
# With explicit drops we cap peak at ~2× chunk size: ``df`` goes away
# once ``prepared`` exists, ``prepared`` once ``buf`` exists, ``buf``
# once COPY has consumed it. Matters most in parallel mode where
# 32 × per-worker peak can exhaust a 128 GB host.
dfs_iter = iter(dfs)
with conn.cursor() as cur: with conn.cursor() as cur:
for df in dfs: while True:
try:
df = next(dfs_iter)
except StopIteration:
break
if df.empty: if df.empty:
del df
continue continue
prepared = _prepare_for_copy(df, columns) prepared = _prepare_for_copy(df, columns)
buf = io.StringIO() del df
prepared.to_csv( n = len(prepared)
buf, buf = _serialize_chunk_csv(prepared)
index=False, del prepared
header=False,
na_rep="",
date_format="%Y-%m-%d %H:%M:%S",
)
buf.seek(0)
cur.copy_expert(sql, buf) cur.copy_expert(sql, buf)
del buf
conn.commit() conn.commit()
total += len(prepared) total += n
# Hand pyarrow's pool memory back between chunks. Without this,
# arrow's internal buffer pool keeps the high-water bytes
# reserved across the worker's lifetime - inside long-running
# workers this presents as steadily climbing RSS even with the
# ``del``s above. Cheap (microseconds); call it every chunk.
try:
pa.default_memory_pool().release_unused()
except Exception:
pass
return total return total
@ -1899,6 +2438,16 @@ def _build_argparser() -> argparse.ArgumentParser:
"PGUSER / PGPASSWORD from the environment or .env file." "PGUSER / PGPASSWORD from the environment or .env file."
), ),
) )
p.add_argument(
"--all-nullable",
action="store_true",
help=(
"Stamp every column nullable in the generated schema, bypassing "
"NOT NULL inference. Use when sampled rows wrongly suggest a "
"column has no nulls. Overrides ``all_nullable`` in the YAML "
"config when set."
),
)
return p return p
@ -1921,13 +2470,23 @@ def main(argv: Optional[List[str]] = None) -> int:
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr) print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
return 2 return 2
# Schema inference uses a bounded preview read so we never load a # Schema inference reads the whole file so type + nullability are
# hundreds-of-millions-of-rows file into memory just to pick types. # computed against every row. That's what the target host has the
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows # resources for and is the only way to honestly emit ``NOT NULL`` -
# returned, not the file's total, so we don't trust it here. # a bounded preview routinely missed the ~0.2% of rows with nulls on
# otherwise-dense keys (e.g. MAFID). If you're on a box that can't
# fit the file in memory, override ``TYPE_INFERENCE_SAMPLE_ROWS`` to
# an integer cap and know that sampled specs may stamp ``NOT NULL``
# on columns whose nulls live past the window.
preview_df, meta = read_sas_preview(cfg.filename) preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude) preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
columns = infer_schema(preview_df, meta) force_nullable = args.all_nullable or cfg.all_nullable
columns = infer_schema(
preview_df,
meta,
column_types=cfg.column_types,
force_nullable=force_nullable,
)
# Validate partition columns exist in the schema after filtering. # Validate partition columns exist in the schema after filtering.
if cfg.partition_by: if cfg.partition_by:
@ -2018,13 +2577,24 @@ def main(argv: Optional[List[str]] = None) -> int:
# it while we're holding a Postgres transaction open. # it while we're holding a Postgres transaction open.
del preview_df del preview_df
total_rows = getattr(meta, "number_rows", None)
def _filtered_chunks(): def _filtered_chunks():
seen = 0 pbar = tqdm(
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename): total=total_rows,
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude) unit="row",
seen += len(chunk_df) unit_scale=True,
print(f" streaming... {seen:,} rows", file=sys.stderr) desc=f" {cfg.filename.name}",
yield chunk_df file=sys.stderr,
dynamic_ncols=True,
)
try:
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
pbar.update(len(chunk_df))
yield chunk_df
finally:
pbar.close()
db_user = db_password = None db_user = db_password = None
if args.dbcreds: if args.dbcreds:

View File

@ -38,3 +38,24 @@ if_exists: append
# indexes: # indexes:
# - state # - state
# - zip # - zip
# column_types: Explicit {column_name: postgres_type} overrides that
# bypass automatic type inference for the listed columns. Useful when
# pyreadstat reports a column as NUM but you want it stored as TEXT
# (phone/ID columns that are conceptually strings), or when a column's
# inferred type is off for any other reason. Columns not listed here
# fall through to the normal inference path. Nullability is always
# computed from the data.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# all_nullable: If true, every column is stamped nullable in the generated
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
# rare-null data downstream) and COPY blows up mid-load on the first null
# it hits. Off by default. The CLI flag --all-nullable overrides this to
# true when set.
#
# all_nullable: false

View File

@ -61,15 +61,52 @@ auto_detect: true
# - state # - state
# - zip # - zip
# Folder-level column_types: Explicit {column_name: postgres_type} map that
# bypasses automatic type inference for the listed columns. Applied to
# every cluster unless a cluster supplies its own column_types, which are
# merged on top (cluster entries win on conflict).
#
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
# BIGINT). Entries in column_types here win over that auto-union - use
# them when the auto result is wrong or when --no-prescan disables the
# auto-union and you still need to pin a column.
#
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
# don't exist in a given file are simply ignored for that file.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# RESP_PH_SUFFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# Folder-level all_nullable: If true, every column of every cluster is
# stamped nullable in the generated schema; NOT NULL inference is skipped
# entirely. Use this when the sampler wrongly concludes a column has no
# nulls (sampled rows happened to be dense, but later files in the cluster
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
# unless a cluster supplies its own all_nullable. The CLI flag
# --all-nullable overrides both this and any per-cluster setting when
# passed. Off by default.
#
# all_nullable: false
# Explicit cluster patterns. Each pattern is matched against the file # Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect # *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly. # pool, so explicit and auto clusters compose cleanly.
# #
# `tablename` is required. `if_exists`, `include`, and `exclude` are # `tablename` is required. `if_exists`, `include`, `exclude`, and
# optional per-cluster overrides of the folder-level defaults above. # `column_types` are optional per-cluster overrides of the folder-level
# defaults above. Cluster-level column_types entries win over folder-
# level entries for the same column.
clusters: clusters:
- pattern: '^group_a\d+\.xpt$' - pattern: '^group_a\d+\.xpt$'
tablename: group_a tablename: group_a
# column_types:
# INTCOL: TEXT
# all_nullable: true # per-cluster override of the folder-level default
# Example of an explicit override. Uncomment to force the group_b cluster to # Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace": # append instead of replace even though the folder default is "replace":

View File

@ -1,7 +1,10 @@
pandas>=2.0,<3.0 pandas>=2.0,<3.0
pyreadstat>=1.2,<2.0 pyreadstat>=1.2,<2.0
numpy>=2.1,<3.0 numpy>=2.1,<3.0
pyarrow>=22.0,<24.0
pyyaml>=6.0,<7.0 pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0 psycopg2-binary>=2.9,<3.0
python-dotenv>=1.0,<2.0 python-dotenv>=1.0,<2.0
boto3>=1.28,<2.0 boto3>=1.28,<2.0
openpyxl>=3.1,<4.0
tqdm>=4.66,<5.0

1138
utils/sas_profiler.py Normal file

File diff suppressed because it is too large Load Diff