From 4f7ded09c60da96c8ad6f41165e59b1e722b88ed Mon Sep 17 00:00:00 2001 From: David Peterson Date: Sat, 18 Apr 2026 10:20:07 -0500 Subject: [PATCH 1/7] Enhance load_sas.py with detailed usage instructions, YAML config structure, and command-line interface documentation for loading SAS files. --- generic_loader/load_sas.py | 165 +++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 4f9a4ca..4956fb3 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -8,6 +8,171 @@ Python 3.9 compatible (target is an air-gapped host that currently only has 3.9). ``from __future__ import annotations`` lets us use PEP 585 generics as annotations; runtime-resolved type uses (dataclass defaults, etc.) stick to ``typing``. + +------------------------------------------------------------------------------- +USAGE +------------------------------------------------------------------------------- + +Supported inputs: + * ``.sas7bdat`` (read with ``encoding="latin-1"``) + * ``.xpt`` / ``.xport`` (SAS transport files) + +1. YAML config +-------------- +Every invocation is driven by a YAML file describing one SAS file to load:: + + filename: samples/sample_kitchensink.xpt # required; relative paths are + # resolved against the config + # file's directory when possible + schemaname: public # required + tablename: kitchensink # required + + # Optional. One of: fail | replace | append. Default: fail. + # fail - error out if the target table already exists + # replace - DROP and recreate the table from the inferred schema + # append - keep the existing table; pre-flight a schema-compat check, + # then COPY the new rows in + if_exists: append + + # Optional, mutually exclusive. Restrict which columns are loaded. + # include: + # - ID + # - INTCOL + # exclude: + # - ALLNULL + +2. Database connection +---------------------- +The loader uses standard libpq environment variables (read via ``os.environ``):: + + PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE + +The CLI calls ``python-dotenv``'s ``load_dotenv()`` at startup, so a local +``.env`` file is picked up automatically. Library callers are responsible for +populating the environment themselves (either call ``load_dotenv()`` or export +the vars) before calling :func:`connect`. + +3. Command-line interface +------------------------- +:: + + python load_sas.py --config path/to/config.yaml [--validate] [--dry-run] + +Flags: + --config PATH Required. Path to the YAML config above. + --validate Compare the inferred schema against + ``.expected.json`` sitting next to the SAS + file. Exits nonzero on mismatch. Safe to combine with + ``--dry-run``. + --dry-run Print the inferred ``CREATE TABLE`` SQL and stop. The + database is never touched (no connection is opened). + +Exit codes: + 0 - success (load completed, or dry-run/validate passed) + 1 - validation failure + 2 - config references a SAS file that does not exist + Other nonzero - uncaught exception (traceback printed); the transaction + is rolled back before exit. + +Typical invocations:: + + # Preview the inferred schema without connecting to Postgres. + python load_sas.py --config sample_config.yaml --dry-run + + # Check the inferred schema against an expected-types manifest. + python load_sas.py --config sample_config.yaml --validate --dry-run + + # Actually load the data. + python load_sas.py --config sample_config.yaml + +4. Expected-types manifest (``--validate``) +------------------------------------------- +``--validate`` looks for a JSON file named ``.expected.json`` next +to the SAS file, e.g. ``samples/sample_kitchensink.xpt`` pairs with +``samples/sample_kitchensink.expected.json``. Each top-level key is a column +name; the value is an object with any of:: + + { + "postgres_type": "BIGINT", # exact expected type, OR + "acceptable_types": ["TEXT", # any-of list of acceptable types + "VARCHAR"], + "nullable": true, # default true; false = must be NOT NULL + "note": "free-form comment" # ignored by the loader + } + +Type comparison ignores length/precision modifiers and normalizes synonyms +(e.g. ``INT`` == ``INTEGER`` == ``INT4``; ``VARCHAR(10)`` == ``VARCHAR``). +Nullability tightening (inferred NULL, manifest NOT NULL) is a hard failure; +loosening is not checked here because the append-mode check already covers it. + +5. Library usage +---------------- +The CLI is a thin wrapper around composable functions. A typical orchestrator +looks like:: + + from dotenv import load_dotenv + from load_sas import ( + load_config, read_sas, apply_column_filter, infer_schema, + validate_against_manifest, render_create_table, + connect, create_table, copy_dataframe, + ) + + load_dotenv() + cfg = load_config("config.yaml") + df, meta = read_sas(cfg.filename) + df = apply_column_filter(df, cfg.include, cfg.exclude) + columns = infer_schema(df, meta) + + # Optional: preview + print(render_create_table(cfg.schemaname, cfg.tablename, columns)) + + # Optional: validate against a manifest + problems = validate_against_manifest(columns, Path("expected.json")) + assert not problems, problems + + conn = connect() + conn.autocommit = False + try: + create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists) + rows = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns) + conn.commit() + finally: + conn.close() + +All functions are side-effect free except :func:`connect`, :func:`create_table`, +and :func:`copy_dataframe`; schema inference (:func:`infer_schema`) accepts a +``coerce_chars`` kwarg to override the module-level ``COERCE_CHAR_COLUMNS`` +without mutating global state. + +6. Type inference summary +------------------------- +Priority order used by :func:`infer_schema`: + + 1. SAS format string (via ``meta.original_variable_types``): + ``DATETIME*`` -> ``TIMESTAMP``, ``TIME*`` -> ``TIME``, + ``DATE*`` / ``YYMMDD*`` / ``MMDDYY*`` / ``DDMMYY*`` / ``JULIAN*`` -> ``DATE``. + 2. All-null column -> ``TEXT`` (with a note). + 3. pandas datetime dtype -> ``TIMESTAMP``. + 4. Object columns containing only ``datetime.date`` / ``datetime.datetime`` + -> ``DATE`` or ``TIMESTAMP``. + 5. Object columns of strings: if ``COERCE_CHAR_COLUMNS`` is True and at + least ``CHAR_INFERENCE_MIN_VALUES`` non-empty values parse cleanly, they + are promoted to ``INTEGER`` / ``BIGINT`` / ``DOUBLE PRECISION`` / + ``DATE`` / ``TIMESTAMP``; otherwise ``TEXT``. + 6. Numeric columns of whole numbers -> ``INTEGER`` (or ``BIGINT`` if any + value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise + ``DOUBLE PRECISION``. + +7. Tunables +----------- +Module-level knobs at the top of this file: + + * ``COERCE_CHAR_COLUMNS`` - whether to promote stringly-typed numerics/ + dates (default True). + * ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before + char-column coercion is attempted. + * ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become + ``BIGINT``. """ from __future__ import annotations From 3a0537270c1607881dfc7224f917e4a647f4748d Mon Sep 17 00:00:00 2001 From: David Peterson Date: Sat, 18 Apr 2026 10:28:37 -0500 Subject: [PATCH 2/7] Implement type inference sampling in load_sas.py to improve performance on large SAS files. Introduce TYPE_INFERENCE_SAMPLE_ROWS to limit the number of rows scanned for type detection while ensuring nullability checks cover the entire column. Update documentation to reflect these changes. --- generic_loader/load_sas.py | 44 ++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 4956fb3..06c5156 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -163,16 +163,26 @@ Priority order used by :func:`infer_schema`: value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise ``DOUBLE PRECISION``. +Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for +performance on large files. Nullability and all-null detection still run over +the full column (they're vectorized and fast) so a ``NOT NULL`` constraint is +never declared for a column that has a null anywhere in the file. Tradeoff: +if the first N rows fit ``INTEGER`` but a later row exceeds int32, COPY will +fail; bump the sample size or set ``TYPE_INFERENCE_SAMPLE_ROWS = None`` to +scan the whole column. + 7. Tunables ----------- Module-level knobs at the top of this file: - * ``COERCE_CHAR_COLUMNS`` - whether to promote stringly-typed numerics/ + * ``COERCE_CHAR_COLUMNS`` - whether to promote stringly-typed numerics/ dates (default True). * ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before char-column coercion is attempted. * ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become ``BIGINT``. + * ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows used for type inference + (``None`` = scan the whole column). """ from __future__ import annotations @@ -210,6 +220,15 @@ values; too small a sample is easy to mis-infer.""" NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647) """INTEGER bounds; anything outside becomes BIGINT.""" +TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000 +"""Cap on rows inspected during per-column type inference. The row-walking +helpers (date detection on object columns, string-coercion probes, whole-number +check on numeric columns) operate on ``df.head(TYPE_INFERENCE_SAMPLE_ROWS)`` +instead of the full frame, which matters on SAS files with hundreds of millions +of rows. Nullability is still evaluated across the whole column (cheap, +vectorized) so ``NOT NULL`` declarations remain safe. Set to ``None`` to scan +every row.""" + VALID_IF_EXISTS = ("fail", "replace", "append") @@ -547,6 +566,16 @@ def infer_schema( """ original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {}) + # Row-walking type probes run on a bounded head slice; nullability and the + # all-null check still see every row so NOT NULL declarations stay honest. + total_rows = len(df) + if TYPE_INFERENCE_SAMPLE_ROWS is not None and total_rows > TYPE_INFERENCE_SAMPLE_ROWS: + sample_df = df.head(TYPE_INFERENCE_SAMPLE_ROWS) + sampled = True + else: + sample_df = df + sampled = False + # Temporarily flip the module-level flag if the caller asked us to. global COERCE_CHAR_COLUMNS saved = COERCE_CHAR_COLUMNS @@ -555,6 +584,7 @@ def infer_schema( out: Dict[str, ColumnSpec] = {} for col in df.columns: series = df[col] + sample_series = sample_df[col] sas_format = original_formats.get(col) notes: List[str] = [] @@ -567,13 +597,13 @@ def infer_schema( elif pd.api.types.is_datetime64_any_dtype(series): pg_type = "TIMESTAMP" elif pd.api.types.is_object_dtype(series): - is_dates, any_dt = _object_is_dates(series) + is_dates, any_dt = _object_is_dates(sample_series) if is_dates: pg_type = "TIMESTAMP" if any_dt else "DATE" else: - pg_type = _infer_char_type(series) + pg_type = _infer_char_type(sample_series) elif pd.api.types.is_numeric_dtype(series): - int_target = _numeric_int_target(series) + int_target = _numeric_int_target(sample_series) if int_target is not None: pg_type = int_target else: @@ -582,6 +612,12 @@ def infer_schema( pg_type = "TEXT" notes.append(f"unhandled dtype {series.dtype}; defaulting to TEXT") + if sampled: + notes.append( + f"type inferred from first {TYPE_INFERENCE_SAMPLE_ROWS:,} of " + f"{total_rows:,} rows" + ) + nullable = _is_nullable(series) out[col] = ColumnSpec( From 5645ff5597dc101671b999aaad200fe63ba818ff Mon Sep 17 00:00:00 2001 From: David Peterson Date: Sat, 18 Apr 2026 10:44:32 -0500 Subject: [PATCH 3/7] Update load_sas.py to support streaming data loads with iter_sas_chunks and copy_dataframes. Enhance documentation for schema inference and type detection, clarifying the use of read_sas_preview and the implications of sampling. Add __pycache__ to .gitignore. --- generic_loader/.gitignore | 1 + generic_loader/load_sas.py | 268 ++++++++++++++++++++++++++----------- 2 files changed, 192 insertions(+), 77 deletions(-) diff --git a/generic_loader/.gitignore b/generic_loader/.gitignore index c93b13d..00d5b98 100644 --- a/generic_loader/.gitignore +++ b/generic_loader/.gitignore @@ -1,3 +1,4 @@ /.venv /samples /.env +/__pycache__ \ No newline at end of file diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 06c5156..78d00c4 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -107,26 +107,29 @@ loosening is not checked here because the append-mode check already covers it. 5. Library usage ---------------- -The CLI is a thin wrapper around composable functions. A typical orchestrator -looks like:: +The CLI is a thin wrapper around composable functions. The preferred pattern +infers the schema from a bounded preview and then streams the rest of the +file chunk-by-chunk into ``COPY`` - crucial for SAS files with hundreds of +millions of rows:: from dotenv import load_dotenv from load_sas import ( - load_config, read_sas, apply_column_filter, infer_schema, - validate_against_manifest, render_create_table, - connect, create_table, copy_dataframe, + load_config, read_sas_preview, iter_sas_chunks, apply_column_filter, + infer_schema, validate_against_manifest, render_create_table, + connect, create_table, copy_dataframes, ) load_dotenv() cfg = load_config("config.yaml") - df, meta = read_sas(cfg.filename) - df = apply_column_filter(df, cfg.include, cfg.exclude) - columns = infer_schema(df, meta) - # Optional: preview + # Schema from a preview slice (bounded by TYPE_INFERENCE_SAMPLE_ROWS). + preview_df, meta = read_sas_preview(cfg.filename) + preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude) + total_rows = getattr(meta, "number_rows", None) + columns = infer_schema(preview_df, meta, total_rows=total_rows) + + # Optional: preview DDL / validate against a manifest. print(render_create_table(cfg.schemaname, cfg.tablename, columns)) - - # Optional: validate against a manifest problems = validate_against_manifest(columns, Path("expected.json")) assert not problems, problems @@ -134,15 +137,23 @@ looks like:: conn.autocommit = False try: create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists) - rows = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns) + chunks = ( + apply_column_filter(df, cfg.include, cfg.exclude) + for df, _ in iter_sas_chunks(cfg.filename) + ) + rows = copy_dataframes(conn, cfg.schemaname, cfg.tablename, chunks, columns) conn.commit() finally: conn.close() +For small files (or tests) the legacy one-shot API still works: +:func:`read_sas` returns the whole frame and :func:`copy_dataframe` copies it +in one round trip. + All functions are side-effect free except :func:`connect`, :func:`create_table`, -and :func:`copy_dataframe`; schema inference (:func:`infer_schema`) accepts a -``coerce_chars`` kwarg to override the module-level ``COERCE_CHAR_COLUMNS`` -without mutating global state. +:func:`copy_dataframe`, and :func:`copy_dataframes`; schema inference +(:func:`infer_schema`) accepts a ``coerce_chars`` kwarg to override the +module-level ``COERCE_CHAR_COLUMNS`` without mutating global state. 6. Type inference summary ------------------------- @@ -164,25 +175,32 @@ Priority order used by :func:`infer_schema`: ``DOUBLE PRECISION``. Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for -performance on large files. Nullability and all-null detection still run over -the full column (they're vectorized and fast) so a ``NOT NULL`` constraint is -never declared for a column that has a null anywhere in the file. Tradeoff: -if the first N rows fit ``INTEGER`` but a later row exceeds int32, COPY will -fail; bump the sample size or set ``TYPE_INFERENCE_SAMPLE_ROWS = None`` to -scan the whole column. +performance on large files. The CLI enforces this at read time via +:func:`read_sas_preview`, so the whole file is never materialized just to pick +types. Sampled specs carry an ``inferred_from_sample`` marker and the usual +tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32, +or a column had no nulls in the preview but does later in the file, ``COPY`` +will fail mid-stream and the whole transaction rolls back. Set +``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing +matters more than speed. + +Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which +share one cursor and transaction so a failure mid-file rolls back the whole +load. 7. Tunables ----------- Module-level knobs at the top of this file: - * ``COERCE_CHAR_COLUMNS`` - whether to promote stringly-typed numerics/ - dates (default True). - * ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before - char-column coercion is attempted. - * ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become - ``BIGINT``. - * ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows used for type inference - (``None`` = scan the whole column). + * ``COERCE_CHAR_COLUMNS`` - promote stringly-typed numerics / dates + (default True). + * ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before + char-column coercion is attempted. + * ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become + ``BIGINT``. + * ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows read for type inference + (``None`` = scan the whole column). + * ``DEFAULT_CHUNK_ROWS`` - rows per streaming COPY chunk. """ from __future__ import annotations @@ -195,7 +213,7 @@ import os import sys from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple import pandas as pd import psycopg2 @@ -221,13 +239,16 @@ NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647) """INTEGER bounds; anything outside becomes BIGINT.""" TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000 -"""Cap on rows inspected during per-column type inference. The row-walking -helpers (date detection on object columns, string-coercion probes, whole-number -check on numeric columns) operate on ``df.head(TYPE_INFERENCE_SAMPLE_ROWS)`` -instead of the full frame, which matters on SAS files with hundreds of millions -of rows. Nullability is still evaluated across the whole column (cheap, -vectorized) so ``NOT NULL`` declarations remain safe. Set to ``None`` to scan -every row.""" +"""Cap on rows inspected during per-column type inference. Also governs how +many rows :func:`read_sas_preview` pulls from the file for dry-run / validate / +schema-inference flows. Set to ``None`` to scan every row (and read the whole +file into memory for the preview step - don't do this on multi-hundred-million +row files).""" + +DEFAULT_CHUNK_ROWS = 100_000 +"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean +fewer COPY round-trips but more peak memory per chunk; smaller values are +gentler on memory.""" VALID_IF_EXISTS = ("fail", "replace", "append") @@ -256,6 +277,12 @@ class ColumnSpec: sas_format: Optional[str] = None source_dtype: Optional[str] = None notes: List[str] = field(default_factory=list) + sampled: bool = False + """True when the type was inferred from a bounded preview rather than the + full file. A sampled spec carries the usual sampling risks: a later chunk + could contain a value that exceeds the inferred integer range, doesn't + parse as the inferred type, or is null in a column the preview showed as + non-null - all of which surface as mid-``COPY`` failures.""" # --------------------------------------------------------------------------- @@ -355,8 +382,8 @@ def load_config(path: Path) -> LoaderConfig: # --------------------------------------------------------------------------- -def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]: - """Dispatch to the right pyreadstat reader by extension. +def _sas_reader(path: Path) -> Tuple[Any, Dict[str, Any]]: + """Return ``(pyreadstat_reader, extra_kwargs)`` for ``path``. Invariants (learned the hard way while building the sample generator): @@ -364,15 +391,58 @@ def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]: encoding on XPORT files it wrote itself. * ``.sas7bdat`` - explicit ``encoding="latin-1"`` per colleague guidance. """ - path = Path(path) - suffix = path.suffix.lower() + suffix = Path(path).suffix.lower() if suffix in (".xpt", ".xport"): - return pyreadstat.read_xport(str(path)) + return pyreadstat.read_xport, {} if suffix == ".sas7bdat": - return pyreadstat.read_sas7bdat(str(path), encoding="latin-1") + return pyreadstat.read_sas7bdat, {"encoding": "latin-1"} raise ValueError(f"Unsupported SAS file extension: {suffix}") +def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]: + """Read an entire SAS file into memory. Only safe for small files. + + Kept for backward compatibility and tests; the CLI now uses + :func:`read_sas_preview` + :func:`iter_sas_chunks` so it never materializes + the whole frame at once. + """ + reader, kwargs = _sas_reader(path) + return reader(str(Path(path)), **kwargs) + + +def read_sas_preview( + path: Path, + *, + rows: Optional[int] = None, +) -> Tuple[pd.DataFrame, Any]: + """Read the first ``rows`` records from ``path`` plus its metadata. + + Defaults to ``TYPE_INFERENCE_SAMPLE_ROWS`` when ``rows`` is not given. + Passing ``rows=None`` with ``TYPE_INFERENCE_SAMPLE_ROWS=None`` reads the + whole file (pyreadstat treats ``row_limit=0`` as unlimited). + """ + reader, kwargs = _sas_reader(path) + effective = rows if rows is not None else TYPE_INFERENCE_SAMPLE_ROWS + row_limit = int(effective) if effective else 0 + return reader(str(Path(path)), row_limit=row_limit, **kwargs) + + +def iter_sas_chunks( + path: Path, + *, + chunksize: int = DEFAULT_CHUNK_ROWS, +): + """Yield ``(df_chunk, meta)`` tuples for streaming loads. + + Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right + underlying reader by extension and threads through our encoding defaults. + """ + reader, kwargs = _sas_reader(path) + yield from pyreadstat.read_file_in_chunks( + reader, str(Path(path)), chunksize=chunksize, **kwargs + ) + + # --------------------------------------------------------------------------- # Column filtering # --------------------------------------------------------------------------- @@ -552,6 +622,7 @@ def infer_schema( meta: Any, *, coerce_chars: bool = COERCE_CHAR_COLUMNS, + total_rows: Optional[int] = None, ) -> Dict[str, ColumnSpec]: """Infer a Postgres column spec for each column in ``df``. @@ -563,18 +634,24 @@ def infer_schema( ``COERCE_CHAR_COLUMNS`` without mutating global state. Internally the char-inference helpers still read the constant - a full override would thread the flag through, but the one-knob story here is intentional. + + ``total_rows`` lets callers who already sampled the frame (e.g. via + :func:`read_sas_preview`) report the real file size in the per-column + "inferred from first N of M rows" note. Falls back to ``len(df)``. """ original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {}) # Row-walking type probes run on a bounded head slice; nullability and the # all-null check still see every row so NOT NULL declarations stay honest. - total_rows = len(df) - if TYPE_INFERENCE_SAMPLE_ROWS is not None and total_rows > TYPE_INFERENCE_SAMPLE_ROWS: + df_rows = len(df) + effective_total = total_rows if total_rows is not None else df_rows + if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS: sample_df = df.head(TYPE_INFERENCE_SAMPLE_ROWS) - sampled = True + sample_size = TYPE_INFERENCE_SAMPLE_ROWS else: sample_df = df - sampled = False + sample_size = df_rows + sampled = sample_size < effective_total # Temporarily flip the module-level flag if the caller asked us to. global COERCE_CHAR_COLUMNS @@ -614,8 +691,8 @@ def infer_schema( if sampled: notes.append( - f"type inferred from first {TYPE_INFERENCE_SAMPLE_ROWS:,} of " - f"{total_rows:,} rows" + f"type inferred from first {sample_size:,} of " + f"{effective_total:,} rows" ) nullable = _is_nullable(series) @@ -627,6 +704,7 @@ def infer_schema( sas_format=sas_format, source_dtype=str(series.dtype), notes=notes, + sampled=sampled, ) return out finally: @@ -914,6 +992,45 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da return out +def copy_dataframes( + conn, + schema_name: str, + table_name: str, + dfs: Iterable[pd.DataFrame], + columns: Dict[str, ColumnSpec], +) -> int: + """Stream an iterable of DataFrames into one ``COPY`` session. + + All chunks share a cursor and transaction, so a failure mid-stream + rolls back the whole load when the caller hasn't committed yet. + Empty chunks are skipped. Returns the total rows inserted. + """ + col_list = ", ".join(_quote_ident(name) for name in columns.keys()) + sql = ( + f"COPY {_qualified(schema_name, table_name)} ({col_list}) " + f"FROM STDIN WITH (FORMAT csv, NULL '')" + ) + + total = 0 + with conn.cursor() as cur: + for df in dfs: + if df.empty: + continue + prepared = _prepare_for_copy(df, columns) + buf = io.StringIO() + prepared.to_csv( + buf, + index=False, + header=False, + na_rep="", + date_format="%Y-%m-%d %H:%M:%S", + ) + buf.seek(0) + cur.copy_expert(sql, buf) + total += len(prepared) + return total + + def copy_dataframe( conn, schema_name: str, @@ -923,31 +1040,10 @@ def copy_dataframe( ) -> int: """Stream ``df`` into Postgres via ``COPY ... FROM STDIN``. - Returns the number of rows inserted. + Convenience wrapper around :func:`copy_dataframes` for single-frame + callers. Returns the number of rows inserted. """ - prepared = _prepare_for_copy(df, columns) - - buf = io.StringIO() - prepared.to_csv( - buf, - index=False, - header=False, - na_rep="", - date_format="%Y-%m-%d %H:%M:%S", - ) - buf.seek(0) - - col_list = ", ".join(_quote_ident(name) for name in columns.keys()) - sql = ( - f"COPY {_qualified(schema_name, table_name)} ({col_list}) " - f"FROM STDIN WITH (FORMAT csv, NULL '')" - ) - - with conn.cursor() as cur: - cur.copy_expert(sql, buf) - rowcount = cur.rowcount - - return int(rowcount) if rowcount is not None else len(prepared) + return copy_dataframes(conn, schema_name, table_name, [df], columns) # --------------------------------------------------------------------------- @@ -1060,9 +1156,13 @@ def main(argv: Optional[List[str]] = None) -> int: print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr) return 2 - df, meta = read_sas(cfg.filename) - df = apply_column_filter(df, cfg.include, cfg.exclude) - columns = infer_schema(df, meta) + # Schema inference uses a bounded preview read so we never load a + # hundreds-of-millions-of-rows file into memory just to pick types. + # NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows + # returned, not the file's total, so we don't trust it here. + preview_df, meta = read_sas_preview(cfg.filename) + preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude) + columns = infer_schema(preview_df, meta) if args.validate: manifest_path = cfg.filename.with_suffix("").with_suffix(".expected.json") @@ -1080,11 +1180,25 @@ def main(argv: Optional[List[str]] = None) -> int: print(render_create_table(cfg.schemaname, cfg.tablename, columns)) return 0 + # Release the preview frame before opening the stream - lets the GC reclaim + # it while we're holding a Postgres transaction open. + del preview_df + + def _filtered_chunks(): + seen = 0 + for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename): + chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude) + seen += len(chunk_df) + print(f" streaming... {seen:,} rows", file=sys.stderr) + yield chunk_df + conn = connect() conn.autocommit = False try: create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists) - inserted = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns) + inserted = copy_dataframes( + conn, cfg.schemaname, cfg.tablename, _filtered_chunks(), columns + ) conn.commit() except Exception: conn.rollback() From 5b48872dd743e5e9bdfe94f58e82822c7d06e8d2 Mon Sep 17 00:00:00 2001 From: David Peterson Date: Sat, 18 Apr 2026 11:25:04 -0500 Subject: [PATCH 4/7] Add generate_sample_folder.py and load_folder.py for clustered SAS file generation and loading Introduce generate_sample_folder.py to create a test folder with clustered SAS XPORT files, including configurations for schema compatibility checks. Implement load_folder.py to facilitate loading entire directories of SAS files into Postgres, supporting explicit and auto-detect clustering. Update sample_folder_config.yaml for usage examples and configuration structure. Enhance load_sas.py with a public schema compatibility check function for orchestrators. --- generic_loader/generate_sample_folder.py | 185 ++++++++ generic_loader/load_folder.py | 555 +++++++++++++++++++++++ generic_loader/load_sas.py | 16 + generic_loader/sample_folder_config.yaml | 54 +++ 4 files changed, 810 insertions(+) create mode 100644 generic_loader/generate_sample_folder.py create mode 100644 generic_loader/load_folder.py create mode 100644 generic_loader/sample_folder_config.yaml diff --git a/generic_loader/generate_sample_folder.py b/generic_loader/generate_sample_folder.py new file mode 100644 index 0000000..d1fd932 --- /dev/null +++ b/generic_loader/generate_sample_folder.py @@ -0,0 +1,185 @@ +"""Generate a folder of clustered SAS XPORT files for testing ``load_folder``. + +Produces ``samples/folder_test/`` containing three clusters: + +* ``group_a{1,2,3}.xpt`` - kitchen-sink schema (every column). +* ``group_b{1,2}.xpt`` - a *different* schema (drops ``BIGINT`` and + ``TIMECOL``) so a schema-compat check would catch cross-cluster + contamination if the regex were wrong. +* ``standalone.xpt`` - singleton to exercise the no-cluster / singleton + auto-detect path. + +Alongside the files, writes ``sample_folder_config.yaml`` that exercises +both code paths: ``group_a*`` via an explicit regex pattern, ``group_b*`` +and ``standalone`` via auto-detect. + +Finally, runs :func:`load_folder.discover_clusters` against the generated +folder and asserts the grouping is what we expect. This is a pure in-process +smoke test of the clustering logic; no Postgres connection is required. + +Reuses ``generate_sample_sas.build_dataframe`` so data shape / dtypes match +the single-file loader tests. ``N_ROWS`` is temporarily shrunk on the +imported module for this run's duration so repeated invocations stay fast. +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import numpy as np +import pandas as pd +import pyreadstat +import yaml + +import generate_sample_sas as gss +from load_folder import discover_clusters, load_folder_config + + +FIXTURE_ROWS = 2_000 +OUT_DIR = Path("samples/folder_test") +CONFIG_PATH = OUT_DIR / "folder_config.yaml" + +GROUP_A_FILES = ["group_a1.xpt", "group_a2.xpt", "group_a3.xpt"] +GROUP_B_FILES = ["group_b1.xpt", "group_b2.xpt"] +STANDALONE_FILE = "standalone.xpt" + +# Columns dropped from the group_b cluster so it has a genuinely different +# schema from the group_a cluster. If the regex accidentally pulled a group_b file +# into the group_a cluster (or vice versa), load_cluster's schema-compat check +# would fire on these differences. +GROUP_B_DROPPED_COLUMNS = ("BIGINT", "TIMECOL") + + +def _build_df(seed: int) -> pd.DataFrame: + """Build a kitchen-sink DataFrame via the existing generator. + + Temporarily shrinks ``generate_sample_sas.N_ROWS`` so each fixture file + is small enough to regenerate quickly. Restored afterward so importing + this module alongside the main generator stays side-effect free. + """ + saved = gss.N_ROWS + gss.N_ROWS = FIXTURE_ROWS + try: + rng = np.random.default_rng(seed) + return gss.build_dataframe(rng) + finally: + gss.N_ROWS = saved + + +def _write_xport(df: pd.DataFrame, path: Path, table_name: str) -> None: + # Only pass variable_format entries for columns that actually exist in + # this frame - write_xport errors on formats referencing missing cols. + variable_format = { + k: v for k, v in gss.VARIABLE_FORMATS.items() if k in df.columns + } + column_labels = {k: v for k, v in gss.COLUMN_LABELS.items() if k in df.columns} + + pyreadstat.write_xport( + df, + str(path), + file_format_version=5, + table_name=table_name, + file_label=f"Folder-loader fixture ({path.name})", + column_labels=column_labels, + variable_format=variable_format, + ) + + +def generate_fixtures() -> None: + OUT_DIR.mkdir(parents=True, exist_ok=True) + + for i, name in enumerate(GROUP_A_FILES): + df = _build_df(seed=100 + i) + _write_xport(df, OUT_DIR / name, table_name=f"GRPA{i + 1}") + print(f" wrote {OUT_DIR / name} ({len(df):,} rows, {len(df.columns)} cols)") + + for i, name in enumerate(GROUP_B_FILES): + df = _build_df(seed=200 + i) + df = df.drop(columns=list(GROUP_B_DROPPED_COLUMNS)) + _write_xport(df, OUT_DIR / name, table_name=f"GRPB{i + 1}") + print(f" wrote {OUT_DIR / name} ({len(df):,} rows, {len(df.columns)} cols)") + + df = _build_df(seed=300) + _write_xport(df, OUT_DIR / STANDALONE_FILE, table_name="STDALONE") + print( + f" wrote {OUT_DIR / STANDALONE_FILE} " + f"({len(df):,} rows, {len(df.columns)} cols)" + ) + + +def write_config() -> None: + cfg = { + "folder": ".", # config lives inside the target folder + "schemaname": "public", + "if_exists": "replace", + "auto_detect": True, + "clusters": [ + { + "pattern": r"^group_a\d+\.xpt$", + "tablename": "group_a", + }, + ], + } + with CONFIG_PATH.open("w", encoding="utf-8") as f: + # Top-of-file comment documents the intent of this generated config. + f.write( + "# Generated by generate_sample_folder.py. Demonstrates both\n" + "# explicit regex clustering (group_a*) and auto-detect\n" + "# (group_b* and standalone) working together.\n" + ) + yaml.safe_dump(cfg, f, sort_keys=False) + print(f" wrote {CONFIG_PATH}") + + +def verify() -> None: + """Smoke-test the clustering logic against the generated folder.""" + cfg = load_folder_config(CONFIG_PATH) + clusters = discover_clusters(cfg) + + by_name = {c.tablename: c for c in clusters} + + expected_names = {"group_a", "group_b", "standalone"} + actual_names = set(by_name) + assert expected_names == actual_names, ( + f"cluster set mismatch: expected {expected_names}, got {actual_names}" + ) + + group_a = by_name["group_a"] + assert group_a.source == "explicit", f"group_a source = {group_a.source!r}" + assert [f.name for f in group_a.files] == sorted(GROUP_A_FILES), ( + f"group_a files = {[f.name for f in group_a.files]}" + ) + + group_b = by_name["group_b"] + assert group_b.source == "auto", f"group_b source = {group_b.source!r}" + assert [f.name for f in group_b.files] == sorted(GROUP_B_FILES), ( + f"group_b files = {[f.name for f in group_b.files]}" + ) + + standalone = by_name["standalone"] + assert standalone.source == "auto", f"standalone source = {standalone.source!r}" + assert [f.name for f in standalone.files] == [STANDALONE_FILE], ( + f"standalone files = {[f.name for f in standalone.files]}" + ) + + print(" clustering verified:") + for c in clusters: + files = ", ".join(f.name for f in c.files) + print(f" {c.tablename} [{c.source}]: {files}") + + +def main() -> int: + print(f"Writing fixture SAS files to {OUT_DIR}/") + generate_fixtures() + print(f"\nWriting folder config to {CONFIG_PATH}") + write_config() + print("\nVerifying discover_clusters() grouping...") + verify() + print("\nOK. Try:") + print(f" python load_folder.py --config {CONFIG_PATH} --dry-run") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py new file mode 100644 index 0000000..baecba9 --- /dev/null +++ b/generic_loader/load_folder.py @@ -0,0 +1,555 @@ +"""Folder-level SAS-to-Postgres loader. + +Wraps :mod:`load_sas` so an entire directory of SAS files can be ingested in +one invocation. A directory often contains several *clusters* of files that +share a schema (e.g. ``group_a1.sas7bdat``, ``group_a2.sas7bdat``, ...). Each +cluster becomes one Postgres table; files inside a cluster are appended to it. + +------------------------------------------------------------------------------- +USAGE +------------------------------------------------------------------------------- + +1. YAML config +-------------- +:: + + folder: samples/folder_test # required; relative paths resolve against + # the config file's directory + schemaname: public # required + + # Optional. One of: fail | replace | append. Default: fail. + # Applied to the first file of each cluster (subsequent files in the + # cluster always run through the append-mode compatibility check). + if_exists: fail + + # Optional. Default: true. When true, files that don't match any explicit + # pattern below are grouped by their common prefix (trailing digits, and + # optional trailing separators, are stripped from each file stem). + auto_detect: true + + # Optional. Columns to force-include or force-exclude across every file. + # include and exclude are mutually exclusive. + # include: [ID, INTCOL] + # exclude: [ALLNULL] + + # Optional explicit cluster patterns. Each pattern is matched against the + # file *basename*. Matched files are pulled out of the auto-detect pool. + # Per-cluster if_exists/include/exclude override the folder-level defaults. + clusters: + - pattern: '^group_a\\d+\\.sas7bdat$' + tablename: group_a + - pattern: '^group_b\\d+\\.sas7bdat$' + tablename: group_b + if_exists: replace + +2. Command-line interface +------------------------- +:: + + python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast] + +Flags: + --config PATH Required. Path to the YAML config above. + --dry-run Print the discovered clusters and the inferred CREATE + TABLE for each (schema from the first file of the + cluster). The database is never touched. + --fail-fast Abort the whole run on the first cluster failure. + Default is to log the failure, roll that cluster back, + and keep going. + +Exit codes: + 0 - every cluster loaded successfully (or dry-run completed) + 1 - at least one cluster failed (details on stderr) + 2 - folder does not exist / contains no SAS files + +3. Discovery rules +------------------ +* Supported extensions: ``.sas7bdat``, ``.xpt``, ``.xport`` (matches + :mod:`load_sas`). The folder is not scanned recursively. +* Explicit patterns are tried in order. A file matched by one pattern is + removed from the pool before the next pattern runs, so earlier patterns + win in case of overlap. Overlap between patterns is flagged as an error + at config-parse time (a file matching two patterns is almost always a bug). +* Auto-detect groups remaining files by ``re.sub(r'\\d+$', '', stem)`` with + any trailing ``_`` / ``-`` stripped afterward. Stems without trailing + digits become singleton clusters named after the stem. + +4. Library usage +---------------- +:: + + from load_folder import load_folder_config, discover_clusters, load_cluster + from load_sas import connect + + cfg = load_folder_config("folder_config.yaml") + clusters = discover_clusters(cfg) + + conn = connect() + try: + for cluster in clusters: + load_cluster(conn, cluster, cfg.schemaname) + finally: + conn.close() +""" + +from __future__ import annotations + +import argparse +import re +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import yaml +from dotenv import load_dotenv + +from load_sas import ( + VALID_IF_EXISTS, + apply_column_filter, + assert_schema_compatible, + connect, + copy_dataframes, + create_table, + infer_schema, + iter_sas_chunks, + read_sas_preview, + render_create_table, +) + + +SAS_EXTENSIONS = (".sas7bdat", ".xpt", ".xport") + + +# --------------------------------------------------------------------------- +# Dataclasses +# --------------------------------------------------------------------------- + + +@dataclass +class ClusterSpec: + tablename: str + files: List[Path] + if_exists: str + include: Optional[List[str]] + exclude: Optional[List[str]] + source: str # "explicit" or "auto" + pattern: Optional[str] = None + + +@dataclass +class _ExplicitPattern: + """Parsed form of a single ``clusters[*]`` YAML entry.""" + + pattern: re.Pattern + raw_pattern: str + tablename: str + if_exists: Optional[str] = None + include: Optional[List[str]] = None + exclude: Optional[List[str]] = None + + +@dataclass +class FolderConfig: + folder: Path + schemaname: str + if_exists: str = "fail" + auto_detect: bool = True + include: Optional[List[str]] = None + exclude: Optional[List[str]] = None + explicit: List[_ExplicitPattern] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Config loading +# --------------------------------------------------------------------------- + + +def _validate_if_exists(value: Any, where: str) -> str: + s = str(value).lower() + if s not in VALID_IF_EXISTS: + raise ValueError( + f"{where}: if_exists={value!r} is not one of {VALID_IF_EXISTS}" + ) + return s + + +def _parse_columns_filter( + raw: Dict[str, Any], where: str +) -> Tuple[Optional[List[str]], Optional[List[str]]]: + include = raw.get("include") + exclude = raw.get("exclude") + if include is not None and exclude is not None: + raise ValueError(f"{where}: 'include' and 'exclude' are mutually exclusive.") + if include is not None and not isinstance(include, list): + raise ValueError(f"{where}: 'include' must be a list of column names.") + if exclude is not None and not isinstance(exclude, list): + raise ValueError(f"{where}: 'exclude' must be a list of column names.") + include_out = [str(c) for c in include] if include is not None else None + exclude_out = [str(c) for c in exclude] if exclude is not None else None + return include_out, exclude_out + + +def load_folder_config(path: Path) -> FolderConfig: + """Parse and validate the folder-level YAML config at ``path``.""" + path = Path(path) + with path.open("r", encoding="utf-8") as f: + raw = yaml.safe_load(f) + + if not isinstance(raw, dict): + raise ValueError(f"Config at {path} must be a YAML mapping at the top level.") + + missing = [k for k in ("folder", "schemaname") if k not in raw] + if missing: + raise ValueError(f"Config {path} missing required keys: {', '.join(missing)}") + + folder = Path(raw["folder"]) + if not folder.is_absolute(): + candidate = (path.parent / folder).resolve() + folder = candidate if candidate.exists() else folder + + schemaname = str(raw["schemaname"]) + if_exists = _validate_if_exists(raw.get("if_exists", "fail"), f"Config {path}") + auto_detect = bool(raw.get("auto_detect", True)) + + include, exclude = _parse_columns_filter(raw, f"Config {path}") + + explicit: List[_ExplicitPattern] = [] + clusters_raw = raw.get("clusters") or [] + if not isinstance(clusters_raw, list): + raise ValueError(f"Config {path}: 'clusters' must be a list if present.") + for i, entry in enumerate(clusters_raw): + where = f"Config {path} clusters[{i}]" + if not isinstance(entry, dict): + raise ValueError(f"{where} must be a mapping.") + if "pattern" not in entry or "tablename" not in entry: + raise ValueError(f"{where} must include 'pattern' and 'tablename'.") + raw_pat = str(entry["pattern"]) + try: + compiled = re.compile(raw_pat) + except re.error as e: + raise ValueError(f"{where}: invalid regex {raw_pat!r}: {e}") from e + c_if_exists = ( + _validate_if_exists(entry["if_exists"], where) + if "if_exists" in entry + else None + ) + c_include, c_exclude = _parse_columns_filter(entry, where) + explicit.append( + _ExplicitPattern( + pattern=compiled, + raw_pattern=raw_pat, + tablename=str(entry["tablename"]), + if_exists=c_if_exists, + include=c_include, + exclude=c_exclude, + ) + ) + + return FolderConfig( + folder=folder, + schemaname=schemaname, + if_exists=if_exists, + auto_detect=auto_detect, + include=include, + exclude=exclude, + explicit=explicit, + ) + + +# --------------------------------------------------------------------------- +# Cluster discovery +# --------------------------------------------------------------------------- + + +_TRAILING_DIGIT_RE = re.compile(r"\d+$") + + +def _auto_prefix(stem: str) -> str: + """Derive the cluster key for a file stem. + + Strip trailing digits and any trailing separators so + ``group_a1`` / ``group_a_2`` / ``group_a-3`` all land in the same + ``group_a`` bucket. If nothing is stripped, the stem is its own key. + """ + stripped = _TRAILING_DIGIT_RE.sub("", stem) + stripped = stripped.rstrip("_-") + return stripped or stem + + +def _list_sas_files(folder: Path) -> List[Path]: + files: List[Path] = [] + for p in sorted(folder.iterdir()): + if p.is_file() and p.suffix.lower() in SAS_EXTENSIONS: + files.append(p) + return files + + +def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: + """Enumerate ``cfg.folder`` and bucket files into ``ClusterSpec`` objects. + + Pure/IO-bounded: the only filesystem access is listing ``cfg.folder``. No + SAS file is opened here. Explicit patterns are applied first, in config + order; files matched by an earlier pattern are removed from the pool + before the next pattern runs. A file matching two patterns triggers a + hard error (that's almost always a config bug). + """ + if not cfg.folder.exists() or not cfg.folder.is_dir(): + raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}") + + pool = _list_sas_files(cfg.folder) + clusters: List[ClusterSpec] = [] + + # Detect cross-pattern overlap up front for a clearer error message. + for i, p_i in enumerate(cfg.explicit): + for j in range(i + 1, len(cfg.explicit)): + p_j = cfg.explicit[j] + for f in pool: + if p_i.pattern.search(f.name) and p_j.pattern.search(f.name): + raise ValueError( + f"File {f.name!r} matches multiple explicit patterns: " + f"{p_i.raw_pattern!r} and {p_j.raw_pattern!r}" + ) + + remaining = list(pool) + for patt in cfg.explicit: + matched = [f for f in remaining if patt.pattern.search(f.name)] + if not matched: + # Not an error - the folder might legitimately not contain files + # for this pattern on a given run. Emit a note for the CLI. + clusters.append( + ClusterSpec( + tablename=patt.tablename, + files=[], + if_exists=patt.if_exists or cfg.if_exists, + include=patt.include if patt.include is not None else cfg.include, + exclude=patt.exclude if patt.exclude is not None else cfg.exclude, + source="explicit", + pattern=patt.raw_pattern, + ) + ) + continue + remaining = [f for f in remaining if f not in matched] + clusters.append( + ClusterSpec( + tablename=patt.tablename, + files=sorted(matched), + if_exists=patt.if_exists or cfg.if_exists, + include=patt.include if patt.include is not None else cfg.include, + exclude=patt.exclude if patt.exclude is not None else cfg.exclude, + source="explicit", + pattern=patt.raw_pattern, + ) + ) + + if cfg.auto_detect and remaining: + buckets: Dict[str, List[Path]] = {} + for f in remaining: + key = _auto_prefix(f.stem) + buckets.setdefault(key, []).append(f) + for key in sorted(buckets): + clusters.append( + ClusterSpec( + tablename=key, + files=sorted(buckets[key]), + if_exists=cfg.if_exists, + include=cfg.include, + exclude=cfg.exclude, + source="auto", + ) + ) + + return clusters + + +# --------------------------------------------------------------------------- +# Per-cluster load +# --------------------------------------------------------------------------- + + +def _infer_cluster_schema(path: Path, include, exclude): + preview_df, meta = read_sas_preview(path) + preview_df = apply_column_filter(preview_df, include, exclude) + total_rows = getattr(meta, "number_rows", None) + columns = infer_schema(preview_df, meta, total_rows=total_rows) + return columns + + +def load_cluster(conn, cluster: ClusterSpec, schemaname: str) -> int: + """Load every file in ``cluster`` into one table. Returns total rows loaded. + + The caller owns transaction boundaries. This function does NOT commit or + roll back - :func:`main` does that per cluster so one bad cluster + doesn't poison the rest of the run. + """ + if not cluster.files: + return 0 + + first, *rest = cluster.files + first_columns = _infer_cluster_schema(first, cluster.include, cluster.exclude) + create_table( + conn, schemaname, cluster.tablename, first_columns, cluster.if_exists + ) + + total = 0 + total += _stream_file( + conn, schemaname, cluster.tablename, first, first_columns, + cluster.include, cluster.exclude, + ) + + for path in rest: + columns = _infer_cluster_schema(path, cluster.include, cluster.exclude) + # Uses the same check that if_exists=append runs. A type mismatch or + # missing column aborts the cluster; the transaction rollback in + # main() keeps the table from ending up half-loaded. + assert_schema_compatible(conn, schemaname, cluster.tablename, columns) + total += _stream_file( + conn, schemaname, cluster.tablename, path, columns, + cluster.include, cluster.exclude, + ) + + return total + + +def _stream_file( + conn, + schemaname: str, + tablename: str, + path: Path, + columns, + include, + exclude, +) -> int: + def _chunks(): + seen = 0 + for chunk_df, _chunk_meta in iter_sas_chunks(path): + chunk_df = apply_column_filter(chunk_df, include, exclude) + seen += len(chunk_df) + print( + f" {path.name}: streaming... {seen:,} rows", + file=sys.stderr, + ) + yield chunk_df + + return copy_dataframes(conn, schemaname, tablename, _chunks(), columns) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def _build_argparser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + description=( + "Load every SAS file in a folder into Postgres, grouping files " + "into clusters that each become one table." + ), + ) + p.add_argument("--config", required=True, type=Path, help="Path to YAML config") + p.add_argument( + "--dry-run", + action="store_true", + help=( + "Print discovered clusters and the inferred CREATE TABLE for " + "each; don't touch Postgres." + ), + ) + p.add_argument( + "--fail-fast", + action="store_true", + help=( + "Abort on the first cluster failure. Default is to roll that " + "cluster back and continue with the next one." + ), + ) + return p + + +def _describe_cluster(cluster: ClusterSpec) -> str: + src = f"{cluster.source}" + if cluster.pattern: + src += f" pattern={cluster.pattern!r}" + files = ", ".join(f.name for f in cluster.files) or "(no matching files)" + return ( + f"cluster {cluster.tablename!r} [{src}] if_exists={cluster.if_exists}\n" + f" files: {files}" + ) + + +def main(argv: Optional[List[str]] = None) -> int: + args = _build_argparser().parse_args(argv) + + load_dotenv() + + cfg = load_folder_config(args.config) + + if not cfg.folder.exists() or not cfg.folder.is_dir(): + print(f"error: folder not found: {cfg.folder}", file=sys.stderr) + return 2 + + clusters = discover_clusters(cfg) + loadable = [c for c in clusters if c.files] + + if not loadable: + print( + f"error: no SAS files found in {cfg.folder} " + f"(looked for {', '.join(SAS_EXTENSIONS)})", + file=sys.stderr, + ) + return 2 + + print(f"discovered {len(loadable)} cluster(s) in {cfg.folder}:") + for c in clusters: + print(_describe_cluster(c)) + + if args.dry_run: + print() + for c in loadable: + print(f"--- CREATE TABLE for cluster {c.tablename!r} ---") + columns = _infer_cluster_schema(c.files[0], c.include, c.exclude) + print(render_create_table(cfg.schemaname, c.tablename, columns)) + print() + return 0 + + conn = connect() + conn.autocommit = False + failures: List[Tuple[str, Exception]] = [] + totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows) + try: + for cluster in loadable: + print( + f"\n>>> loading cluster {cluster.tablename!r} " + f"({len(cluster.files)} file(s))" + ) + try: + rows = load_cluster(conn, cluster, cfg.schemaname) + conn.commit() + totals.append((cluster.tablename, len(cluster.files), rows)) + print( + f" -> loaded {rows:,} row(s) into " + f"{cfg.schemaname}.{cluster.tablename}" + ) + except Exception as e: + conn.rollback() + failures.append((cluster.tablename, e)) + print( + f" !! cluster {cluster.tablename!r} failed: {e}", + file=sys.stderr, + ) + if args.fail_fast: + break + finally: + conn.close() + + print("\n=== summary ===") + for name, fcount, rows in totals: + print(f" ok {name}: {fcount} file(s), {rows:,} row(s)") + for name, err in failures: + print(f" FAIL {name}: {err}", file=sys.stderr) + + return 1 if failures else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 78d00c4..b514917 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -854,6 +854,22 @@ def _assert_schema_compatible( ) +def assert_schema_compatible( + conn, + schema_name: str, + table_name: str, + columns: Dict[str, ColumnSpec], +) -> None: + """Public wrapper around :func:`_assert_schema_compatible`. + + Intended for orchestrators (e.g. the folder loader) that append multiple + files into one table and need to re-run the same compatibility check + that ``if_exists=append`` performs internally. Raises + :class:`SchemaCompatibilityError` on mismatch. + """ + _assert_schema_compatible(conn, schema_name, table_name, columns) + + def create_table( conn, schema_name: str, diff --git a/generic_loader/sample_folder_config.yaml b/generic_loader/sample_folder_config.yaml new file mode 100644 index 0000000..e2ddfda --- /dev/null +++ b/generic_loader/sample_folder_config.yaml @@ -0,0 +1,54 @@ +# Example folder-level loader config. +# +# Shape mirrors what `load_folder.py` expects: +# +# python load_folder.py --config sample_folder_config.yaml --dry-run +# python load_folder.py --config sample_folder_config.yaml +# +# Relative paths are resolved against this config file's directory first, +# falling back to the current working directory if that doesn't exist. + +folder: samples/folder_test +schemaname: public + +# Applied when creating the first file of each cluster. +# One of: fail | replace | append. Default: fail. +if_exists: replace + +# When true (default), any file not matched by an explicit pattern below is +# auto-grouped with its peers by stripping trailing digits (and any trailing +# _ / -) from the file stem. Files with no trailing digits become their own +# singleton cluster. +auto_detect: true + +# Folder-level column filter. Every file in every cluster passes through +# this filter. `include` and `exclude` are mutually exclusive. A cluster can +# override these via its own `include` / `exclude` keys. +# +# include: +# - ID +# - INTCOL +# exclude: +# - ALLNULL + +# Explicit cluster patterns. Each pattern is matched against the file +# *basename*. Files matched by a pattern are pulled out of the auto-detect +# pool, so explicit and auto clusters compose cleanly. +# +# `tablename` is required. `if_exists`, `include`, and `exclude` are +# optional per-cluster overrides of the folder-level defaults above. +clusters: + - pattern: '^group_a\d+\.xpt$' + tablename: group_a + + # Example of an explicit override. Uncomment to force the group_b cluster to + # append instead of replace even though the folder default is "replace": + # + # - pattern: '^group_b\d+\.xpt$' + # tablename: group_b + # if_exists: append + + # With only the gq pattern explicit, auto_detect: true will still bucket + # group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone + # standalone.xpt into a "standalone" cluster. See generate_sample_folder.py + # for the fixture that exercises exactly this layout. From 3b913b2ca6dde7e3451ebb6bfb65bf92457babef Mon Sep 17 00:00:00 2001 From: michael-corey Date: Sat, 18 Apr 2026 12:37:22 -0500 Subject: [PATCH 5/7] adding user prompt for db creds --- generic_loader/.gitignore | 3 ++- generic_loader/load_folder.py | 16 +++++++++++++++- generic_loader/load_sas.py | 29 +++++++++++++++++++++++++---- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/generic_loader/.gitignore b/generic_loader/.gitignore index 00d5b98..a54db63 100644 --- a/generic_loader/.gitignore +++ b/generic_loader/.gitignore @@ -1,4 +1,5 @@ /.venv /samples /.env -/__pycache__ \ No newline at end of file +/__pycache__ +/venv \ No newline at end of file diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index baecba9..099c419 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -95,6 +95,7 @@ Exit codes: from __future__ import annotations import argparse +import getpass import re import sys from dataclasses import dataclass, field @@ -463,6 +464,14 @@ def _build_argparser() -> argparse.ArgumentParser: "cluster back and continue with the next one." ), ) + p.add_argument( + "--dbcreds", + action="store_true", + help=( + "Prompt for database username and password instead of reading " + "PGUSER / PGPASSWORD from the environment or .env file." + ), + ) return p @@ -512,7 +521,12 @@ def main(argv: Optional[List[str]] = None) -> int: print() return 0 - conn = connect() + db_user = db_password = None + if args.dbcreds: + db_user = input("Database username: ") + db_password = getpass.getpass("Database password: ") + + conn = connect(user=db_user, password=db_password) conn.autocommit = False failures: List[Tuple[str, Exception]] = [] totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows) diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index b514917..0be9602 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -207,6 +207,7 @@ from __future__ import annotations import argparse import datetime as dt +import getpass import io import json import os @@ -308,18 +309,25 @@ class ValidationError(RuntimeError): # --------------------------------------------------------------------------- -def connect() -> psycopg2.extensions.connection: +def connect( + *, + user: Optional[str] = None, + password: Optional[str] = None, +) -> psycopg2.extensions.connection: """Open a psycopg2 connection using standard libpq env vars. Assumes `.env` has already been loaded (the CLI does this before calling). Orchestrators that wrap this module should either call ``load_dotenv()`` themselves or ensure the env vars are set. + + ``user`` and ``password`` override the corresponding env vars when supplied + (used by the ``--dbcreds`` CLI flag to accept interactive input). """ conn = psycopg2.connect( host=os.environ.get("PGHOST"), port=os.environ.get("PGPORT"), - user=os.environ.get("PGUSER"), - password=os.environ.get("PGPASSWORD"), + user=user or os.environ.get("PGUSER"), + password=password or os.environ.get("PGPASSWORD"), dbname=os.environ.get("PGDATABASE"), ) return conn @@ -1150,6 +1158,14 @@ def _build_argparser() -> argparse.ArgumentParser: action="store_true", help="Print inferred CREATE TABLE and stop; don't touch Postgres.", ) + p.add_argument( + "--dbcreds", + action="store_true", + help=( + "Prompt for database username and password instead of reading " + "PGUSER / PGPASSWORD from the environment or .env file." + ), + ) return p @@ -1208,7 +1224,12 @@ def main(argv: Optional[List[str]] = None) -> int: print(f" streaming... {seen:,} rows", file=sys.stderr) yield chunk_df - conn = connect() + db_user = db_password = None + if args.dbcreds: + db_user = input("Database username: ") + db_password = getpass.getpass("Database password: ") + + conn = connect(user=db_user, password=db_password) conn.autocommit = False try: create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists) From c1e1fec10b6372a18a2b684abfdb8aa206169c6d Mon Sep 17 00:00:00 2001 From: David Peterson Date: Sat, 18 Apr 2026 12:39:44 -0500 Subject: [PATCH 6/7] Update requirements.txt to support new package versions and add boto3 dependency --- generic_loader/requirements.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/generic_loader/requirements.txt b/generic_loader/requirements.txt index c481d42..b19dce6 100644 --- a/generic_loader/requirements.txt +++ b/generic_loader/requirements.txt @@ -1,6 +1,7 @@ -pandas>=2.0,<2.3 +pandas>=2.0,<3.0 pyreadstat>=1.2,<1.3 -numpy>=1.24,<2.1 +numpy>=2.1,<3.0 pyyaml>=6.0,<7.0 psycopg2-binary>=2.9,<3.0 python-dotenv>=1.0,<2.0 +boto3>=1.28,<2.0 From 1bbe0d4cd695e6e5779fa11636801c592d88b735 Mon Sep 17 00:00:00 2001 From: michael-corey Date: Sat, 18 Apr 2026 12:54:29 -0500 Subject: [PATCH 7/7] removing latin encoding, adding usage notes --- generic_loader/load_folder.py | 6 ++++++ generic_loader/load_sas.py | 11 ++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index 099c419..9301786 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -47,6 +47,7 @@ USAGE :: python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast] + [--dbcreds] Flags: --config PATH Required. Path to the YAML config above. @@ -56,6 +57,11 @@ Flags: --fail-fast Abort the whole run on the first cluster failure. Default is to log the failure, roll that cluster back, and keep going. + --dbcreds Prompt interactively for the database username and + password instead of reading ``PGUSER`` / ``PGPASSWORD`` + from the environment or ``.env`` file. The password + prompt does not echo. Has no effect with ``--dry-run`` + (no connection is opened). Exit codes: 0 - every cluster loaded successfully (or dry-run completed) diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 0be9602..614c4b4 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -57,6 +57,7 @@ the vars) before calling :func:`connect`. :: python load_sas.py --config path/to/config.yaml [--validate] [--dry-run] + [--dbcreds] Flags: --config PATH Required. Path to the YAML config above. @@ -66,6 +67,11 @@ Flags: ``--dry-run``. --dry-run Print the inferred ``CREATE TABLE`` SQL and stop. The database is never touched (no connection is opened). + --dbcreds Prompt interactively for the database username and + password instead of reading ``PGUSER`` / ``PGPASSWORD`` + from the environment or ``.env`` file. The password + prompt does not echo. Has no effect with ``--dry-run`` + (no connection is opened). Exit codes: 0 - success (load completed, or dry-run/validate passed) @@ -85,6 +91,9 @@ Typical invocations:: # Actually load the data. python load_sas.py --config sample_config.yaml + # Load the data, prompting for credentials instead of using .env. + python load_sas.py --config sample_config.yaml --dbcreds + 4. Expected-types manifest (``--validate``) ------------------------------------------- ``--validate`` looks for a JSON file named ``.expected.json`` next @@ -403,7 +412,7 @@ def _sas_reader(path: Path) -> Tuple[Any, Dict[str, Any]]: if suffix in (".xpt", ".xport"): return pyreadstat.read_xport, {} if suffix == ".sas7bdat": - return pyreadstat.read_sas7bdat, {"encoding": "latin-1"} + return pyreadstat.read_sas7bdat, {} raise ValueError(f"Unsupported SAS file extension: {suffix}")