batch_folder_processing #1

Merged
dp merged 7 commits from batch_folder_processing into main 2026-04-18 18:23:52 +00:00
6 changed files with 1222 additions and 44 deletions

View File

@ -1,3 +1,5 @@
/.venv
/samples
/.env
/__pycache__
/venv

View File

@ -0,0 +1,185 @@
"""Generate a folder of clustered SAS XPORT files for testing ``load_folder``.
Produces ``samples/folder_test/`` containing three clusters:
* ``group_a{1,2,3}.xpt`` - kitchen-sink schema (every column).
* ``group_b{1,2}.xpt`` - a *different* schema (drops ``BIGINT`` and
``TIMECOL``) so a schema-compat check would catch cross-cluster
contamination if the regex were wrong.
* ``standalone.xpt`` - singleton to exercise the no-cluster / singleton
auto-detect path.
Alongside the files, writes ``sample_folder_config.yaml`` that exercises
both code paths: ``group_a*`` via an explicit regex pattern, ``group_b*``
and ``standalone`` via auto-detect.
Finally, runs :func:`load_folder.discover_clusters` against the generated
folder and asserts the grouping is what we expect. This is a pure in-process
smoke test of the clustering logic; no Postgres connection is required.
Reuses ``generate_sample_sas.build_dataframe`` so data shape / dtypes match
the single-file loader tests. ``N_ROWS`` is temporarily shrunk on the
imported module for this run's duration so repeated invocations stay fast.
"""
from __future__ import annotations
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import pyreadstat
import yaml
import generate_sample_sas as gss
from load_folder import discover_clusters, load_folder_config
FIXTURE_ROWS = 2_000
OUT_DIR = Path("samples/folder_test")
CONFIG_PATH = OUT_DIR / "folder_config.yaml"
GROUP_A_FILES = ["group_a1.xpt", "group_a2.xpt", "group_a3.xpt"]
GROUP_B_FILES = ["group_b1.xpt", "group_b2.xpt"]
STANDALONE_FILE = "standalone.xpt"
# Columns dropped from the group_b cluster so it has a genuinely different
# schema from the group_a cluster. If the regex accidentally pulled a group_b file
# into the group_a cluster (or vice versa), load_cluster's schema-compat check
# would fire on these differences.
GROUP_B_DROPPED_COLUMNS = ("BIGINT", "TIMECOL")
def _build_df(seed: int) -> pd.DataFrame:
"""Build a kitchen-sink DataFrame via the existing generator.
Temporarily shrinks ``generate_sample_sas.N_ROWS`` so each fixture file
is small enough to regenerate quickly. Restored afterward so importing
this module alongside the main generator stays side-effect free.
"""
saved = gss.N_ROWS
gss.N_ROWS = FIXTURE_ROWS
try:
rng = np.random.default_rng(seed)
return gss.build_dataframe(rng)
finally:
gss.N_ROWS = saved
def _write_xport(df: pd.DataFrame, path: Path, table_name: str) -> None:
# Only pass variable_format entries for columns that actually exist in
# this frame - write_xport errors on formats referencing missing cols.
variable_format = {
k: v for k, v in gss.VARIABLE_FORMATS.items() if k in df.columns
}
column_labels = {k: v for k, v in gss.COLUMN_LABELS.items() if k in df.columns}
pyreadstat.write_xport(
df,
str(path),
file_format_version=5,
table_name=table_name,
file_label=f"Folder-loader fixture ({path.name})",
column_labels=column_labels,
variable_format=variable_format,
)
def generate_fixtures() -> None:
OUT_DIR.mkdir(parents=True, exist_ok=True)
for i, name in enumerate(GROUP_A_FILES):
df = _build_df(seed=100 + i)
_write_xport(df, OUT_DIR / name, table_name=f"GRPA{i + 1}")
print(f" wrote {OUT_DIR / name} ({len(df):,} rows, {len(df.columns)} cols)")
for i, name in enumerate(GROUP_B_FILES):
df = _build_df(seed=200 + i)
df = df.drop(columns=list(GROUP_B_DROPPED_COLUMNS))
_write_xport(df, OUT_DIR / name, table_name=f"GRPB{i + 1}")
print(f" wrote {OUT_DIR / name} ({len(df):,} rows, {len(df.columns)} cols)")
df = _build_df(seed=300)
_write_xport(df, OUT_DIR / STANDALONE_FILE, table_name="STDALONE")
print(
f" wrote {OUT_DIR / STANDALONE_FILE} "
f"({len(df):,} rows, {len(df.columns)} cols)"
)
def write_config() -> None:
cfg = {
"folder": ".", # config lives inside the target folder
"schemaname": "public",
"if_exists": "replace",
"auto_detect": True,
"clusters": [
{
"pattern": r"^group_a\d+\.xpt$",
"tablename": "group_a",
},
],
}
with CONFIG_PATH.open("w", encoding="utf-8") as f:
# Top-of-file comment documents the intent of this generated config.
f.write(
"# Generated by generate_sample_folder.py. Demonstrates both\n"
"# explicit regex clustering (group_a*) and auto-detect\n"
"# (group_b* and standalone) working together.\n"
)
yaml.safe_dump(cfg, f, sort_keys=False)
print(f" wrote {CONFIG_PATH}")
def verify() -> None:
"""Smoke-test the clustering logic against the generated folder."""
cfg = load_folder_config(CONFIG_PATH)
clusters = discover_clusters(cfg)
by_name = {c.tablename: c for c in clusters}
expected_names = {"group_a", "group_b", "standalone"}
actual_names = set(by_name)
assert expected_names == actual_names, (
f"cluster set mismatch: expected {expected_names}, got {actual_names}"
)
group_a = by_name["group_a"]
assert group_a.source == "explicit", f"group_a source = {group_a.source!r}"
assert [f.name for f in group_a.files] == sorted(GROUP_A_FILES), (
f"group_a files = {[f.name for f in group_a.files]}"
)
group_b = by_name["group_b"]
assert group_b.source == "auto", f"group_b source = {group_b.source!r}"
assert [f.name for f in group_b.files] == sorted(GROUP_B_FILES), (
f"group_b files = {[f.name for f in group_b.files]}"
)
standalone = by_name["standalone"]
assert standalone.source == "auto", f"standalone source = {standalone.source!r}"
assert [f.name for f in standalone.files] == [STANDALONE_FILE], (
f"standalone files = {[f.name for f in standalone.files]}"
)
print(" clustering verified:")
for c in clusters:
files = ", ".join(f.name for f in c.files)
print(f" {c.tablename} [{c.source}]: {files}")
def main() -> int:
print(f"Writing fixture SAS files to {OUT_DIR}/")
generate_fixtures()
print(f"\nWriting folder config to {CONFIG_PATH}")
write_config()
print("\nVerifying discover_clusters() grouping...")
verify()
print("\nOK. Try:")
print(f" python load_folder.py --config {CONFIG_PATH} --dry-run")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,575 @@
"""Folder-level SAS-to-Postgres loader.
Wraps :mod:`load_sas` so an entire directory of SAS files can be ingested in
one invocation. A directory often contains several *clusters* of files that
share a schema (e.g. ``group_a1.sas7bdat``, ``group_a2.sas7bdat``, ...). Each
cluster becomes one Postgres table; files inside a cluster are appended to it.
-------------------------------------------------------------------------------
USAGE
-------------------------------------------------------------------------------
1. YAML config
--------------
::
folder: samples/folder_test # required; relative paths resolve against
# the config file's directory
schemaname: public # required
# Optional. One of: fail | replace | append. Default: fail.
# Applied to the first file of each cluster (subsequent files in the
# cluster always run through the append-mode compatibility check).
if_exists: fail
# Optional. Default: true. When true, files that don't match any explicit
# pattern below are grouped by their common prefix (trailing digits, and
# optional trailing separators, are stripped from each file stem).
auto_detect: true
# Optional. Columns to force-include or force-exclude across every file.
# include and exclude are mutually exclusive.
# include: [ID, INTCOL]
# exclude: [ALLNULL]
# Optional explicit cluster patterns. Each pattern is matched against the
# file *basename*. Matched files are pulled out of the auto-detect pool.
# Per-cluster if_exists/include/exclude override the folder-level defaults.
clusters:
- pattern: '^group_a\\d+\\.sas7bdat$'
tablename: group_a
- pattern: '^group_b\\d+\\.sas7bdat$'
tablename: group_b
if_exists: replace
2. Command-line interface
-------------------------
::
python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast]
[--dbcreds]
Flags:
--config PATH Required. Path to the YAML config above.
--dry-run Print the discovered clusters and the inferred CREATE
TABLE for each (schema from the first file of the
cluster). The database is never touched.
--fail-fast Abort the whole run on the first cluster failure.
Default is to log the failure, roll that cluster back,
and keep going.
--dbcreds Prompt interactively for the database username and
password instead of reading ``PGUSER`` / ``PGPASSWORD``
from the environment or ``.env`` file. The password
prompt does not echo. Has no effect with ``--dry-run``
(no connection is opened).
Exit codes:
0 - every cluster loaded successfully (or dry-run completed)
1 - at least one cluster failed (details on stderr)
2 - folder does not exist / contains no SAS files
3. Discovery rules
------------------
* Supported extensions: ``.sas7bdat``, ``.xpt``, ``.xport`` (matches
:mod:`load_sas`). The folder is not scanned recursively.
* Explicit patterns are tried in order. A file matched by one pattern is
removed from the pool before the next pattern runs, so earlier patterns
win in case of overlap. Overlap between patterns is flagged as an error
at config-parse time (a file matching two patterns is almost always a bug).
* Auto-detect groups remaining files by ``re.sub(r'\\d+$', '', stem)`` with
any trailing ``_`` / ``-`` stripped afterward. Stems without trailing
digits become singleton clusters named after the stem.
4. Library usage
----------------
::
from load_folder import load_folder_config, discover_clusters, load_cluster
from load_sas import connect
cfg = load_folder_config("folder_config.yaml")
clusters = discover_clusters(cfg)
conn = connect()
try:
for cluster in clusters:
load_cluster(conn, cluster, cfg.schemaname)
finally:
conn.close()
"""
from __future__ import annotations
import argparse
import getpass
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
from dotenv import load_dotenv
from load_sas import (
VALID_IF_EXISTS,
apply_column_filter,
assert_schema_compatible,
connect,
copy_dataframes,
create_table,
infer_schema,
iter_sas_chunks,
read_sas_preview,
render_create_table,
)
SAS_EXTENSIONS = (".sas7bdat", ".xpt", ".xport")
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class ClusterSpec:
tablename: str
files: List[Path]
if_exists: str
include: Optional[List[str]]
exclude: Optional[List[str]]
source: str # "explicit" or "auto"
pattern: Optional[str] = None
@dataclass
class _ExplicitPattern:
"""Parsed form of a single ``clusters[*]`` YAML entry."""
pattern: re.Pattern
raw_pattern: str
tablename: str
if_exists: Optional[str] = None
include: Optional[List[str]] = None
exclude: Optional[List[str]] = None
@dataclass
class FolderConfig:
folder: Path
schemaname: str
if_exists: str = "fail"
auto_detect: bool = True
include: Optional[List[str]] = None
exclude: Optional[List[str]] = None
explicit: List[_ExplicitPattern] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Config loading
# ---------------------------------------------------------------------------
def _validate_if_exists(value: Any, where: str) -> str:
s = str(value).lower()
if s not in VALID_IF_EXISTS:
raise ValueError(
f"{where}: if_exists={value!r} is not one of {VALID_IF_EXISTS}"
)
return s
def _parse_columns_filter(
raw: Dict[str, Any], where: str
) -> Tuple[Optional[List[str]], Optional[List[str]]]:
include = raw.get("include")
exclude = raw.get("exclude")
if include is not None and exclude is not None:
raise ValueError(f"{where}: 'include' and 'exclude' are mutually exclusive.")
if include is not None and not isinstance(include, list):
raise ValueError(f"{where}: 'include' must be a list of column names.")
if exclude is not None and not isinstance(exclude, list):
raise ValueError(f"{where}: 'exclude' must be a list of column names.")
include_out = [str(c) for c in include] if include is not None else None
exclude_out = [str(c) for c in exclude] if exclude is not None else None
return include_out, exclude_out
def load_folder_config(path: Path) -> FolderConfig:
"""Parse and validate the folder-level YAML config at ``path``."""
path = Path(path)
with path.open("r", encoding="utf-8") as f:
raw = yaml.safe_load(f)
if not isinstance(raw, dict):
raise ValueError(f"Config at {path} must be a YAML mapping at the top level.")
missing = [k for k in ("folder", "schemaname") if k not in raw]
if missing:
raise ValueError(f"Config {path} missing required keys: {', '.join(missing)}")
folder = Path(raw["folder"])
if not folder.is_absolute():
candidate = (path.parent / folder).resolve()
folder = candidate if candidate.exists() else folder
schemaname = str(raw["schemaname"])
if_exists = _validate_if_exists(raw.get("if_exists", "fail"), f"Config {path}")
auto_detect = bool(raw.get("auto_detect", True))
include, exclude = _parse_columns_filter(raw, f"Config {path}")
explicit: List[_ExplicitPattern] = []
clusters_raw = raw.get("clusters") or []
if not isinstance(clusters_raw, list):
raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
for i, entry in enumerate(clusters_raw):
where = f"Config {path} clusters[{i}]"
if not isinstance(entry, dict):
raise ValueError(f"{where} must be a mapping.")
if "pattern" not in entry or "tablename" not in entry:
raise ValueError(f"{where} must include 'pattern' and 'tablename'.")
raw_pat = str(entry["pattern"])
try:
compiled = re.compile(raw_pat)
except re.error as e:
raise ValueError(f"{where}: invalid regex {raw_pat!r}: {e}") from e
c_if_exists = (
_validate_if_exists(entry["if_exists"], where)
if "if_exists" in entry
else None
)
c_include, c_exclude = _parse_columns_filter(entry, where)
explicit.append(
_ExplicitPattern(
pattern=compiled,
raw_pattern=raw_pat,
tablename=str(entry["tablename"]),
if_exists=c_if_exists,
include=c_include,
exclude=c_exclude,
)
)
return FolderConfig(
folder=folder,
schemaname=schemaname,
if_exists=if_exists,
auto_detect=auto_detect,
include=include,
exclude=exclude,
explicit=explicit,
)
# ---------------------------------------------------------------------------
# Cluster discovery
# ---------------------------------------------------------------------------
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
def _auto_prefix(stem: str) -> str:
"""Derive the cluster key for a file stem.
Strip trailing digits and any trailing separators so
``group_a1`` / ``group_a_2`` / ``group_a-3`` all land in the same
``group_a`` bucket. If nothing is stripped, the stem is its own key.
"""
stripped = _TRAILING_DIGIT_RE.sub("", stem)
stripped = stripped.rstrip("_-")
return stripped or stem
def _list_sas_files(folder: Path) -> List[Path]:
files: List[Path] = []
for p in sorted(folder.iterdir()):
if p.is_file() and p.suffix.lower() in SAS_EXTENSIONS:
files.append(p)
return files
def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
"""Enumerate ``cfg.folder`` and bucket files into ``ClusterSpec`` objects.
Pure/IO-bounded: the only filesystem access is listing ``cfg.folder``. No
SAS file is opened here. Explicit patterns are applied first, in config
order; files matched by an earlier pattern are removed from the pool
before the next pattern runs. A file matching two patterns triggers a
hard error (that's almost always a config bug).
"""
if not cfg.folder.exists() or not cfg.folder.is_dir():
raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}")
pool = _list_sas_files(cfg.folder)
clusters: List[ClusterSpec] = []
# Detect cross-pattern overlap up front for a clearer error message.
for i, p_i in enumerate(cfg.explicit):
for j in range(i + 1, len(cfg.explicit)):
p_j = cfg.explicit[j]
for f in pool:
if p_i.pattern.search(f.name) and p_j.pattern.search(f.name):
raise ValueError(
f"File {f.name!r} matches multiple explicit patterns: "
f"{p_i.raw_pattern!r} and {p_j.raw_pattern!r}"
)
remaining = list(pool)
for patt in cfg.explicit:
matched = [f for f in remaining if patt.pattern.search(f.name)]
if not matched:
# Not an error - the folder might legitimately not contain files
# for this pattern on a given run. Emit a note for the CLI.
clusters.append(
ClusterSpec(
tablename=patt.tablename,
files=[],
if_exists=patt.if_exists or cfg.if_exists,
include=patt.include if patt.include is not None else cfg.include,
exclude=patt.exclude if patt.exclude is not None else cfg.exclude,
source="explicit",
pattern=patt.raw_pattern,
)
)
continue
remaining = [f for f in remaining if f not in matched]
clusters.append(
ClusterSpec(
tablename=patt.tablename,
files=sorted(matched),
if_exists=patt.if_exists or cfg.if_exists,
include=patt.include if patt.include is not None else cfg.include,
exclude=patt.exclude if patt.exclude is not None else cfg.exclude,
source="explicit",
pattern=patt.raw_pattern,
)
)
if cfg.auto_detect and remaining:
buckets: Dict[str, List[Path]] = {}
for f in remaining:
key = _auto_prefix(f.stem)
buckets.setdefault(key, []).append(f)
for key in sorted(buckets):
clusters.append(
ClusterSpec(
tablename=key,
files=sorted(buckets[key]),
if_exists=cfg.if_exists,
include=cfg.include,
exclude=cfg.exclude,
source="auto",
)
)
return clusters
# ---------------------------------------------------------------------------
# Per-cluster load
# ---------------------------------------------------------------------------
def _infer_cluster_schema(path: Path, include, exclude):
preview_df, meta = read_sas_preview(path)
preview_df = apply_column_filter(preview_df, include, exclude)
total_rows = getattr(meta, "number_rows", None)
columns = infer_schema(preview_df, meta, total_rows=total_rows)
return columns
def load_cluster(conn, cluster: ClusterSpec, schemaname: str) -> int:
"""Load every file in ``cluster`` into one table. Returns total rows loaded.
The caller owns transaction boundaries. This function does NOT commit or
roll back - :func:`main` does that per cluster so one bad cluster
doesn't poison the rest of the run.
"""
if not cluster.files:
return 0
first, *rest = cluster.files
first_columns = _infer_cluster_schema(first, cluster.include, cluster.exclude)
create_table(
conn, schemaname, cluster.tablename, first_columns, cluster.if_exists
)
total = 0
total += _stream_file(
conn, schemaname, cluster.tablename, first, first_columns,
cluster.include, cluster.exclude,
)
for path in rest:
columns = _infer_cluster_schema(path, cluster.include, cluster.exclude)
# Uses the same check that if_exists=append runs. A type mismatch or
# missing column aborts the cluster; the transaction rollback in
# main() keeps the table from ending up half-loaded.
assert_schema_compatible(conn, schemaname, cluster.tablename, columns)
total += _stream_file(
conn, schemaname, cluster.tablename, path, columns,
cluster.include, cluster.exclude,
)
return total
def _stream_file(
conn,
schemaname: str,
tablename: str,
path: Path,
columns,
include,
exclude,
) -> int:
def _chunks():
seen = 0
for chunk_df, _chunk_meta in iter_sas_chunks(path):
chunk_df = apply_column_filter(chunk_df, include, exclude)
seen += len(chunk_df)
print(
f" {path.name}: streaming... {seen:,} rows",
file=sys.stderr,
)
yield chunk_df
return copy_dataframes(conn, schemaname, tablename, _chunks(), columns)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description=(
"Load every SAS file in a folder into Postgres, grouping files "
"into clusters that each become one table."
),
)
p.add_argument("--config", required=True, type=Path, help="Path to YAML config")
p.add_argument(
"--dry-run",
action="store_true",
help=(
"Print discovered clusters and the inferred CREATE TABLE for "
"each; don't touch Postgres."
),
)
p.add_argument(
"--fail-fast",
action="store_true",
help=(
"Abort on the first cluster failure. Default is to roll that "
"cluster back and continue with the next one."
),
)
p.add_argument(
"--dbcreds",
action="store_true",
help=(
"Prompt for database username and password instead of reading "
"PGUSER / PGPASSWORD from the environment or .env file."
),
)
return p
def _describe_cluster(cluster: ClusterSpec) -> str:
src = f"{cluster.source}"
if cluster.pattern:
src += f" pattern={cluster.pattern!r}"
files = ", ".join(f.name for f in cluster.files) or "(no matching files)"
return (
f"cluster {cluster.tablename!r} [{src}] if_exists={cluster.if_exists}\n"
f" files: {files}"
)
def main(argv: Optional[List[str]] = None) -> int:
args = _build_argparser().parse_args(argv)
load_dotenv()
cfg = load_folder_config(args.config)
if not cfg.folder.exists() or not cfg.folder.is_dir():
print(f"error: folder not found: {cfg.folder}", file=sys.stderr)
return 2
clusters = discover_clusters(cfg)
loadable = [c for c in clusters if c.files]
if not loadable:
print(
f"error: no SAS files found in {cfg.folder} "
f"(looked for {', '.join(SAS_EXTENSIONS)})",
file=sys.stderr,
)
return 2
print(f"discovered {len(loadable)} cluster(s) in {cfg.folder}:")
for c in clusters:
print(_describe_cluster(c))
if args.dry_run:
print()
for c in loadable:
print(f"--- CREATE TABLE for cluster {c.tablename!r} ---")
columns = _infer_cluster_schema(c.files[0], c.include, c.exclude)
print(render_create_table(cfg.schemaname, c.tablename, columns))
print()
return 0
db_user = db_password = None
if args.dbcreds:
db_user = input("Database username: ")
db_password = getpass.getpass("Database password: ")
conn = connect(user=db_user, password=db_password)
conn.autocommit = False
failures: List[Tuple[str, Exception]] = []
totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows)
try:
for cluster in loadable:
print(
f"\n>>> loading cluster {cluster.tablename!r} "
f"({len(cluster.files)} file(s))"
)
try:
rows = load_cluster(conn, cluster, cfg.schemaname)
conn.commit()
totals.append((cluster.tablename, len(cluster.files), rows))
print(
f" -> loaded {rows:,} row(s) into "
f"{cfg.schemaname}.{cluster.tablename}"
)
except Exception as e:
conn.rollback()
failures.append((cluster.tablename, e))
print(
f" !! cluster {cluster.tablename!r} failed: {e}",
file=sys.stderr,
)
if args.fail_fast:
break
finally:
conn.close()
print("\n=== summary ===")
for name, fcount, rows in totals:
print(f" ok {name}: {fcount} file(s), {rows:,} row(s)")
for name, err in failures:
print(f" FAIL {name}: {err}", file=sys.stderr)
return 1 if failures else 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -8,19 +8,222 @@ Python 3.9 compatible (target is an air-gapped host that currently only has
3.9). ``from __future__ import annotations`` lets us use PEP 585 generics
as annotations; runtime-resolved type uses (dataclass defaults, etc.) stick
to ``typing``.
-------------------------------------------------------------------------------
USAGE
-------------------------------------------------------------------------------
Supported inputs:
* ``.sas7bdat`` (read with ``encoding="latin-1"``)
* ``.xpt`` / ``.xport`` (SAS transport files)
1. YAML config
--------------
Every invocation is driven by a YAML file describing one SAS file to load::
filename: samples/sample_kitchensink.xpt # required; relative paths are
# resolved against the config
# file's directory when possible
schemaname: public # required
tablename: kitchensink # required
# Optional. One of: fail | replace | append. Default: fail.
# fail - error out if the target table already exists
# replace - DROP and recreate the table from the inferred schema
# append - keep the existing table; pre-flight a schema-compat check,
# then COPY the new rows in
if_exists: append
# Optional, mutually exclusive. Restrict which columns are loaded.
# include:
# - ID
# - INTCOL
# exclude:
# - ALLNULL
2. Database connection
----------------------
The loader uses standard libpq environment variables (read via ``os.environ``)::
PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE
The CLI calls ``python-dotenv``'s ``load_dotenv()`` at startup, so a local
``.env`` file is picked up automatically. Library callers are responsible for
populating the environment themselves (either call ``load_dotenv()`` or export
the vars) before calling :func:`connect`.
3. Command-line interface
-------------------------
::
python load_sas.py --config path/to/config.yaml [--validate] [--dry-run]
[--dbcreds]
Flags:
--config PATH Required. Path to the YAML config above.
--validate Compare the inferred schema against
``<sas-file-stem>.expected.json`` sitting next to the SAS
file. Exits nonzero on mismatch. Safe to combine with
``--dry-run``.
--dry-run Print the inferred ``CREATE TABLE`` SQL and stop. The
database is never touched (no connection is opened).
--dbcreds Prompt interactively for the database username and
password instead of reading ``PGUSER`` / ``PGPASSWORD``
from the environment or ``.env`` file. The password
prompt does not echo. Has no effect with ``--dry-run``
(no connection is opened).
Exit codes:
0 - success (load completed, or dry-run/validate passed)
1 - validation failure
2 - config references a SAS file that does not exist
Other nonzero - uncaught exception (traceback printed); the transaction
is rolled back before exit.
Typical invocations::
# Preview the inferred schema without connecting to Postgres.
python load_sas.py --config sample_config.yaml --dry-run
# Check the inferred schema against an expected-types manifest.
python load_sas.py --config sample_config.yaml --validate --dry-run
# Actually load the data.
python load_sas.py --config sample_config.yaml
# Load the data, prompting for credentials instead of using .env.
python load_sas.py --config sample_config.yaml --dbcreds
4. Expected-types manifest (``--validate``)
-------------------------------------------
``--validate`` looks for a JSON file named ``<sas-stem>.expected.json`` next
to the SAS file, e.g. ``samples/sample_kitchensink.xpt`` pairs with
``samples/sample_kitchensink.expected.json``. Each top-level key is a column
name; the value is an object with any of::
{
"postgres_type": "BIGINT", # exact expected type, OR
"acceptable_types": ["TEXT", # any-of list of acceptable types
"VARCHAR"],
"nullable": true, # default true; false = must be NOT NULL
"note": "free-form comment" # ignored by the loader
}
Type comparison ignores length/precision modifiers and normalizes synonyms
(e.g. ``INT`` == ``INTEGER`` == ``INT4``; ``VARCHAR(10)`` == ``VARCHAR``).
Nullability tightening (inferred NULL, manifest NOT NULL) is a hard failure;
loosening is not checked here because the append-mode check already covers it.
5. Library usage
----------------
The CLI is a thin wrapper around composable functions. The preferred pattern
infers the schema from a bounded preview and then streams the rest of the
file chunk-by-chunk into ``COPY`` - crucial for SAS files with hundreds of
millions of rows::
from dotenv import load_dotenv
from load_sas import (
load_config, read_sas_preview, iter_sas_chunks, apply_column_filter,
infer_schema, validate_against_manifest, render_create_table,
connect, create_table, copy_dataframes,
)
load_dotenv()
cfg = load_config("config.yaml")
# Schema from a preview slice (bounded by TYPE_INFERENCE_SAMPLE_ROWS).
preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
total_rows = getattr(meta, "number_rows", None)
columns = infer_schema(preview_df, meta, total_rows=total_rows)
# Optional: preview DDL / validate against a manifest.
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
problems = validate_against_manifest(columns, Path("expected.json"))
assert not problems, problems
conn = connect()
conn.autocommit = False
try:
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
chunks = (
apply_column_filter(df, cfg.include, cfg.exclude)
for df, _ in iter_sas_chunks(cfg.filename)
)
rows = copy_dataframes(conn, cfg.schemaname, cfg.tablename, chunks, columns)
conn.commit()
finally:
conn.close()
For small files (or tests) the legacy one-shot API still works:
:func:`read_sas` returns the whole frame and :func:`copy_dataframe` copies it
in one round trip.
All functions are side-effect free except :func:`connect`, :func:`create_table`,
:func:`copy_dataframe`, and :func:`copy_dataframes`; schema inference
(:func:`infer_schema`) accepts a ``coerce_chars`` kwarg to override the
module-level ``COERCE_CHAR_COLUMNS`` without mutating global state.
6. Type inference summary
-------------------------
Priority order used by :func:`infer_schema`:
1. SAS format string (via ``meta.original_variable_types``):
``DATETIME*`` -> ``TIMESTAMP``, ``TIME*`` -> ``TIME``,
``DATE*`` / ``YYMMDD*`` / ``MMDDYY*`` / ``DDMMYY*`` / ``JULIAN*`` -> ``DATE``.
2. All-null column -> ``TEXT`` (with a note).
3. pandas datetime dtype -> ``TIMESTAMP``.
4. Object columns containing only ``datetime.date`` / ``datetime.datetime``
-> ``DATE`` or ``TIMESTAMP``.
5. Object columns of strings: if ``COERCE_CHAR_COLUMNS`` is True and at
least ``CHAR_INFERENCE_MIN_VALUES`` non-empty values parse cleanly, they
are promoted to ``INTEGER`` / ``BIGINT`` / ``DOUBLE PRECISION`` /
``DATE`` / ``TIMESTAMP``; otherwise ``TEXT``.
6. Numeric columns of whole numbers -> ``INTEGER`` (or ``BIGINT`` if any
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
``DOUBLE PRECISION``.
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for
performance on large files. The CLI enforces this at read time via
:func:`read_sas_preview`, so the whole file is never materialized just to pick
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32,
or a column had no nulls in the preview but does later in the file, ``COPY``
will fail mid-stream and the whole transaction rolls back. Set
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing
matters more than speed.
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
share one cursor and transaction so a failure mid-file rolls back the whole
load.
7. Tunables
-----------
Module-level knobs at the top of this file:
* ``COERCE_CHAR_COLUMNS`` - promote stringly-typed numerics / dates
(default True).
* ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before
char-column coercion is attempted.
* ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become
``BIGINT``.
* ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows read for type inference
(``None`` = scan the whole column).
* ``DEFAULT_CHUNK_ROWS`` - rows per streaming COPY chunk.
"""
from __future__ import annotations
import argparse
import datetime as dt
import getpass
import io
import json
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple
import pandas as pd
import psycopg2
@ -45,6 +248,18 @@ values; too small a sample is easy to mis-infer."""
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
"""INTEGER bounds; anything outside becomes BIGINT."""
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000
"""Cap on rows inspected during per-column type inference. Also governs how
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
schema-inference flows. Set to ``None`` to scan every row (and read the whole
file into memory for the preview step - don't do this on multi-hundred-million
row files)."""
DEFAULT_CHUNK_ROWS = 100_000
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
fewer COPY round-trips but more peak memory per chunk; smaller values are
gentler on memory."""
VALID_IF_EXISTS = ("fail", "replace", "append")
@ -72,6 +287,12 @@ class ColumnSpec:
sas_format: Optional[str] = None
source_dtype: Optional[str] = None
notes: List[str] = field(default_factory=list)
sampled: bool = False
"""True when the type was inferred from a bounded preview rather than the
full file. A sampled spec carries the usual sampling risks: a later chunk
could contain a value that exceeds the inferred integer range, doesn't
parse as the inferred type, or is null in a column the preview showed as
non-null - all of which surface as mid-``COPY`` failures."""
# ---------------------------------------------------------------------------
@ -97,18 +318,25 @@ class ValidationError(RuntimeError):
# ---------------------------------------------------------------------------
def connect() -> psycopg2.extensions.connection:
def connect(
*,
user: Optional[str] = None,
password: Optional[str] = None,
) -> psycopg2.extensions.connection:
"""Open a psycopg2 connection using standard libpq env vars.
Assumes `.env` has already been loaded (the CLI does this before calling).
Orchestrators that wrap this module should either call ``load_dotenv()``
themselves or ensure the env vars are set.
``user`` and ``password`` override the corresponding env vars when supplied
(used by the ``--dbcreds`` CLI flag to accept interactive input).
"""
conn = psycopg2.connect(
host=os.environ.get("PGHOST"),
port=os.environ.get("PGPORT"),
user=os.environ.get("PGUSER"),
password=os.environ.get("PGPASSWORD"),
user=user or os.environ.get("PGUSER"),
password=password or os.environ.get("PGPASSWORD"),
dbname=os.environ.get("PGDATABASE"),
)
return conn
@ -171,8 +399,8 @@ def load_config(path: Path) -> LoaderConfig:
# ---------------------------------------------------------------------------
def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
"""Dispatch to the right pyreadstat reader by extension.
def _sas_reader(path: Path) -> Tuple[Any, Dict[str, Any]]:
"""Return ``(pyreadstat_reader, extra_kwargs)`` for ``path``.
Invariants (learned the hard way while building the sample generator):
@ -180,15 +408,58 @@ def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
encoding on XPORT files it wrote itself.
* ``.sas7bdat`` - explicit ``encoding="latin-1"`` per colleague guidance.
"""
path = Path(path)
suffix = path.suffix.lower()
suffix = Path(path).suffix.lower()
if suffix in (".xpt", ".xport"):
return pyreadstat.read_xport(str(path))
return pyreadstat.read_xport, {}
if suffix == ".sas7bdat":
return pyreadstat.read_sas7bdat(str(path), encoding="latin-1")
return pyreadstat.read_sas7bdat, {}
raise ValueError(f"Unsupported SAS file extension: {suffix}")
def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
"""Read an entire SAS file into memory. Only safe for small files.
Kept for backward compatibility and tests; the CLI now uses
:func:`read_sas_preview` + :func:`iter_sas_chunks` so it never materializes
the whole frame at once.
"""
reader, kwargs = _sas_reader(path)
return reader(str(Path(path)), **kwargs)
def read_sas_preview(
path: Path,
*,
rows: Optional[int] = None,
) -> Tuple[pd.DataFrame, Any]:
"""Read the first ``rows`` records from ``path`` plus its metadata.
Defaults to ``TYPE_INFERENCE_SAMPLE_ROWS`` when ``rows`` is not given.
Passing ``rows=None`` with ``TYPE_INFERENCE_SAMPLE_ROWS=None`` reads the
whole file (pyreadstat treats ``row_limit=0`` as unlimited).
"""
reader, kwargs = _sas_reader(path)
effective = rows if rows is not None else TYPE_INFERENCE_SAMPLE_ROWS
row_limit = int(effective) if effective else 0
return reader(str(Path(path)), row_limit=row_limit, **kwargs)
def iter_sas_chunks(
path: Path,
*,
chunksize: int = DEFAULT_CHUNK_ROWS,
):
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
underlying reader by extension and threads through our encoding defaults.
"""
reader, kwargs = _sas_reader(path)
yield from pyreadstat.read_file_in_chunks(
reader, str(Path(path)), chunksize=chunksize, **kwargs
)
# ---------------------------------------------------------------------------
# Column filtering
# ---------------------------------------------------------------------------
@ -368,6 +639,7 @@ def infer_schema(
meta: Any,
*,
coerce_chars: bool = COERCE_CHAR_COLUMNS,
total_rows: Optional[int] = None,
) -> Dict[str, ColumnSpec]:
"""Infer a Postgres column spec for each column in ``df``.
@ -379,9 +651,25 @@ def infer_schema(
``COERCE_CHAR_COLUMNS`` without mutating global state. Internally the
char-inference helpers still read the constant - a full override would
thread the flag through, but the one-knob story here is intentional.
``total_rows`` lets callers who already sampled the frame (e.g. via
:func:`read_sas_preview`) report the real file size in the per-column
"inferred from first N of M rows" note. Falls back to ``len(df)``.
"""
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
# Row-walking type probes run on a bounded head slice; nullability and the
# all-null check still see every row so NOT NULL declarations stay honest.
df_rows = len(df)
effective_total = total_rows if total_rows is not None else df_rows
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
sample_df = df.head(TYPE_INFERENCE_SAMPLE_ROWS)
sample_size = TYPE_INFERENCE_SAMPLE_ROWS
else:
sample_df = df
sample_size = df_rows
sampled = sample_size < effective_total
# Temporarily flip the module-level flag if the caller asked us to.
global COERCE_CHAR_COLUMNS
saved = COERCE_CHAR_COLUMNS
@ -390,6 +678,7 @@ def infer_schema(
out: Dict[str, ColumnSpec] = {}
for col in df.columns:
series = df[col]
sample_series = sample_df[col]
sas_format = original_formats.get(col)
notes: List[str] = []
@ -402,13 +691,13 @@ def infer_schema(
elif pd.api.types.is_datetime64_any_dtype(series):
pg_type = "TIMESTAMP"
elif pd.api.types.is_object_dtype(series):
is_dates, any_dt = _object_is_dates(series)
is_dates, any_dt = _object_is_dates(sample_series)
if is_dates:
pg_type = "TIMESTAMP" if any_dt else "DATE"
else:
pg_type = _infer_char_type(series)
pg_type = _infer_char_type(sample_series)
elif pd.api.types.is_numeric_dtype(series):
int_target = _numeric_int_target(series)
int_target = _numeric_int_target(sample_series)
if int_target is not None:
pg_type = int_target
else:
@ -417,6 +706,12 @@ def infer_schema(
pg_type = "TEXT"
notes.append(f"unhandled dtype {series.dtype}; defaulting to TEXT")
if sampled:
notes.append(
f"type inferred from first {sample_size:,} of "
f"{effective_total:,} rows"
)
nullable = _is_nullable(series)
out[col] = ColumnSpec(
@ -426,6 +721,7 @@ def infer_schema(
sas_format=sas_format,
source_dtype=str(series.dtype),
notes=notes,
sampled=sampled,
)
return out
finally:
@ -575,6 +871,22 @@ def _assert_schema_compatible(
)
def assert_schema_compatible(
conn,
schema_name: str,
table_name: str,
columns: Dict[str, ColumnSpec],
) -> None:
"""Public wrapper around :func:`_assert_schema_compatible`.
Intended for orchestrators (e.g. the folder loader) that append multiple
files into one table and need to re-run the same compatibility check
that ``if_exists=append`` performs internally. Raises
:class:`SchemaCompatibilityError` on mismatch.
"""
_assert_schema_compatible(conn, schema_name, table_name, columns)
def create_table(
conn,
schema_name: str,
@ -713,19 +1025,31 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
return out
def copy_dataframe(
def copy_dataframes(
conn,
schema_name: str,
table_name: str,
df: pd.DataFrame,
dfs: Iterable[pd.DataFrame],
columns: Dict[str, ColumnSpec],
) -> int:
"""Stream ``df`` into Postgres via ``COPY ... FROM STDIN``.
"""Stream an iterable of DataFrames into one ``COPY`` session.
Returns the number of rows inserted.
All chunks share a cursor and transaction, so a failure mid-stream
rolls back the whole load when the caller hasn't committed yet.
Empty chunks are skipped. Returns the total rows inserted.
"""
prepared = _prepare_for_copy(df, columns)
col_list = ", ".join(_quote_ident(name) for name in columns.keys())
sql = (
f"COPY {_qualified(schema_name, table_name)} ({col_list}) "
f"FROM STDIN WITH (FORMAT csv, NULL '')"
)
total = 0
with conn.cursor() as cur:
for df in dfs:
if df.empty:
continue
prepared = _prepare_for_copy(df, columns)
buf = io.StringIO()
prepared.to_csv(
buf,
@ -735,18 +1059,24 @@ def copy_dataframe(
date_format="%Y-%m-%d %H:%M:%S",
)
buf.seek(0)
col_list = ", ".join(_quote_ident(name) for name in columns.keys())
sql = (
f"COPY {_qualified(schema_name, table_name)} ({col_list}) "
f"FROM STDIN WITH (FORMAT csv, NULL '')"
)
with conn.cursor() as cur:
cur.copy_expert(sql, buf)
rowcount = cur.rowcount
total += len(prepared)
return total
return int(rowcount) if rowcount is not None else len(prepared)
def copy_dataframe(
conn,
schema_name: str,
table_name: str,
df: pd.DataFrame,
columns: Dict[str, ColumnSpec],
) -> int:
"""Stream ``df`` into Postgres via ``COPY ... FROM STDIN``.
Convenience wrapper around :func:`copy_dataframes` for single-frame
callers. Returns the number of rows inserted.
"""
return copy_dataframes(conn, schema_name, table_name, [df], columns)
# ---------------------------------------------------------------------------
@ -837,6 +1167,14 @@ def _build_argparser() -> argparse.ArgumentParser:
action="store_true",
help="Print inferred CREATE TABLE and stop; don't touch Postgres.",
)
p.add_argument(
"--dbcreds",
action="store_true",
help=(
"Prompt for database username and password instead of reading "
"PGUSER / PGPASSWORD from the environment or .env file."
),
)
return p
@ -859,9 +1197,13 @@ def main(argv: Optional[List[str]] = None) -> int:
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
return 2
df, meta = read_sas(cfg.filename)
df = apply_column_filter(df, cfg.include, cfg.exclude)
columns = infer_schema(df, meta)
# Schema inference uses a bounded preview read so we never load a
# hundreds-of-millions-of-rows file into memory just to pick types.
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows
# returned, not the file's total, so we don't trust it here.
preview_df, meta = read_sas_preview(cfg.filename)
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
columns = infer_schema(preview_df, meta)
if args.validate:
manifest_path = cfg.filename.with_suffix("").with_suffix(".expected.json")
@ -879,11 +1221,30 @@ def main(argv: Optional[List[str]] = None) -> int:
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
return 0
conn = connect()
# Release the preview frame before opening the stream - lets the GC reclaim
# it while we're holding a Postgres transaction open.
del preview_df
def _filtered_chunks():
seen = 0
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
seen += len(chunk_df)
print(f" streaming... {seen:,} rows", file=sys.stderr)
yield chunk_df
db_user = db_password = None
if args.dbcreds:
db_user = input("Database username: ")
db_password = getpass.getpass("Database password: ")
conn = connect(user=db_user, password=db_password)
conn.autocommit = False
try:
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
inserted = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns)
inserted = copy_dataframes(
conn, cfg.schemaname, cfg.tablename, _filtered_chunks(), columns
)
conn.commit()
except Exception:
conn.rollback()

View File

@ -1,6 +1,7 @@
pandas>=2.0,<2.3
pandas>=2.0,<3.0
pyreadstat>=1.2,<1.3
numpy>=1.24,<2.1
numpy>=2.1,<3.0
pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0
python-dotenv>=1.0,<2.0
boto3>=1.28,<2.0

View File

@ -0,0 +1,54 @@
# Example folder-level loader config.
#
# Shape mirrors what `load_folder.py` expects:
#
# python load_folder.py --config sample_folder_config.yaml --dry-run
# python load_folder.py --config sample_folder_config.yaml
#
# Relative paths are resolved against this config file's directory first,
# falling back to the current working directory if that doesn't exist.
folder: samples/folder_test
schemaname: public
# Applied when creating the first file of each cluster.
# One of: fail | replace | append. Default: fail.
if_exists: replace
# When true (default), any file not matched by an explicit pattern below is
# auto-grouped with its peers by stripping trailing digits (and any trailing
# _ / -) from the file stem. Files with no trailing digits become their own
# singleton cluster.
auto_detect: true
# Folder-level column filter. Every file in every cluster passes through
# this filter. `include` and `exclude` are mutually exclusive. A cluster can
# override these via its own `include` / `exclude` keys.
#
# include:
# - ID
# - INTCOL
# exclude:
# - ALLNULL
# Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly.
#
# `tablename` is required. `if_exists`, `include`, and `exclude` are
# optional per-cluster overrides of the folder-level defaults above.
clusters:
- pattern: '^group_a\d+\.xpt$'
tablename: group_a
# Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace":
#
# - pattern: '^group_b\d+\.xpt$'
# tablename: group_b
# if_exists: append
# With only the gq pattern explicit, auto_detect: true will still bucket
# group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
# for the fixture that exercises exactly this layout.