batch_folder_processing #1

Merged
dp merged 7 commits from batch_folder_processing into main 2026-04-18 18:23:52 +00:00
2 changed files with 192 additions and 77 deletions
Showing only changes of commit 5645ff5597 - Show all commits

View File

@ -1,3 +1,4 @@
/.venv
/samples
/.env
/__pycache__

View File

@ -107,26 +107,29 @@ loosening is not checked here because the append-mode check already covers it.
5. Library usage
----------------
The CLI is a thin wrapper around composable functions. A typical orchestrator
looks like::
The CLI is a thin wrapper around composable functions. The preferred pattern
infers the schema from a bounded preview and then streams the rest of the
file chunk-by-chunk into ``COPY`` - crucial for SAS files with hundreds of
millions of rows::
from dotenv import load_dotenv
from load_sas import (
load_config, read_sas, apply_column_filter, infer_schema,
validate_against_manifest, render_create_table,
connect, create_table, copy_dataframe,
load_config, read_sas_preview, iter_sas_chunks, apply_column_filter,
infer_schema, validate_against_manifest, render_create_table,
connect, create_table, copy_dataframes,
)
load_dotenv()
cfg = load_config("config.yaml")
df, meta = read_sas(cfg.filename)
df = apply_column_filter(df, cfg.include, cfg.exclude)
columns = infer_schema(df, meta)
# Optional: preview
# Schema from a preview slice (bounded by TYPE_INFERENCE_SAMPLE_ROWS).
preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
total_rows = getattr(meta, "number_rows", None)
columns = infer_schema(preview_df, meta, total_rows=total_rows)
# Optional: preview DDL / validate against a manifest.
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
# Optional: validate against a manifest
problems = validate_against_manifest(columns, Path("expected.json"))
assert not problems, problems
@ -134,15 +137,23 @@ looks like::
conn.autocommit = False
try:
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
rows = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns)
chunks = (
apply_column_filter(df, cfg.include, cfg.exclude)
for df, _ in iter_sas_chunks(cfg.filename)
)
rows = copy_dataframes(conn, cfg.schemaname, cfg.tablename, chunks, columns)
conn.commit()
finally:
conn.close()
For small files (or tests) the legacy one-shot API still works:
:func:`read_sas` returns the whole frame and :func:`copy_dataframe` copies it
in one round trip.
All functions are side-effect free except :func:`connect`, :func:`create_table`,
and :func:`copy_dataframe`; schema inference (:func:`infer_schema`) accepts a
``coerce_chars`` kwarg to override the module-level ``COERCE_CHAR_COLUMNS``
without mutating global state.
:func:`copy_dataframe`, and :func:`copy_dataframes`; schema inference
(:func:`infer_schema`) accepts a ``coerce_chars`` kwarg to override the
module-level ``COERCE_CHAR_COLUMNS`` without mutating global state.
6. Type inference summary
-------------------------
@ -164,25 +175,32 @@ Priority order used by :func:`infer_schema`:
``DOUBLE PRECISION``.
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for
performance on large files. Nullability and all-null detection still run over
the full column (they're vectorized and fast) so a ``NOT NULL`` constraint is
never declared for a column that has a null anywhere in the file. Tradeoff:
if the first N rows fit ``INTEGER`` but a later row exceeds int32, COPY will
fail; bump the sample size or set ``TYPE_INFERENCE_SAMPLE_ROWS = None`` to
scan the whole column.
performance on large files. The CLI enforces this at read time via
:func:`read_sas_preview`, so the whole file is never materialized just to pick
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32,
or a column had no nulls in the preview but does later in the file, ``COPY``
will fail mid-stream and the whole transaction rolls back. Set
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing
matters more than speed.
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
share one cursor and transaction so a failure mid-file rolls back the whole
load.
7. Tunables
-----------
Module-level knobs at the top of this file:
* ``COERCE_CHAR_COLUMNS`` - whether to promote stringly-typed numerics/
dates (default True).
* ``COERCE_CHAR_COLUMNS`` - promote stringly-typed numerics / dates
(default True).
* ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before
char-column coercion is attempted.
* ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become
``BIGINT``.
* ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows used for type inference
* ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows read for type inference
(``None`` = scan the whole column).
* ``DEFAULT_CHUNK_ROWS`` - rows per streaming COPY chunk.
"""
from __future__ import annotations
@ -195,7 +213,7 @@ import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple
import pandas as pd
import psycopg2
@ -221,13 +239,16 @@ NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
"""INTEGER bounds; anything outside becomes BIGINT."""
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000
"""Cap on rows inspected during per-column type inference. The row-walking
helpers (date detection on object columns, string-coercion probes, whole-number
check on numeric columns) operate on ``df.head(TYPE_INFERENCE_SAMPLE_ROWS)``
instead of the full frame, which matters on SAS files with hundreds of millions
of rows. Nullability is still evaluated across the whole column (cheap,
vectorized) so ``NOT NULL`` declarations remain safe. Set to ``None`` to scan
every row."""
"""Cap on rows inspected during per-column type inference. Also governs how
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
schema-inference flows. Set to ``None`` to scan every row (and read the whole
file into memory for the preview step - don't do this on multi-hundred-million
row files)."""
DEFAULT_CHUNK_ROWS = 100_000
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
fewer COPY round-trips but more peak memory per chunk; smaller values are
gentler on memory."""
VALID_IF_EXISTS = ("fail", "replace", "append")
@ -256,6 +277,12 @@ class ColumnSpec:
sas_format: Optional[str] = None
source_dtype: Optional[str] = None
notes: List[str] = field(default_factory=list)
sampled: bool = False
"""True when the type was inferred from a bounded preview rather than the
full file. A sampled spec carries the usual sampling risks: a later chunk
could contain a value that exceeds the inferred integer range, doesn't
parse as the inferred type, or is null in a column the preview showed as
non-null - all of which surface as mid-``COPY`` failures."""
# ---------------------------------------------------------------------------
@ -355,8 +382,8 @@ def load_config(path: Path) -> LoaderConfig:
# ---------------------------------------------------------------------------
def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
"""Dispatch to the right pyreadstat reader by extension.
def _sas_reader(path: Path) -> Tuple[Any, Dict[str, Any]]:
"""Return ``(pyreadstat_reader, extra_kwargs)`` for ``path``.
Invariants (learned the hard way while building the sample generator):
@ -364,15 +391,58 @@ def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
encoding on XPORT files it wrote itself.
* ``.sas7bdat`` - explicit ``encoding="latin-1"`` per colleague guidance.
"""
path = Path(path)
suffix = path.suffix.lower()
suffix = Path(path).suffix.lower()
if suffix in (".xpt", ".xport"):
return pyreadstat.read_xport(str(path))
return pyreadstat.read_xport, {}
if suffix == ".sas7bdat":
return pyreadstat.read_sas7bdat(str(path), encoding="latin-1")
return pyreadstat.read_sas7bdat, {"encoding": "latin-1"}
raise ValueError(f"Unsupported SAS file extension: {suffix}")
def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
"""Read an entire SAS file into memory. Only safe for small files.
Kept for backward compatibility and tests; the CLI now uses
:func:`read_sas_preview` + :func:`iter_sas_chunks` so it never materializes
the whole frame at once.
"""
reader, kwargs = _sas_reader(path)
return reader(str(Path(path)), **kwargs)
def read_sas_preview(
path: Path,
*,
rows: Optional[int] = None,
) -> Tuple[pd.DataFrame, Any]:
"""Read the first ``rows`` records from ``path`` plus its metadata.
Defaults to ``TYPE_INFERENCE_SAMPLE_ROWS`` when ``rows`` is not given.
Passing ``rows=None`` with ``TYPE_INFERENCE_SAMPLE_ROWS=None`` reads the
whole file (pyreadstat treats ``row_limit=0`` as unlimited).
"""
reader, kwargs = _sas_reader(path)
effective = rows if rows is not None else TYPE_INFERENCE_SAMPLE_ROWS
row_limit = int(effective) if effective else 0
return reader(str(Path(path)), row_limit=row_limit, **kwargs)
def iter_sas_chunks(
path: Path,
*,
chunksize: int = DEFAULT_CHUNK_ROWS,
):
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
underlying reader by extension and threads through our encoding defaults.
"""
reader, kwargs = _sas_reader(path)
yield from pyreadstat.read_file_in_chunks(
reader, str(Path(path)), chunksize=chunksize, **kwargs
)
# ---------------------------------------------------------------------------
# Column filtering
# ---------------------------------------------------------------------------
@ -552,6 +622,7 @@ def infer_schema(
meta: Any,
*,
coerce_chars: bool = COERCE_CHAR_COLUMNS,
total_rows: Optional[int] = None,
) -> Dict[str, ColumnSpec]:
"""Infer a Postgres column spec for each column in ``df``.
@ -563,18 +634,24 @@ def infer_schema(
``COERCE_CHAR_COLUMNS`` without mutating global state. Internally the
char-inference helpers still read the constant - a full override would
thread the flag through, but the one-knob story here is intentional.
``total_rows`` lets callers who already sampled the frame (e.g. via
:func:`read_sas_preview`) report the real file size in the per-column
"inferred from first N of M rows" note. Falls back to ``len(df)``.
"""
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
# Row-walking type probes run on a bounded head slice; nullability and the
# all-null check still see every row so NOT NULL declarations stay honest.
total_rows = len(df)
if TYPE_INFERENCE_SAMPLE_ROWS is not None and total_rows > TYPE_INFERENCE_SAMPLE_ROWS:
df_rows = len(df)
effective_total = total_rows if total_rows is not None else df_rows
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
sample_df = df.head(TYPE_INFERENCE_SAMPLE_ROWS)
sampled = True
sample_size = TYPE_INFERENCE_SAMPLE_ROWS
else:
sample_df = df
sampled = False
sample_size = df_rows
sampled = sample_size < effective_total
# Temporarily flip the module-level flag if the caller asked us to.
global COERCE_CHAR_COLUMNS
@ -614,8 +691,8 @@ def infer_schema(
if sampled:
notes.append(
f"type inferred from first {TYPE_INFERENCE_SAMPLE_ROWS:,} of "
f"{total_rows:,} rows"
f"type inferred from first {sample_size:,} of "
f"{effective_total:,} rows"
)
nullable = _is_nullable(series)
@ -627,6 +704,7 @@ def infer_schema(
sas_format=sas_format,
source_dtype=str(series.dtype),
notes=notes,
sampled=sampled,
)
return out
finally:
@ -914,19 +992,31 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
return out
def copy_dataframe(
def copy_dataframes(
conn,
schema_name: str,
table_name: str,
df: pd.DataFrame,
dfs: Iterable[pd.DataFrame],
columns: Dict[str, ColumnSpec],
) -> int:
"""Stream ``df`` into Postgres via ``COPY ... FROM STDIN``.
"""Stream an iterable of DataFrames into one ``COPY`` session.
Returns the number of rows inserted.
All chunks share a cursor and transaction, so a failure mid-stream
rolls back the whole load when the caller hasn't committed yet.
Empty chunks are skipped. Returns the total rows inserted.
"""
prepared = _prepare_for_copy(df, columns)
col_list = ", ".join(_quote_ident(name) for name in columns.keys())
sql = (
f"COPY {_qualified(schema_name, table_name)} ({col_list}) "
f"FROM STDIN WITH (FORMAT csv, NULL '')"
)
total = 0
with conn.cursor() as cur:
for df in dfs:
if df.empty:
continue
prepared = _prepare_for_copy(df, columns)
buf = io.StringIO()
prepared.to_csv(
buf,
@ -936,18 +1026,24 @@ def copy_dataframe(
date_format="%Y-%m-%d %H:%M:%S",
)
buf.seek(0)
col_list = ", ".join(_quote_ident(name) for name in columns.keys())
sql = (
f"COPY {_qualified(schema_name, table_name)} ({col_list}) "
f"FROM STDIN WITH (FORMAT csv, NULL '')"
)
with conn.cursor() as cur:
cur.copy_expert(sql, buf)
rowcount = cur.rowcount
total += len(prepared)
return total
return int(rowcount) if rowcount is not None else len(prepared)
def copy_dataframe(
conn,
schema_name: str,
table_name: str,
df: pd.DataFrame,
columns: Dict[str, ColumnSpec],
) -> int:
"""Stream ``df`` into Postgres via ``COPY ... FROM STDIN``.
Convenience wrapper around :func:`copy_dataframes` for single-frame
callers. Returns the number of rows inserted.
"""
return copy_dataframes(conn, schema_name, table_name, [df], columns)
# ---------------------------------------------------------------------------
@ -1060,9 +1156,13 @@ def main(argv: Optional[List[str]] = None) -> int:
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
return 2
df, meta = read_sas(cfg.filename)
df = apply_column_filter(df, cfg.include, cfg.exclude)
columns = infer_schema(df, meta)
# Schema inference uses a bounded preview read so we never load a
# hundreds-of-millions-of-rows file into memory just to pick types.
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows
# returned, not the file's total, so we don't trust it here.
preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
columns = infer_schema(preview_df, meta)
if args.validate:
manifest_path = cfg.filename.with_suffix("").with_suffix(".expected.json")
@ -1080,11 +1180,25 @@ def main(argv: Optional[List[str]] = None) -> int:
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
return 0
# Release the preview frame before opening the stream - lets the GC reclaim
# it while we're holding a Postgres transaction open.
del preview_df
def _filtered_chunks():
seen = 0
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
seen += len(chunk_df)
print(f" streaming... {seen:,} rows", file=sys.stderr)
yield chunk_df
conn = connect()
conn.autocommit = False
try:
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
inserted = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns)
inserted = copy_dataframes(
conn, cfg.schemaname, cfg.tablename, _filtered_chunks(), columns
)
conn.commit()
except Exception:
conn.rollback()