# foxtrot/generic_loader/load_folder.py (1015 lines, 37 KiB, Python)

"""Folder-level SAS-to-Postgres loader.
Wraps :mod:`load_sas` so an entire directory of SAS files can be ingested in
one invocation. A directory often contains several *clusters* of files that
share a schema (e.g. ``group_a1.sas7bdat``, ``group_a2.sas7bdat``, ...). Each
cluster becomes one Postgres table; files inside a cluster are appended to it.
-------------------------------------------------------------------------------
USAGE
-------------------------------------------------------------------------------
1. YAML config
--------------
::
folder: samples/folder_test # required; relative paths resolve against
# the config file's directory
schemaname: public # required
# Optional. One of: fail | replace | append. Default: fail.
# Applied to the first file of each cluster (subsequent files in the
# cluster always run through the append-mode compatibility check).
if_exists: fail
# Optional. Default: true. When true, files that don't match any explicit
# pattern below are grouped by their common prefix (trailing digits, and
# optional trailing separators, are stripped from each file stem).
auto_detect: true
# Optional. Columns to force-include or force-exclude across every file.
# include and exclude are mutually exclusive.
# include: [ID, INTCOL]
# exclude: [ALLNULL]
# Optional folder default for LIST partitioning. Omit or set [] for no
# partitioning. Accepts a single string or a list of column names.
# partition_by:
# - state
# - zip
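# Optional folder default threshold. Default: 10000.
# max_partitions: 10000
# Optional folder default list of columns to index once a cluster's files
# have been loaded. Accepts a single string or a list of column names.
# Omit or set [] for no indexes.
# indexes:
# - state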
# Optional explicit cluster patterns. Each pattern is matched against the
# file *basename*. Matched files are pulled out of the auto-detect pool.
# Per-cluster if_exists/include/exclude/partition_by/max_partitions/indexes
# override the folder-level defaults.
clusters:
- pattern: '^group_a\\d+\\.sas7bdat$'
tablename: group_a
- pattern: '^group_b\\d+\\.sas7bdat$'
tablename: group_b
if_exists: replace
2. Command-line interface
-------------------------
::
python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast]
[--dbcreds]
Flags:
--config PATH Required. Path to the YAML config above.
--dry-run Print the discovered clusters and the inferred DDL for
each (CREATE TABLE plus partition DDL when applicable).
For partitioned clusters all files are scanned to
discover partition values. The database is never touched.
--fail-fast Abort the whole run on the first cluster failure.
Default is to log the failure, roll back that cluster's
uncommitted work, and keep going. Chunks that were already
committed before the failure stay in the table.
--dbcreds Prompt interactively for the database username and
password instead of reading ``PGUSER`` / ``PGPASSWORD``
from the environment or ``.env`` file. The password
prompt does not echo. Has no effect with ``--dry-run``
(no connection is opened).
Exit codes:
0 - every cluster loaded successfully (or dry-run completed)
1 - at least one cluster failed (details on stderr)
2 - folder does not exist / contains no SAS files
3. Discovery rules
------------------
* Supported extensions: ``.sas7bdat``, ``.xpt``, ``.xport`` (matches
:mod:`load_sas`). The folder is not scanned recursively.
* Explicit patterns are tried in order. A file matched by one pattern is
removed from the pool before the next pattern runs. A file that matches
more than one pattern is treated as a configuration bug: cluster
discovery raises an error naming the file and both patterns before any
cluster is built.
* Auto-detect groups remaining files by ``re.sub(r'\\d+$', '', stem)`` with
any trailing ``_`` / ``-`` stripped afterward. Stems without trailing
digits become singleton clusters named after the stem.
* Within a cluster, files are sorted **numerically** by the last digit
group in the stem, so ``..._9_...`` comes before ``..._10_...`` /
``..._40_...`` regardless of zero-padding. The first file in that
order drives schema inference; the rest are checked against that
schema via :func:`load_sas.assert_schema_compatible`. Gaps in the
numeric sequence (missing ``3``, ``7``, ``14``, ...) are irrelevant -
whatever files are present get loaded in numeric order.
* Auto-detect only recognizes *trailing* digit runs. File names where
the varying number sits in the middle of the stem (surrounded by
other name components) are not grouped by auto-detect - each becomes
its own singleton cluster. Use an explicit pattern to bucket them::
clusters:
- pattern: '^year2020_regionA_\\d+_detail\\.sas7bdat$'
tablename: year2020_regionA_detail
The regex still matches any digit width, so numbers like ``9`` and
``40`` both land in the same cluster and the numeric sort above puts
``9`` before ``40``.
4. Library usage
----------------
::
from load_folder import load_folder_config, discover_clusters, load_cluster
from load_sas import connect
cfg = load_folder_config("folder_config.yaml")
clusters = discover_clusters(cfg)
conn = connect()
try:
for cluster in clusters:
load_cluster(conn, cluster, cfg.schemaname)
finally:
conn.close()
"""
from __future__ import annotations
import argparse
2026-04-18 17:37:22 +00:00
import getpass
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
from dotenv import load_dotenv
from load_sas import (
VALID_IF_EXISTS,
2026-04-20 14:56:00 +00:00
_count_partitions,
_merge_partition_trees,
apply_column_filter,
assert_schema_compatible,
connect,
copy_dataframes,
2026-04-20 15:18:09 +00:00
create_indexes,
create_table,
2026-04-20 14:56:00 +00:00
discover_partition_values_chunked,
infer_schema,
iter_sas_chunks,
read_sas_preview,
2026-04-20 15:18:09 +00:00
render_create_indexes,
render_create_table,
2026-04-20 14:56:00 +00:00
render_partition_ddl,
)
# File suffixes (lower-case, leading dot included) that count as SAS
# datasets. _list_sas_files compares each path's lower-cased suffix against
# this tuple, and main() lists these values in its "no SAS files" error.
SAS_EXTENSIONS = (".sas7bdat", ".xpt", ".xport")
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class ClusterSpec:
    """Resolved load settings for one cluster, i.e. one target table.

    Instances are built by :func:`discover_clusters`. ``partition_by``,
    ``max_partitions``, and ``indexes`` are resolved there from the folder
    defaults and any per-cluster overrides.
    """

    # Target table name: the ``tablename`` of an explicit cluster entry, or
    # the derived file-name prefix for an auto-detected cluster.
    tablename: str
    # Files that belong to the cluster, in load order. Empty when an
    # explicit pattern matched nothing in the folder.
    files: List[Path]
    # Table-exists policy handed to ``create_table`` for the cluster.
    if_exists: str
    # Column allow-list / deny-list passed to ``apply_column_filter``;
    # ``None`` means the corresponding filter is not configured.
    include: Optional[List[str]]
    exclude: Optional[List[str]]
    source: str  # "explicit" or "auto"
    # Raw regex text for explicit clusters; stays ``None`` for auto clusters.
    pattern: Optional[str] = None
    # Partition columns; an empty list means the table is not partitioned.
    partition_by: List[str] = field(default_factory=list)
    # Partition-count threshold forwarded to the partition DDL helpers.
    max_partitions: int = 10_000
    # Columns to index after loading; an empty list means no indexes.
    indexes: List[str] = field(default_factory=list)
@dataclass
class _ExplicitPattern:
    """Parsed form of a single ``clusters[*]`` YAML entry.

    Every optional field defaults to ``None``, which means "inherit the
    folder-level value". For ``partition_by`` and ``indexes`` an explicit
    empty list ``[]`` is different from ``None``: it disables the feature
    for this cluster instead of inheriting.
    """

    # Compiled regex; matched against file basenames during discovery.
    pattern: re.Pattern
    # The regex exactly as written in the config, kept for messages.
    raw_pattern: str
    # Target table for every file the pattern matches.
    tablename: str
    # Per-cluster overrides; ``None`` = inherit from the folder level.
    if_exists: Optional[str] = None
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    partition_by: Optional[List[str]] = None
    max_partitions: Optional[int] = None
    indexes: Optional[List[str]] = None
@dataclass
class FolderConfig:
    """Folder-level configuration parsed from YAML.

    ``partition_by``, ``max_partitions``, and ``indexes`` serve as defaults
    for every cluster unless overridden at the cluster level.
    """

    # Directory that is scanned for SAS files.
    folder: Path
    # Schema that receives every cluster's table.
    schemaname: str
    # Folder default for the table-exists policy.
    if_exists: str = "fail"
    # When true, files not claimed by an explicit pattern are grouped by
    # their file-name prefix.
    auto_detect: bool = True
    # Folder-level column filters; ``None`` means "not configured".
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    # Explicit cluster patterns, in the order they appear in the config.
    explicit: List[_ExplicitPattern] = field(default_factory=list)
    # Folder defaults for partitioning and indexing.
    partition_by: List[str] = field(default_factory=list)
    max_partitions: int = 10_000
    indexes: List[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Config loading
# ---------------------------------------------------------------------------
def _validate_if_exists(value: Any, where: str) -> str:
    """Return ``value`` lower-cased if it is a recognized ``if_exists`` mode.

    Args:
        value: Raw value from the config; it is stringified before the check.
        where: Location prefix used in the error message.

    Raises:
        ValueError: the lower-cased value is not in ``VALID_IF_EXISTS``.
    """
    normalized = str(value).lower()
    if normalized in VALID_IF_EXISTS:
        return normalized
    message = f"{where}: if_exists={value!r} is not one of {VALID_IF_EXISTS}"
    raise ValueError(message)
def _parse_columns_filter(
    raw: Dict[str, Any], where: str
) -> Tuple[Optional[List[str]], Optional[List[str]]]:
    """Extract the ``include`` / ``exclude`` column filters from a mapping.

    Args:
        raw: Mapping (top-level config or one cluster entry) to read from.
        where: Location prefix used in error messages.

    Returns:
        ``(include, exclude)``. Each side is ``None`` when its key is absent
        (or null); otherwise it is a list with every item stringified.

    Raises:
        ValueError: both keys are given, or a given key is not a list.
    """
    wanted = raw.get("include")
    unwanted = raw.get("exclude")
    if not (wanted is None or unwanted is None):
        raise ValueError(f"{where}: 'include' and 'exclude' are mutually exclusive.")
    # Type checks run include-first so the reported key matches the config.
    for key, value in (("include", wanted), ("exclude", unwanted)):
        if value is not None and not isinstance(value, list):
            raise ValueError(f"{where}: '{key}' must be a list of column names.")
    include_out = None if wanted is None else list(map(str, wanted))
    exclude_out = None if unwanted is None else list(map(str, unwanted))
    return include_out, exclude_out
def _parse_partition_by(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[List[str]]:
    """Normalize a YAML ``partition_by`` value into a list of column names.

    Args:
        raw_value: ``None``, a single string, or a list of strings.
        where: Location prefix used in error messages.
        allow_none: When true, a missing value yields ``None`` ("inherit
            from the folder level") instead of ``[]``.

    Returns:
        Whitespace-trimmed, unique column names. An empty list input is
        returned as ``[]``.

    Raises:
        ValueError: blank string, blank or non-string list item, duplicate
            names, or a value that is neither a string nor a list.
    """
    if raw_value is None:
        if allow_none:
            return None
        return []

    if isinstance(raw_value, str):
        single = raw_value.strip()
        if single:
            return [single]
        raise ValueError(f"{where}: 'partition_by' string must be non-empty.")

    if not isinstance(raw_value, list):
        raise ValueError(
            f"{where}: 'partition_by' must be a string or list of strings."
        )

    names: List[str] = []
    for position, entry in enumerate(raw_value):
        # Non-strings are treated like blanks: both are rejected below.
        trimmed = entry.strip() if isinstance(entry, str) else ""
        if not trimmed:
            raise ValueError(
                f"{where}: 'partition_by[{position}]' must be a non-empty string."
            )
        names.append(trimmed)
    if len(set(names)) < len(names):
        raise ValueError(
            f"{where}: 'partition_by' contains duplicate column names."
        )
    return names
def _parse_max_partitions(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[int]:
    """Parse a ``max_partitions`` value from YAML.

    Args:
        raw_value: Value read from the config, or ``None`` when the key is
            absent.
        where: Location prefix used in error messages.
        allow_none: When true (per-cluster entries), a missing value yields
            ``None`` to signal "inherit from folder level"; otherwise the
            default of 10000 is returned.

    Returns:
        A positive integer, or ``None`` as described above. Integers,
        integral floats (``5.0``) and numeric strings (``"25"``) are
        accepted.

    Raises:
        ValueError: the value is a boolean, a fractional or non-finite
            float, not convertible to ``int``, or not greater than zero.
    """
    if raw_value is None:
        return None if allow_none else 10_000

    # bool is a subclass of int, so `max_partitions: true` would otherwise
    # be silently accepted as 1. A fractional float would likewise be
    # truncated by int() (2.9 -> 2). Neither is a "positive integer".
    is_bool = isinstance(raw_value, bool)
    is_fractional = isinstance(raw_value, float) and not raw_value.is_integer()
    if is_bool or is_fractional:
        raise ValueError(
            f"{where}: 'max_partitions' must be a positive integer, "
            f"got {raw_value!r}"
        )

    try:
        value = int(raw_value)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"{where}: 'max_partitions' must be a positive integer, "
            f"got {raw_value!r}"
        ) from exc
    if value <= 0:
        raise ValueError(
            f"{where}: 'max_partitions' must be a positive integer, "
            f"got {value}"
        )
    return value
def _validate_partition_vs_columns(
    partition_by: List[str],
    exclude: Optional[List[str]],
    where: str,
) -> None:
    """Reject configs whose ``exclude`` list drops a partition column.

    Does nothing when there are no partition columns or no ``exclude``
    list.

    Raises:
        ValueError: at least one ``partition_by`` column is in ``exclude``;
            the message lists the offending columns in ``partition_by``
            order.
    """
    if partition_by and exclude is not None:
        clashing = [name for name in partition_by if name in exclude]
        if clashing:
            raise ValueError(
                f"{where}: 'exclude' removes partition_by columns: {clashing}"
            )
def _parse_indexes(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[List[str]]:
    """Normalize a YAML ``indexes`` value into a list of column names.

    Args:
        raw_value: ``None``, a single string, or a list of strings.
        where: Location prefix used in error messages.
        allow_none: When true, a missing value yields ``None`` ("inherit
            from the folder level") instead of ``[]``.

    Returns:
        Whitespace-trimmed, unique column names. An empty list input is
        returned as ``[]``.

    Raises:
        ValueError: blank string, blank or non-string list item, duplicate
            names, or a value that is neither a string nor a list.
    """
    if raw_value is None:
        return None if allow_none else []

    if isinstance(raw_value, str):
        column = raw_value.strip()
        if column == "":
            raise ValueError(f"{where}: 'indexes' string must be non-empty.")
        return [column]

    if isinstance(raw_value, list):
        collected: List[str] = []
        for slot, candidate in enumerate(raw_value):
            usable = isinstance(candidate, str) and bool(candidate.strip())
            if not usable:
                raise ValueError(
                    f"{where}: 'indexes[{slot}]' must be a non-empty string."
                )
            collected.append(candidate.strip())
        # A set collapses repeats, so a size difference means duplicates.
        if len(set(collected)) != len(collected):
            raise ValueError(
                f"{where}: 'indexes' contains duplicate column names."
            )
        return collected

    raise ValueError(
        f"{where}: 'indexes' must be a string or list of strings."
    )
def _validate_indexes_vs_columns(
    indexes: List[str],
    exclude: Optional[List[str]],
    where: str,
) -> None:
    """Reject configs whose ``exclude`` list drops an index column.

    Does nothing when there are no index columns or no ``exclude`` list.

    Raises:
        ValueError: at least one ``indexes`` column is in ``exclude``; the
            message lists the offending columns in ``indexes`` order.
    """
    if exclude is None or not indexes:
        return
    dropped = [name for name in indexes if name in exclude]
    if not dropped:
        return
    raise ValueError(f"{where}: 'exclude' removes index columns: {dropped}")
def load_folder_config(path: Path) -> FolderConfig:
    """Read the YAML file at ``path`` and build a :class:`FolderConfig`.

    Required top-level keys are ``folder`` and ``schemaname``. Optional
    top-level keys are ``if_exists``, ``auto_detect``, ``include`` /
    ``exclude``, ``partition_by``, ``max_partitions``, ``indexes`` and
    ``clusters``. Every ``clusters`` entry needs ``pattern`` and
    ``tablename`` and may carry its own ``if_exists``, ``include`` /
    ``exclude``, ``partition_by``, ``max_partitions`` and ``indexes``.
    Settings left out of an entry are stored as ``None`` on the resulting
    :class:`_ExplicitPattern`.

    A relative ``folder`` is resolved against the directory holding the
    config file when that location exists; otherwise it is kept as written.

    Raises:
        ValueError: the document is not a mapping, a required key is
            missing, a value has the wrong type, a cluster regex does not
            compile, or an effective ``exclude`` list removes a partition
            or index column.
    """
    path = Path(path)
    origin = f"Config {path}"

    with path.open("r", encoding="utf-8") as stream:
        document = yaml.safe_load(stream)
    if not isinstance(document, dict):
        raise ValueError(f"Config at {path} must be a YAML mapping at the top level.")

    absent = [key for key in ("folder", "schemaname") if key not in document]
    if absent:
        raise ValueError(f"Config {path} missing required keys: {', '.join(absent)}")

    folder = Path(document["folder"])
    if not folder.is_absolute():
        # Prefer the location next to the config file, but only if it exists.
        beside_config = (path.parent / folder).resolve()
        if beside_config.exists():
            folder = beside_config

    schemaname = str(document["schemaname"])
    if_exists = _validate_if_exists(document.get("if_exists", "fail"), origin)
    auto_detect = bool(document.get("auto_detect", True))
    include, exclude = _parse_columns_filter(document, origin)

    # Folder-level partition defaults.
    partition_by = _parse_partition_by(document.get("partition_by"), origin)
    max_partitions = _parse_max_partitions(document.get("max_partitions"), origin)
    _validate_partition_vs_columns(partition_by, exclude, origin)

    # Folder-level index defaults.
    indexes = _parse_indexes(document.get("indexes"), origin)
    _validate_indexes_vs_columns(indexes, exclude, origin)

    entries = document.get("clusters") or []
    if not isinstance(entries, list):
        raise ValueError(f"Config {path}: 'clusters' must be a list if present.")

    explicit: List[_ExplicitPattern] = []
    for position, entry in enumerate(entries):
        where = f"Config {path} clusters[{position}]"
        if not isinstance(entry, dict):
            raise ValueError(f"{where} must be a mapping.")
        if not ("pattern" in entry and "tablename" in entry):
            raise ValueError(f"{where} must include 'pattern' and 'tablename'.")

        pattern_text = str(entry["pattern"])
        try:
            regex = re.compile(pattern_text)
        except re.error as exc:
            raise ValueError(
                f"{where}: invalid regex {pattern_text!r}: {exc}"
            ) from exc

        # Key presence (not value) decides whether if_exists is overridden.
        entry_if_exists = None
        if "if_exists" in entry:
            entry_if_exists = _validate_if_exists(entry["if_exists"], where)
        entry_include, entry_exclude = _parse_columns_filter(entry, where)

        entry_partition_by = _parse_partition_by(
            entry.get("partition_by"), where, allow_none=True
        )
        entry_max_partitions = _parse_max_partitions(
            entry.get("max_partitions"), where, allow_none=True
        )
        # Validate against the values this cluster will actually end up
        # with: its own setting when given, the folder default otherwise.
        active_exclude = exclude if entry_exclude is None else entry_exclude
        active_partition_by = (
            partition_by if entry_partition_by is None else entry_partition_by
        )
        _validate_partition_vs_columns(active_partition_by, active_exclude, where)

        entry_indexes = _parse_indexes(
            entry.get("indexes"), where, allow_none=True
        )
        active_indexes = indexes if entry_indexes is None else entry_indexes
        _validate_indexes_vs_columns(active_indexes, active_exclude, where)

        explicit.append(
            _ExplicitPattern(
                pattern=regex,
                raw_pattern=pattern_text,
                tablename=str(entry["tablename"]),
                if_exists=entry_if_exists,
                include=entry_include,
                exclude=entry_exclude,
                partition_by=entry_partition_by,
                max_partitions=entry_max_partitions,
                indexes=entry_indexes,
            )
        )

    return FolderConfig(
        folder=folder,
        schemaname=schemaname,
        if_exists=if_exists,
        auto_detect=auto_detect,
        include=include,
        exclude=exclude,
        explicit=explicit,
        partition_by=partition_by,
        max_partitions=max_partitions,
        indexes=indexes,
    )
# ---------------------------------------------------------------------------
# Cluster discovery
# ---------------------------------------------------------------------------
# Run of digits at the very end of a file stem; _auto_prefix strips it to
# derive the auto-detect cluster key.
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
# Any run of digits; _cluster_sort_key uses the last match in a stem to
# order files numerically within a cluster.
_DIGIT_GROUP_RE = re.compile(r"\d+")
def _auto_prefix(stem: str) -> str:
    """Return the auto-detect cluster key for a file stem.

    The trailing digit run is removed first, then any trailing ``_`` / ``-``
    separators, so ``group_a1``, ``group_a_2`` and ``group_a-3`` all map to
    ``group_a``. If that leaves an empty string (for example a stem made
    only of digits), the original stem is returned unchanged.
    """
    without_digits = _TRAILING_DIGIT_RE.sub("", stem)
    prefix = without_digits.rstrip("_-")
    if prefix:
        return prefix
    return stem
def _cluster_sort_key(path: Path) -> Tuple[int, str]:
    """Build the sort key that orders files inside a cluster.

    The primary key is the integer value of the LAST digit group in the
    stem, so ``_9`` precedes ``_10`` / ``_40`` whatever the zero-padding,
    and ``foo_9_detail`` precedes ``foo_40_detail``. Stems with no digits
    get ``-1`` and therefore sort ahead of every numbered file. The stem
    itself is the secondary key, which keeps the order reproducible when
    two files share a number.
    """
    stem = path.stem
    digit_groups = _DIGIT_GROUP_RE.findall(stem)
    if not digit_groups:
        return (-1, stem)
    return (int(digit_groups[-1]), stem)
def _list_sas_files(folder: Path) -> List[Path]:
    """List the SAS files directly inside ``folder``, in sorted path order.

    Only regular files whose lower-cased suffix is in ``SAS_EXTENSIONS``
    are returned; sub-directories are not descended into.
    """
    return [
        entry
        for entry in sorted(folder.iterdir())
        if entry.is_file() and entry.suffix.lower() in SAS_EXTENSIONS
    ]
def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
    """Enumerate ``cfg.folder`` and bucket its files into ``ClusterSpec`` objects.

    The only filesystem access is listing ``cfg.folder``; no SAS file is
    opened here.

    Explicit patterns are applied first, in config order, against each
    file's basename. A file that matches two patterns is a hard error
    (almost always a config bug). An explicit pattern that matches nothing
    still yields a ``ClusterSpec`` with an empty ``files`` list so callers
    can report it. When ``cfg.auto_detect`` is true, files not claimed by
    any pattern are grouped by :func:`_auto_prefix` and emitted in sorted
    key order after the explicit clusters.

    Settings are resolved per cluster:

    * ``if_exists``, ``partition_by``, ``max_partitions`` and ``indexes``
      use the cluster entry's value when present and the folder default
      otherwise. For ``partition_by`` / ``indexes``, ``None`` means
      "inherit" while an explicit ``[]`` disables the feature.
    * ``include`` / ``exclude`` are resolved as a pair: if the cluster
      entry sets either one, the folder-level pair is ignored entirely.
      Inheriting them field by field could combine a cluster ``include``
      with a folder ``exclude`` (or the reverse), which violates the rule
      that the two are mutually exclusive.
    * Auto-detected clusters inherit every folder default directly.

    Raises:
        FileNotFoundError: ``cfg.folder`` is missing or not a directory.
        ValueError: a file matches more than one explicit pattern.
    """
    # is_dir() is already False for a path that does not exist.
    if not cfg.folder.is_dir():
        raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}")

    pool = _list_sas_files(cfg.folder)

    # Run each pattern over the folder exactly once. Both the overlap check
    # and the bucketing below reuse these hit lists (kept in pool order).
    hits: List[List[Path]] = [
        [f for f in pool if patt.pattern.search(f.name)] for patt in cfg.explicit
    ]

    # Detect cross-pattern overlap up front for a clearer error message.
    # Pairs are visited in config order and files in pool order, so the
    # first conflict reported is stable.
    for i, earlier in enumerate(cfg.explicit):
        earlier_hits = set(hits[i])
        if not earlier_hits:
            continue
        for j in range(i + 1, len(cfg.explicit)):
            later = cfg.explicit[j]
            for f in hits[j]:
                if f in earlier_hits:
                    raise ValueError(
                        f"File {f.name!r} matches multiple explicit patterns: "
                        f"{earlier.raw_pattern!r} and {later.raw_pattern!r}"
                    )

    clusters: List[ClusterSpec] = []
    claimed = set()
    for patt, matched in zip(cfg.explicit, hits):
        # No file matches two patterns (checked above), so this pattern's
        # hits are exactly the files it takes out of the auto-detect pool.
        claimed.update(matched)

        # include/exclude travel together: a cluster-level filter replaces
        # the folder-level filter instead of being merged with it.
        if patt.include is not None or patt.exclude is not None:
            include, exclude = patt.include, patt.exclude
        else:
            include, exclude = cfg.include, cfg.exclude

        clusters.append(
            ClusterSpec(
                tablename=patt.tablename,
                # sorted([]) is [], which covers the "matched nothing" case.
                files=sorted(matched, key=_cluster_sort_key),
                if_exists=patt.if_exists or cfg.if_exists,
                include=include,
                exclude=exclude,
                source="explicit",
                pattern=patt.raw_pattern,
                # None = inherit folder default, [] = disable, list = override.
                partition_by=(
                    patt.partition_by if patt.partition_by is not None
                    else cfg.partition_by
                ),
                max_partitions=(
                    patt.max_partitions if patt.max_partitions is not None
                    else cfg.max_partitions
                ),
                indexes=(
                    patt.indexes if patt.indexes is not None
                    else cfg.indexes
                ),
            )
        )

    if cfg.auto_detect:
        buckets: Dict[str, List[Path]] = {}
        for f in pool:
            if f in claimed:
                continue
            buckets.setdefault(_auto_prefix(f.stem), []).append(f)
        for key in sorted(buckets):
            clusters.append(
                ClusterSpec(
                    tablename=key,
                    files=sorted(buckets[key], key=_cluster_sort_key),
                    if_exists=cfg.if_exists,
                    include=cfg.include,
                    exclude=cfg.exclude,
                    source="auto",
                    partition_by=cfg.partition_by,
                    max_partitions=cfg.max_partitions,
                    indexes=cfg.indexes,
                )
            )
    return clusters
# ---------------------------------------------------------------------------
# Per-cluster load
# ---------------------------------------------------------------------------
def _infer_cluster_schema(path: Path, include, exclude):
    """Infer the column schema for ``path`` from a preview of the file.

    The preview is read with ``read_sas_preview``, reduced with
    ``apply_column_filter(include, exclude)``, and handed to
    ``infer_schema`` together with the metadata object and its
    ``number_rows`` attribute (``None`` when the attribute is absent).
    """
    preview, metadata = read_sas_preview(path)
    filtered = apply_column_filter(preview, include, exclude)
    row_count = getattr(metadata, "number_rows", None)
    return infer_schema(filtered, metadata, total_rows=row_count)
def _iter_filtered_chunks(cluster: ClusterSpec, path: Path):
    """Lazily yield the chunks of ``path`` with the cluster's column filter applied."""
    for frame, _meta in iter_sas_chunks(path):
        yield apply_column_filter(frame, cluster.include, cluster.exclude)


def _discover_cluster_partitions(
    cluster: ClusterSpec,
    columns: Dict,
) -> dict:
    """Collect the partition values found in every file of ``cluster``.

    Each file is streamed chunk by chunk through
    ``discover_partition_values_chunked``, and the per-file result is
    folded into one accumulator with ``_merge_partition_trees``. The
    accumulator is returned and is what callers pass on to
    ``render_partition_ddl`` / ``create_table``.
    """
    combined: dict = {}
    for source_file in cluster.files:
        per_file = discover_partition_values_chunked(
            _iter_filtered_chunks(cluster, source_file),
            cluster.partition_by,
            columns,
        )
        # The merge helper's return value is not used; the accumulator is
        # passed in as the first argument and returned below.
        _merge_partition_trees(combined, per_file)
    return combined
def load_cluster(conn, cluster: ClusterSpec, schemaname: str) -> int:
    """Load all files of ``cluster`` into a single table; return the row total.

    The first file drives schema inference and table creation. Every later
    file has its own schema inferred and checked with
    ``assert_schema_compatible`` before it is streamed. A cluster without
    files returns ``0`` without touching the database.

    When ``cluster.partition_by`` is non-empty, partition values are
    discovered across ALL files before the table is created, except when
    ``if_exists`` is ``"append"``, where the scan is skipped.

    Indexes listed in ``cluster.indexes`` are created after the last file
    has been streamed.

    Commits happen per chunk inside :func:`load_sas.copy_dataframes`. If a
    file mid-cluster fails, earlier chunks - including chunks from earlier
    files in the cluster - stay committed; only the in-flight chunk is
    rolled back by :func:`main`.

    Raises:
        ValueError: an ``indexes`` or ``partition_by`` column is missing
            from the schema inferred from the first file.
    """
    if not cluster.files:
        return 0

    table = cluster.tablename
    lead_schema = _infer_cluster_schema(
        cluster.files[0], cluster.include, cluster.exclude
    )

    # Fail before any DDL runs if an index column cannot exist.
    absent_index_cols = [
        name for name in (cluster.indexes or ()) if name not in lead_schema
    ]
    if absent_index_cols:
        raise ValueError(
            f"cluster {cluster.tablename!r}: indexes references "
            f"columns not present in the inferred schema: {absent_index_cols}"
        )

    partition_tree: Optional[dict] = None
    if cluster.partition_by:
        absent_partition_cols = [
            name for name in cluster.partition_by if name not in lead_schema
        ]
        if absent_partition_cols:
            raise ValueError(
                f"cluster {cluster.tablename!r}: partition_by references "
                f"columns not present in the inferred schema: {absent_partition_cols}"
            )
        if cluster.if_exists != "append":
            print(
                f" discovering partition values across "
                f"{len(cluster.files)} file(s)...",
                file=sys.stderr,
            )
            partition_tree = _discover_cluster_partitions(cluster, lead_schema)
            partition_total = _count_partitions(partition_tree)
            print(
                f" discovered {partition_total:,} partition table(s) "
                f"across {len(cluster.partition_by)} level(s)",
                file=sys.stderr,
            )
        else:
            # In append mode the partitions are taken to exist already.
            print(
                " [info] append mode: skipping partition discovery "
                "(partitions assumed to exist)",
                file=sys.stderr,
            )

    create_table(
        conn,
        schemaname,
        table,
        lead_schema,
        cluster.if_exists,
        partition_by=cluster.partition_by or None,
        partition_values=partition_tree,
        max_partitions=cluster.max_partitions,
    )

    loaded = 0
    for position, source_file in enumerate(cluster.files):
        if position == 0:
            schema = lead_schema
        else:
            schema = _infer_cluster_schema(
                source_file, cluster.include, cluster.exclude
            )
            # Same check that if_exists=append runs. A mismatch aborts the
            # cluster; chunks loaded so far remain in the table.
            assert_schema_compatible(conn, schemaname, table, schema)
        loaded += _stream_file(
            conn,
            schemaname,
            table,
            source_file,
            schema,
            cluster.include,
            cluster.exclude,
        )

    if cluster.indexes:
        create_indexes(conn, schemaname, table, cluster.indexes)
    return loaded
def _stream_file(
    conn,
    schemaname: str,
    tablename: str,
    path: Path,
    columns,
    include,
    exclude,
) -> int:
    """Stream one SAS file into ``schemaname.tablename``.

    Chunks from ``iter_sas_chunks`` are column-filtered and handed to
    ``copy_dataframes`` as a lazy iterator. A running row count is written
    to stderr as each chunk is produced. Returns whatever
    ``copy_dataframes`` returns.
    """

    def _filtered_with_progress():
        running_total = 0
        for frame, _meta in iter_sas_chunks(path):
            frame = apply_column_filter(frame, include, exclude)
            running_total += len(frame)
            print(
                f" {path.name}: streaming... {running_total:,} rows",
                file=sys.stderr,
            )
            yield frame

    return copy_dataframes(
        conn, schemaname, tablename, _filtered_with_progress(), columns
    )
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_argparser() -> argparse.ArgumentParser:
    """Create the command-line parser for the folder loader.

    Options: ``--config`` (required, parsed as a ``Path``) and the boolean
    switches ``--dry-run``, ``--fail-fast`` and ``--dbcreds``, each of which
    defaults to ``False``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Load every SAS file in a folder into Postgres, grouping files "
            "into clusters that each become one table."
        ),
    )
    parser.add_argument(
        "--config", required=True, type=Path, help="Path to YAML config"
    )

    # (flag, help text) for every store_true switch, in display order.
    switches = (
        (
            "--dry-run",
            "Print discovered clusters and the inferred DDL for each "
            "(CREATE TABLE plus partition DDL when applicable). For "
            "partitioned clusters all files are scanned to discover "
            "partition values. The database is never touched.",
        ),
        (
            "--fail-fast",
            "Abort on the first cluster failure. Default is to roll that "
            "cluster back and continue with the next one.",
        ),
        (
            "--dbcreds",
            "Prompt for database username and password instead of reading "
            "PGUSER / PGPASSWORD from the environment or .env file.",
        ),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)
    return parser
def _describe_cluster(cluster: ClusterSpec) -> str:
    """Render a multi-line, human-readable summary of ``cluster``.

    The first line holds the table name, the source (plus the regex for
    explicit clusters) and the ``if_exists`` mode. The second line lists
    the file names, or a placeholder when there are none. Lines for
    ``partition_by`` and ``indexes`` follow only when those lists are
    non-empty.
    """
    origin_bits = [f"{cluster.source}"]
    if cluster.pattern:
        origin_bits.append(f"pattern={cluster.pattern!r}")
    origin = " ".join(origin_bits)

    file_names = [member.name for member in cluster.files]
    listing = ", ".join(file_names) or "(no matching files)"

    lines = [
        f"cluster {cluster.tablename!r} [{origin}] if_exists={cluster.if_exists}",
        f" files: {listing}",
    ]
    if cluster.partition_by:
        lines.append(f" partition_by: {cluster.partition_by}")
    if cluster.indexes:
        lines.append(f" indexes: {cluster.indexes}")
    return "\n".join(lines)
def _print_dry_run_ddl(cfg: FolderConfig, loadable: List[ClusterSpec]) -> None:
    """Print the DDL each loadable cluster would run; no connection is opened."""
    print()
    for spec in loadable:
        print(f"--- DDL for cluster {spec.tablename!r} ---")
        schema = _infer_cluster_schema(spec.files[0], spec.include, spec.exclude)

        # Parent CREATE TABLE (carries PARTITION BY when applicable).
        print(
            render_create_table(
                cfg.schemaname,
                spec.tablename,
                schema,
                partition_by=spec.partition_by or None,
            )
        )

        # Child partition DDL, only for partitioned clusters.
        if spec.partition_by:
            unknown_pcols = [
                col for col in spec.partition_by if col not in schema
            ]
            if unknown_pcols:
                print(
                    f" [error] partition_by references columns not in "
                    f"schema: {unknown_pcols}",
                    file=sys.stderr,
                )
            else:
                print(
                    f" discovering partition values across "
                    f"{len(spec.files)} file(s)...",
                    file=sys.stderr,
                )
                tree = _discover_cluster_partitions(spec, schema)
                partition_total = _count_partitions(tree)
                print(
                    f" discovered {partition_total:,} partition table(s) "
                    f"across {len(spec.partition_by)} level(s)",
                    file=sys.stderr,
                )
                for statement in render_partition_ddl(
                    cfg.schemaname,
                    spec.tablename,
                    spec.partition_by,
                    tree,
                    schema,
                    max_partitions=spec.max_partitions,
                ):
                    print()
                    print(statement)

        # CREATE INDEX DDL, only when the cluster has indexes.
        if spec.indexes:
            unknown_icols = [col for col in spec.indexes if col not in schema]
            if unknown_icols:
                print(
                    f" [error] indexes references columns not in "
                    f"schema: {unknown_icols}",
                    file=sys.stderr,
                )
            else:
                for statement in render_create_indexes(
                    cfg.schemaname, spec.tablename, spec.indexes
                ):
                    print()
                    print(statement)

        # Blank line between clusters.
        print()


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point; returns the process exit code.

    Steps: parse arguments, load ``.env``, read the config, discover
    clusters, print them, then either print the dry-run DDL or load every
    cluster that has files.

    Exit codes:
        0 - every cluster loaded (or the dry run finished)
        1 - at least one cluster failed
        2 - the folder is missing or no cluster has any file
    """
    args = _build_argparser().parse_args(argv)
    load_dotenv()
    cfg = load_folder_config(args.config)

    folder_ok = cfg.folder.exists() and cfg.folder.is_dir()
    if not folder_ok:
        print(f"error: folder not found: {cfg.folder}", file=sys.stderr)
        return 2

    clusters = discover_clusters(cfg)
    loadable = [spec for spec in clusters if spec.files]
    if not loadable:
        print(
            f"error: no SAS files found in {cfg.folder} "
            f"(looked for {', '.join(SAS_EXTENSIONS)})",
            file=sys.stderr,
        )
        return 2

    print(f"discovered {len(loadable)} cluster(s) in {cfg.folder}:")
    # Empty explicit clusters are listed too, so unmatched patterns show up.
    for spec in clusters:
        print(_describe_cluster(spec))

    if args.dry_run:
        _print_dry_run_ddl(cfg, loadable)
        return 0

    if args.dbcreds:
        db_user = input("Database username: ")
        db_password = getpass.getpass("Database password: ")
    else:
        db_user = None
        db_password = None
    conn = connect(user=db_user, password=db_password)
    conn.autocommit = False

    failures: List[Tuple[str, Exception]] = []
    totals: List[Tuple[str, int, int]] = []  # (tablename, files, rows)
    try:
        for cluster in loadable:
            print(
                f"\n>>> loading cluster {cluster.tablename!r} "
                f"({len(cluster.files)} file(s))"
            )
            try:
                rows = load_cluster(conn, cluster, cfg.schemaname)
                conn.commit()
                totals.append((cluster.tablename, len(cluster.files), rows))
                print(
                    f" -> loaded {rows:,} row(s) into "
                    f"{cfg.schemaname}.{cluster.tablename}"
                )
            except Exception as exc:
                # Top-level boundary: record the failure and move on unless
                # --fail-fast was requested.
                conn.rollback()
                failures.append((cluster.tablename, exc))
                print(
                    f" !! cluster {cluster.tablename!r} failed: {exc}",
                    file=sys.stderr,
                )
                if args.fail_fast:
                    break
    finally:
        conn.close()

    print("\n=== summary ===")
    for name, file_count, row_count in totals:
        print(f" ok {name}: {file_count} file(s), {row_count:,} row(s)")
    for name, error in failures:
        print(f" FAIL {name}: {error}", file=sys.stderr)
    return 1 if failures else 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())