"""Folder-level SAS-to-Postgres loader. Wraps :mod:`load_sas` so an entire directory of SAS files can be ingested in one invocation. A directory often contains several *clusters* of files that share a schema (e.g. ``group_a1.sas7bdat``, ``group_a2.sas7bdat``, ...). Each cluster becomes one Postgres table; files inside a cluster are appended to it. ------------------------------------------------------------------------------- USAGE ------------------------------------------------------------------------------- 1. YAML config -------------- :: folder: samples/folder_test # required; relative paths resolve against # the config file's directory schemaname: public # required # Optional. One of: fail | replace | append. Default: fail. # Applied to the first file of each cluster (subsequent files in the # cluster always run through the append-mode compatibility check). if_exists: fail # Optional. Default: true. When true, files that don't match any explicit # pattern below are grouped by their common prefix (trailing digits, and # optional trailing separators, are stripped from each file stem). auto_detect: true # Optional. Columns to force-include or force-exclude across every file. # include and exclude are mutually exclusive. # include: [ID, INTCOL] # exclude: [ALLNULL] # Optional folder default for LIST partitioning. Omit or set [] for no # partitioning. Accepts a single string or a list of column names. # partition_by: # - state # - zip # Optional folder default threshold. Default: 10000. # max_partitions: 10000 # Optional explicit cluster patterns. Each pattern is matched against the # file *basename*. Matched files are pulled out of the auto-detect pool. # Per-cluster if_exists/include/exclude/partition_by/max_partitions # override the folder-level defaults. clusters: - pattern: '^group_a\\d+\\.sas7bdat$' tablename: group_a - pattern: '^group_b\\d+\\.sas7bdat$' tablename: group_b if_exists: replace 2. Command-line interface ------------------------- :: python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast] [--dbcreds] Flags: --config PATH Required. Path to the YAML config above. --dry-run Print the discovered clusters and the inferred DDL for each (CREATE TABLE plus partition DDL when applicable). For partitioned clusters all files are scanned to discover partition values. The database is never touched. --fail-fast Abort the whole run on the first cluster failure. Default is to log the failure, roll that cluster back, and keep going. --dbcreds Prompt interactively for the database username and password instead of reading ``PGUSER`` / ``PGPASSWORD`` from the environment or ``.env`` file. The password prompt does not echo. Has no effect with ``--dry-run`` (no connection is opened). Exit codes: 0 - every cluster loaded successfully (or dry-run completed) 1 - at least one cluster failed (details on stderr) 2 - folder does not exist / contains no SAS files 3. Discovery rules ------------------ * Supported extensions: ``.sas7bdat``, ``.xpt``, ``.xport`` (matches :mod:`load_sas`). The folder is not scanned recursively. * Explicit patterns are tried in order. A file matched by one pattern is removed from the pool before the next pattern runs, so earlier patterns win in case of overlap. Overlap between patterns is flagged as an error at config-parse time (a file matching two patterns is almost always a bug). * Auto-detect groups remaining files by ``re.sub(r'\\d+$', '', stem)`` with any trailing ``_`` / ``-`` stripped afterward. Stems without trailing digits become singleton clusters named after the stem. 4. Library usage ---------------- :: from load_folder import load_folder_config, discover_clusters, load_cluster from load_sas import connect cfg = load_folder_config("folder_config.yaml") clusters = discover_clusters(cfg) conn = connect() try: for cluster in clusters: load_cluster(conn, cluster, cfg.schemaname) finally: conn.close() """ from __future__ import annotations import argparse import getpass import re import sys from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import yaml from dotenv import load_dotenv from load_sas import ( VALID_IF_EXISTS, _count_partitions, _merge_partition_trees, apply_column_filter, assert_schema_compatible, connect, copy_dataframes, create_table, discover_partition_values_chunked, infer_schema, iter_sas_chunks, read_sas_preview, render_create_table, render_partition_ddl, ) SAS_EXTENSIONS = (".sas7bdat", ".xpt", ".xport") # --------------------------------------------------------------------------- # Dataclasses # --------------------------------------------------------------------------- @dataclass class ClusterSpec: """Resolved per-cluster load settings. ``partition_by`` and ``max_partitions`` are resolved from the folder defaults and any per-cluster overrides during :func:`discover_clusters`. """ tablename: str files: List[Path] if_exists: str include: Optional[List[str]] exclude: Optional[List[str]] source: str # "explicit" or "auto" pattern: Optional[str] = None partition_by: List[str] = field(default_factory=list) max_partitions: int = 10_000 @dataclass class _ExplicitPattern: """Parsed form of a single ``clusters[*]`` YAML entry. ``partition_by`` defaults to ``None`` meaning "inherit from folder level". An explicit empty list ``[]`` means "disable partitioning for this cluster". ``max_partitions`` defaults to ``None`` meaning "inherit from folder level". """ pattern: re.Pattern raw_pattern: str tablename: str if_exists: Optional[str] = None include: Optional[List[str]] = None exclude: Optional[List[str]] = None partition_by: Optional[List[str]] = None max_partitions: Optional[int] = None @dataclass class FolderConfig: """Folder-level configuration parsed from YAML. ``partition_by`` and ``max_partitions`` serve as defaults for every cluster unless overridden at the cluster level. """ folder: Path schemaname: str if_exists: str = "fail" auto_detect: bool = True include: Optional[List[str]] = None exclude: Optional[List[str]] = None explicit: List[_ExplicitPattern] = field(default_factory=list) partition_by: List[str] = field(default_factory=list) max_partitions: int = 10_000 # --------------------------------------------------------------------------- # Config loading # --------------------------------------------------------------------------- def _validate_if_exists(value: Any, where: str) -> str: s = str(value).lower() if s not in VALID_IF_EXISTS: raise ValueError( f"{where}: if_exists={value!r} is not one of {VALID_IF_EXISTS}" ) return s def _parse_columns_filter( raw: Dict[str, Any], where: str ) -> Tuple[Optional[List[str]], Optional[List[str]]]: include = raw.get("include") exclude = raw.get("exclude") if include is not None and exclude is not None: raise ValueError(f"{where}: 'include' and 'exclude' are mutually exclusive.") if include is not None and not isinstance(include, list): raise ValueError(f"{where}: 'include' must be a list of column names.") if exclude is not None and not isinstance(exclude, list): raise ValueError(f"{where}: 'exclude' must be a list of column names.") include_out = [str(c) for c in include] if include is not None else None exclude_out = [str(c) for c in exclude] if exclude is not None else None return include_out, exclude_out def _parse_partition_by( raw_value: Any, where: str, *, allow_none: bool = False ) -> Optional[List[str]]: """Parse a ``partition_by`` value from YAML. Returns a list of non-empty, unique column name strings. When ``allow_none`` is True (used for per-cluster entries), an omitted key returns ``None`` to signal "inherit from folder level". An explicit empty list ``[]`` always returns ``[]``. """ if raw_value is None: return None if allow_none else [] if isinstance(raw_value, str): if not raw_value.strip(): raise ValueError(f"{where}: 'partition_by' string must be non-empty.") return [raw_value.strip()] if isinstance(raw_value, list): if len(raw_value) == 0: return [] result: List[str] = [] for i, item in enumerate(raw_value): if not isinstance(item, str) or not item.strip(): raise ValueError( f"{where}: 'partition_by[{i}]' must be a non-empty string." ) result.append(str(item).strip()) if len(result) != len(set(result)): raise ValueError( f"{where}: 'partition_by' contains duplicate column names." ) return result raise ValueError( f"{where}: 'partition_by' must be a string or list of strings." ) def _parse_max_partitions( raw_value: Any, where: str, *, allow_none: bool = False ) -> Optional[int]: """Parse a ``max_partitions`` value from YAML. Returns a positive integer. When ``allow_none`` is True (used for per-cluster entries), an omitted key returns ``None`` to signal "inherit from folder level". """ if raw_value is None: return None if allow_none else 10_000 try: value = int(raw_value) except (TypeError, ValueError): raise ValueError( f"{where}: 'max_partitions' must be a positive integer, " f"got {raw_value!r}" ) if value <= 0: raise ValueError( f"{where}: 'max_partitions' must be a positive integer, " f"got {value}" ) return value def _validate_partition_vs_columns( partition_by: List[str], exclude: Optional[List[str]], where: str, ) -> None: """Raise if any ``partition_by`` column is in the ``exclude`` list.""" if not partition_by or exclude is None: return excluded_parts = [c for c in partition_by if c in exclude] if excluded_parts: raise ValueError( f"{where}: 'exclude' removes partition_by columns: {excluded_parts}" ) def load_folder_config(path: Path) -> FolderConfig: """Parse and validate the folder-level YAML config at ``path``. Supports optional ``partition_by`` and ``max_partitions`` at both the folder level (defaults for all clusters) and per explicit cluster entry (overrides the folder default). """ path = Path(path) with path.open("r", encoding="utf-8") as f: raw = yaml.safe_load(f) if not isinstance(raw, dict): raise ValueError(f"Config at {path} must be a YAML mapping at the top level.") missing = [k for k in ("folder", "schemaname") if k not in raw] if missing: raise ValueError(f"Config {path} missing required keys: {', '.join(missing)}") folder = Path(raw["folder"]) if not folder.is_absolute(): candidate = (path.parent / folder).resolve() folder = candidate if candidate.exists() else folder schemaname = str(raw["schemaname"]) if_exists = _validate_if_exists(raw.get("if_exists", "fail"), f"Config {path}") auto_detect = bool(raw.get("auto_detect", True)) include, exclude = _parse_columns_filter(raw, f"Config {path}") # -- folder-level partition settings ------------------------------------ partition_by = _parse_partition_by( raw.get("partition_by"), f"Config {path}" ) max_partitions = _parse_max_partitions( raw.get("max_partitions"), f"Config {path}" ) _validate_partition_vs_columns(partition_by, exclude, f"Config {path}") explicit: List[_ExplicitPattern] = [] clusters_raw = raw.get("clusters") or [] if not isinstance(clusters_raw, list): raise ValueError(f"Config {path}: 'clusters' must be a list if present.") for i, entry in enumerate(clusters_raw): where = f"Config {path} clusters[{i}]" if not isinstance(entry, dict): raise ValueError(f"{where} must be a mapping.") if "pattern" not in entry or "tablename" not in entry: raise ValueError(f"{where} must include 'pattern' and 'tablename'.") raw_pat = str(entry["pattern"]) try: compiled = re.compile(raw_pat) except re.error as e: raise ValueError(f"{where}: invalid regex {raw_pat!r}: {e}") from e c_if_exists = ( _validate_if_exists(entry["if_exists"], where) if "if_exists" in entry else None ) c_include, c_exclude = _parse_columns_filter(entry, where) # -- per-cluster partition settings --------------------------------- c_partition_by = _parse_partition_by( entry.get("partition_by"), where, allow_none=True ) c_max_partitions = _parse_max_partitions( entry.get("max_partitions"), where, allow_none=True ) # Validate partition_by vs the effective exclude for this cluster. effective_exclude = c_exclude if c_exclude is not None else exclude effective_pb = c_partition_by if c_partition_by is not None else partition_by _validate_partition_vs_columns(effective_pb, effective_exclude, where) explicit.append( _ExplicitPattern( pattern=compiled, raw_pattern=raw_pat, tablename=str(entry["tablename"]), if_exists=c_if_exists, include=c_include, exclude=c_exclude, partition_by=c_partition_by, max_partitions=c_max_partitions, ) ) return FolderConfig( folder=folder, schemaname=schemaname, if_exists=if_exists, auto_detect=auto_detect, include=include, exclude=exclude, explicit=explicit, partition_by=partition_by, max_partitions=max_partitions, ) # --------------------------------------------------------------------------- # Cluster discovery # --------------------------------------------------------------------------- _TRAILING_DIGIT_RE = re.compile(r"\d+$") def _auto_prefix(stem: str) -> str: """Derive the cluster key for a file stem. Strip trailing digits and any trailing separators so ``group_a1`` / ``group_a_2`` / ``group_a-3`` all land in the same ``group_a`` bucket. If nothing is stripped, the stem is its own key. """ stripped = _TRAILING_DIGIT_RE.sub("", stem) stripped = stripped.rstrip("_-") return stripped or stem def _list_sas_files(folder: Path) -> List[Path]: files: List[Path] = [] for p in sorted(folder.iterdir()): if p.is_file() and p.suffix.lower() in SAS_EXTENSIONS: files.append(p) return files def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: """Enumerate ``cfg.folder`` and bucket files into ``ClusterSpec`` objects. Pure/IO-bounded: the only filesystem access is listing ``cfg.folder``. No SAS file is opened here. Explicit patterns are applied first, in config order; files matched by an earlier pattern are removed from the pool before the next pattern runs. A file matching two patterns triggers a hard error (that's almost always a config bug). Partition settings are resolved per cluster: * For explicit clusters, ``partition_by`` / ``max_partitions`` from the cluster entry override the folder defaults when present. ``None`` means "inherit"; an explicit ``[]`` disables partitioning. * For auto-detected clusters, folder defaults are inherited directly. """ if not cfg.folder.exists() or not cfg.folder.is_dir(): raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}") pool = _list_sas_files(cfg.folder) clusters: List[ClusterSpec] = [] # Detect cross-pattern overlap up front for a clearer error message. for i, p_i in enumerate(cfg.explicit): for j in range(i + 1, len(cfg.explicit)): p_j = cfg.explicit[j] for f in pool: if p_i.pattern.search(f.name) and p_j.pattern.search(f.name): raise ValueError( f"File {f.name!r} matches multiple explicit patterns: " f"{p_i.raw_pattern!r} and {p_j.raw_pattern!r}" ) remaining = list(pool) for patt in cfg.explicit: # Resolve partition_by: None = inherit folder, [] = disable, list = override resolved_pb = ( patt.partition_by if patt.partition_by is not None else cfg.partition_by ) resolved_mp = ( patt.max_partitions if patt.max_partitions is not None else cfg.max_partitions ) matched = [f for f in remaining if patt.pattern.search(f.name)] if not matched: # Not an error - the folder might legitimately not contain files # for this pattern on a given run. Emit a note for the CLI. clusters.append( ClusterSpec( tablename=patt.tablename, files=[], if_exists=patt.if_exists or cfg.if_exists, include=patt.include if patt.include is not None else cfg.include, exclude=patt.exclude if patt.exclude is not None else cfg.exclude, source="explicit", pattern=patt.raw_pattern, partition_by=resolved_pb, max_partitions=resolved_mp, ) ) continue remaining = [f for f in remaining if f not in matched] clusters.append( ClusterSpec( tablename=patt.tablename, files=sorted(matched), if_exists=patt.if_exists or cfg.if_exists, include=patt.include if patt.include is not None else cfg.include, exclude=patt.exclude if patt.exclude is not None else cfg.exclude, source="explicit", pattern=patt.raw_pattern, partition_by=resolved_pb, max_partitions=resolved_mp, ) ) if cfg.auto_detect and remaining: buckets: Dict[str, List[Path]] = {} for f in remaining: key = _auto_prefix(f.stem) buckets.setdefault(key, []).append(f) for key in sorted(buckets): clusters.append( ClusterSpec( tablename=key, files=sorted(buckets[key]), if_exists=cfg.if_exists, include=cfg.include, exclude=cfg.exclude, source="auto", partition_by=cfg.partition_by, max_partitions=cfg.max_partitions, ) ) return clusters # --------------------------------------------------------------------------- # Per-cluster load # --------------------------------------------------------------------------- def _infer_cluster_schema(path: Path, include, exclude): """Infer the Postgres column schema from a SAS file preview.""" preview_df, meta = read_sas_preview(path) preview_df = apply_column_filter(preview_df, include, exclude) total_rows = getattr(meta, "number_rows", None) columns = infer_schema(preview_df, meta, total_rows=total_rows) return columns def _discover_cluster_partitions( cluster: ClusterSpec, columns: Dict, ) -> dict: """Scan ALL files in ``cluster`` to discover partition values. Returns a nested partition-value tree suitable for passing to :func:`load_sas.render_partition_ddl` and :func:`load_sas.create_table`. Each file is scanned chunk-by-chunk so the full dataset is never materialized in memory. """ merged: dict = {} for path in cluster.files: def _filtered_chunks(p=path): for chunk_df, _chunk_meta in iter_sas_chunks(p): yield apply_column_filter( chunk_df, cluster.include, cluster.exclude ) file_tree = discover_partition_values_chunked( _filtered_chunks(), cluster.partition_by, columns, ) _merge_partition_trees(merged, file_tree) return merged def load_cluster(conn, cluster: ClusterSpec, schemaname: str) -> int: """Load every file in ``cluster`` into one table. Returns total rows loaded. When ``cluster.partition_by`` is non-empty, partition values are discovered across ALL files before table creation so the full partition tree exists before any data is copied. Commits happen per chunk inside :func:`load_sas.copy_dataframes`. If a file mid-cluster fails, earlier chunks - including chunks from earlier files in the cluster - stay committed; only the in-flight chunk is rolled back by :func:`main`. """ if not cluster.files: return 0 first, *rest = cluster.files first_columns = _infer_cluster_schema(first, cluster.include, cluster.exclude) # -- Partition support -------------------------------------------------- partition_values: Optional[dict] = None if cluster.partition_by: # Validate that all partition_by columns exist in the inferred schema. missing_pcols = [ c for c in cluster.partition_by if c not in first_columns ] if missing_pcols: raise ValueError( f"cluster {cluster.tablename!r}: partition_by references " f"columns not present in the inferred schema: {missing_pcols}" ) # Discover partition values across ALL files in the cluster. # In append mode the partitions already exist, so skip the scan. if cluster.if_exists == "append": print( " [info] append mode: skipping partition discovery " "(partitions assumed to exist)", file=sys.stderr, ) else: print( f" discovering partition values across " f"{len(cluster.files)} file(s)...", file=sys.stderr, ) partition_values = _discover_cluster_partitions( cluster, first_columns, ) total_parts = _count_partitions(partition_values) print( f" discovered {total_parts:,} partition table(s) " f"across {len(cluster.partition_by)} level(s)", file=sys.stderr, ) create_table( conn, schemaname, cluster.tablename, first_columns, cluster.if_exists, partition_by=cluster.partition_by or None, partition_values=partition_values, max_partitions=cluster.max_partitions, ) total = 0 total += _stream_file( conn, schemaname, cluster.tablename, first, first_columns, cluster.include, cluster.exclude, ) for path in rest: columns = _infer_cluster_schema(path, cluster.include, cluster.exclude) # Uses the same check that if_exists=append runs. A type mismatch or # missing column aborts the cluster; because chunks commit as they # load, earlier chunks in the cluster remain in the table. assert_schema_compatible(conn, schemaname, cluster.tablename, columns) total += _stream_file( conn, schemaname, cluster.tablename, path, columns, cluster.include, cluster.exclude, ) return total def _stream_file( conn, schemaname: str, tablename: str, path: Path, columns, include, exclude, ) -> int: def _chunks(): seen = 0 for chunk_df, _chunk_meta in iter_sas_chunks(path): chunk_df = apply_column_filter(chunk_df, include, exclude) seen += len(chunk_df) print( f" {path.name}: streaming... {seen:,} rows", file=sys.stderr, ) yield chunk_df return copy_dataframes(conn, schemaname, tablename, _chunks(), columns) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def _build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description=( "Load every SAS file in a folder into Postgres, grouping files " "into clusters that each become one table." ), ) p.add_argument("--config", required=True, type=Path, help="Path to YAML config") p.add_argument( "--dry-run", action="store_true", help=( "Print discovered clusters and the inferred DDL for each " "(CREATE TABLE plus partition DDL when applicable). For " "partitioned clusters all files are scanned to discover " "partition values. The database is never touched." ), ) p.add_argument( "--fail-fast", action="store_true", help=( "Abort on the first cluster failure. Default is to roll that " "cluster back and continue with the next one." ), ) p.add_argument( "--dbcreds", action="store_true", help=( "Prompt for database username and password instead of reading " "PGUSER / PGPASSWORD from the environment or .env file." ), ) return p def _describe_cluster(cluster: ClusterSpec) -> str: src = f"{cluster.source}" if cluster.pattern: src += f" pattern={cluster.pattern!r}" files = ", ".join(f.name for f in cluster.files) or "(no matching files)" parts = "" if cluster.partition_by: parts = f"\n partition_by: {cluster.partition_by}" return ( f"cluster {cluster.tablename!r} [{src}] if_exists={cluster.if_exists}\n" f" files: {files}{parts}" ) def main(argv: Optional[List[str]] = None) -> int: args = _build_argparser().parse_args(argv) load_dotenv() cfg = load_folder_config(args.config) if not cfg.folder.exists() or not cfg.folder.is_dir(): print(f"error: folder not found: {cfg.folder}", file=sys.stderr) return 2 clusters = discover_clusters(cfg) loadable = [c for c in clusters if c.files] if not loadable: print( f"error: no SAS files found in {cfg.folder} " f"(looked for {', '.join(SAS_EXTENSIONS)})", file=sys.stderr, ) return 2 print(f"discovered {len(loadable)} cluster(s) in {cfg.folder}:") for c in clusters: print(_describe_cluster(c)) if args.dry_run: print() for c in loadable: print(f"--- DDL for cluster {c.tablename!r} ---") columns = _infer_cluster_schema(c.files[0], c.include, c.exclude) # Print parent CREATE TABLE (with PARTITION BY if applicable). print( render_create_table( cfg.schemaname, c.tablename, columns, partition_by=c.partition_by or None, ) ) # Print child partition DDL when the cluster is partitioned. if c.partition_by: # Validate partition columns exist in the schema. missing_pcols = [ col for col in c.partition_by if col not in columns ] if missing_pcols: print( f" [error] partition_by references columns not in " f"schema: {missing_pcols}", file=sys.stderr, ) else: print( f" discovering partition values across " f"{len(c.files)} file(s)...", file=sys.stderr, ) partition_values = _discover_cluster_partitions( c, columns, ) total_parts = _count_partitions(partition_values) print( f" discovered {total_parts:,} partition table(s) " f"across {len(c.partition_by)} level(s)", file=sys.stderr, ) child_stmts = render_partition_ddl( cfg.schemaname, c.tablename, c.partition_by, partition_values, columns, max_partitions=c.max_partitions, ) for stmt in child_stmts: print() print(stmt) print() return 0 db_user = db_password = None if args.dbcreds: db_user = input("Database username: ") db_password = getpass.getpass("Database password: ") conn = connect(user=db_user, password=db_password) conn.autocommit = False failures: List[Tuple[str, Exception]] = [] totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows) try: for cluster in loadable: print( f"\n>>> loading cluster {cluster.tablename!r} " f"({len(cluster.files)} file(s))" ) try: rows = load_cluster(conn, cluster, cfg.schemaname) conn.commit() totals.append((cluster.tablename, len(cluster.files), rows)) print( f" -> loaded {rows:,} row(s) into " f"{cfg.schemaname}.{cluster.tablename}" ) except Exception as e: conn.rollback() failures.append((cluster.tablename, e)) print( f" !! cluster {cluster.tablename!r} failed: {e}", file=sys.stderr, ) if args.fail_fast: break finally: conn.close() print("\n=== summary ===") for name, fcount, rows in totals: print(f" ok {name}: {fcount} file(s), {rows:,} row(s)") for name, err in failures: print(f" FAIL {name}: {err}", file=sys.stderr) return 1 if failures else 0 if __name__ == "__main__": sys.exit(main())