# foxtrot/generic_loader/load_folder.py
"""Folder-level data-to-Postgres loader.

Wraps :mod:`load_sas` so an entire directory of data files (SAS or delimited
text) can be ingested in one invocation. A directory often contains several
*clusters* of files that share a schema (e.g. ``group_a1.sas7bdat``,
``group_a2.sas7bdat``, ... or ``group_a1.csv``, ``group_a2.csv``, ...). Each
cluster becomes one Postgres table; files inside a cluster are appended to it.

-------------------------------------------------------------------------------
USAGE
-------------------------------------------------------------------------------

1. YAML config
--------------

::

    folder: samples/folder_test   # required; a relative path resolves against
                                  # the config file's directory if that path
                                  # exists, otherwise against the CWD
    schemaname: public            # required

    # Optional. One of: fail | replace | append. Default: fail.
    # Applied to the first file of each cluster (subsequent files in the
    # cluster always run through the append-mode compatibility check).
    if_exists: fail

    # Optional. Default: true. When true, files that don't match any explicit
    # pattern below are grouped by their common prefix (trailing digits, and
    # then any trailing separators, are stripped from each file stem).
    auto_detect: true

    # Optional. Columns to force-include or force-exclude across every file.
    # include and exclude are mutually exclusive.
    # include: [ID, INTCOL]
    # exclude: [ALLNULL]

    # Optional folder default for explicit column type overrides. These
    # win over the cluster-wide auto-union computed during pre-scan; set
    # them when a column's SAS-level type varies across files (e.g. phone
    # IDs stored as CHAR in some years and NUM in others) and you want to
    # pin the Postgres type yourself rather than accept the auto-derived
    # one. Per-cluster column_types inside each clusters[*] entry are
    # merged on top of this map.
    # column_types:
    #   RESP_PH_PREFIX_ID: TEXT
    #   SOME_BIGINT_COL: BIGINT

    # Optional folder default for LIST partitioning. Omit or set [] for no
    # partitioning. Accepts a single string or a list of column names.
    # partition_by:
    #   - state
    #   - zip

    # Optional folder default partition-count threshold. Default: 10000.
    # max_partitions: 10000

    # Optional folder default for indexed columns. Omit or set [] for no
    # indexes. Accepts a single string or a list of column names.
    # indexes: [ID]

    # Optional. Default: false. When true, every column is created nullable
    # regardless of what the schema-inference preview shows.
    # all_nullable: false

    # Optional. sas (default) or text; selects which file extensions are
    # discovered. The text-file options below are ignored for sas.
    # file_type: text
    # delimiter: ","          # also accepts the names tab and pipe
    # text_encoding: utf-8
    # quotechar: '"'

    # Optional explicit cluster patterns. Each pattern is searched for
    # (re.search) in the file *basename*, so anchor it with ^...$ to match
    # the whole name. Matched files are pulled out of the auto-detect pool.
    # Per-cluster if_exists/include/exclude/partition_by/max_partitions/
    # indexes/column_types/all_nullable override the folder-level defaults.
    clusters:
      - pattern: '^group_a\\d+\\.sas7bdat$'
        tablename: group_a
      - pattern: '^group_b\\d+\\.sas7bdat$'
        tablename: group_b
        if_exists: replace
        column_types:
          PHONE_PREFIX: TEXT
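
The per-cluster inherit-or-override rule can be sketched as follows. This is a minimal sketch. It assumes only the omit / ``[]`` / merge semantics described in the comments above. ``resolve_setting`` and ``resolve_column_types`` are hypothetical names and are not part of this module. The sketch also leaves out the auto-union that :func:`main` adds after the pre-scan.

```python
from typing import Dict, Optional, TypeVar

T = TypeVar("T")


def resolve_setting(folder_default: T, cluster_value: Optional[T]) -> T:
    # An omitted cluster key (None) inherits the folder default. Any other
    # value replaces it outright, including an explicit [] (which disables
    # partition_by / indexes) and an explicit False for all_nullable.
    return folder_default if cluster_value is None else cluster_value


def resolve_column_types(
    folder_types: Dict[str, str], cluster_types: Optional[Dict[str, str]]
) -> Dict[str, str]:
    # Omitted: copy the folder map. Present: merge it on top, so cluster
    # entries win on conflicting column names and folder-only entries
    # still apply.
    if cluster_types is None:
        return dict(folder_types)
    return {**folder_types, **cluster_types}
```

The sketch checks ``is None`` rather than truthiness. That choice is what lets an explicit ``[]`` or ``false`` in a cluster entry override a non-empty (or true) folder default.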

2. Command-line interface
-------------------------

::

    python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast]
                          [--dbcreds]

Flags:

    --config PATH   Required. Path to the YAML config above.
    --dry-run       Print the discovered clusters and the inferred DDL for
                    each (CREATE TABLE plus partition DDL when applicable).
                    For partitioned clusters, all files are scanned to
                    discover partition values. The database is never touched.
    --fail-fast     Abort the whole run on the first cluster failure.
                    By default the failure is logged, the failed cluster's
                    in-flight chunk is rolled back (chunks already committed
                    stay in the table), and the run continues.
    --dbcreds       Prompt interactively for the database username and
                    password instead of reading ``PGUSER`` / ``PGPASSWORD``
                    from the environment or ``.env`` file. The password
                    prompt does not echo. Has no effect with ``--dry-run``
                    (no connection is opened).

Exit codes:

    0 - every cluster loaded successfully (or dry-run completed)
    1 - at least one cluster failed (details on stderr)
    2 - folder does not exist / contains no data files

3. Discovery rules
------------------

* Supported SAS extensions: ``.sas7bdat``, ``.xpt``, ``.xport``.
  Supported text extensions: ``.txt``, ``.csv``, ``.tsv``.
  Extensions are matched case-insensitively. The ``file_type`` config key
  selects which set is used, so SAS and text files are never mixed. The
  folder is not scanned recursively.
* Explicit patterns are applied in config order with ``re.search``
  against the file basename. A file matched by one pattern is removed
  from the pool before the next pattern runs. Before any file is
  assigned, discovery checks every pair of patterns and raises an error
  if a file matches more than one (that is almost always a config bug).
  As a result, overlapping patterns never silently depend on their order.
* Auto-detect groups the remaining files by ``re.sub(r'\\d+$', '', stem)``
  with any trailing ``_`` / ``-`` stripped afterward. A stem without
  trailing digits is keyed by the stem itself. It becomes a singleton
  cluster unless numbered siblings strip down to the same prefix
  (``group_a`` joins ``group_a1``, ``group_a2``, ...).
* Within a cluster, files are sorted **numerically** by the last digit
  group in the stem, so ``..._9_...`` comes before ``..._10_...`` /
  ``..._40_...`` regardless of zero-padding. The first file in that
  order drives schema inference; the rest are checked against that
  schema via :func:`load_sas.assert_schema_compatible`. Gaps in the
  numeric sequence (missing ``3``, ``7``, ``14``, ...) are irrelevant:
  whatever files are present get loaded in numeric order.
* Auto-detect only recognizes *trailing* digit runs. File names where
  the varying number sits in the middle of the stem (surrounded by
  other name components) are not grouped by auto-detect; each becomes
  its own singleton cluster. Use an explicit pattern to bucket them::

      clusters:
        - pattern: '^year2020_regionA_\\d+_detail\\.sas7bdat$'
          tablename: year2020_regionA_detail

  The regex still matches any digit width, so numbers like ``9`` and
  ``40`` both land in the same cluster and the numeric sort above puts
  ``9`` before ``40``.
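
The grouping and ordering rules above can be sketched as a standalone function. This is an illustrative re-implementation that assumes plain basenames as input. ``auto_prefix``, ``numeric_sort_key``, and ``group_files`` are hypothetical names that mirror the private helpers :func:`_auto_prefix` and :func:`_cluster_sort_key`. Digit runs are matched with ``[0-9]+``, which covers ASCII digits only and so is slightly narrower than ``\\d``.

```python
import re
from pathlib import Path
from typing import Dict, List, Tuple

TRAILING_DIGITS = re.compile("[0-9]+$")
DIGIT_GROUP = re.compile("[0-9]+")


def auto_prefix(stem: str) -> str:
    # Strip the trailing digit run, then any trailing "_" / "-" separators.
    stripped = TRAILING_DIGITS.sub("", stem).rstrip("_-")
    # An all-digit stem strips to an empty string; fall back to the stem.
    return stripped or stem


def numeric_sort_key(path: Path) -> Tuple[int, str]:
    # Order by the LAST digit group so 9 sorts before 10 and 40; stems
    # without digits get -1 and sort first. The stem breaks ties.
    groups = DIGIT_GROUP.findall(path.stem)
    return (int(groups[-1]) if groups else -1, path.stem)


def group_files(names: List[str]) -> Dict[str, List[str]]:
    # Bucket basenames by auto-detect key, then sort each bucket numerically.
    buckets: Dict[str, List[Path]] = {}
    for name in names:
        path = Path(name)
        buckets.setdefault(auto_prefix(path.stem), []).append(path)
    return {
        key: [p.name for p in sorted(paths, key=numeric_sort_key)]
        for key, paths in sorted(buckets.items())
    }


print(group_files(["group_a10.csv", "group_a9.csv", "group_a_1.csv", "notes.csv"]))
# {'group_a': ['group_a_1.csv', 'group_a9.csv', 'group_a10.csv'], 'notes': ['notes.csv']}
```

A plain lexical sort would put ``group_a10`` before ``group_a9``. The numeric key avoids that, so the schema-driving first file is the lowest-numbered one.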
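
4. Library usage
----------------

::

    from load_folder import load_folder_config, discover_clusters, load_cluster
    from load_sas import connect

    cfg = load_folder_config("folder_config.yaml")
    clusters = discover_clusters(cfg)
    conn = connect()
    try:
        for cluster in clusters:
            if not cluster.files:
                # An explicit pattern that matched nothing yields an empty cluster.
                continue
            load_cluster(conn, cluster, cfg.schemaname)
    finally:
        conn.close()

Unlike :func:`main`, this minimal loop does not run the metadata pre-scan.
As a result, ``cluster.column_types`` holds only the YAML overrides, not the
cross-file auto-union.
"""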
from __future__ import annotations

import argparse
import getpass
import multiprocessing as mp
import os
import queue as _queue_mod
import re
import sys
import threading
from concurrent.futures import (
    CancelledError,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    as_completed,
)
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml
from dotenv import load_dotenv
from tqdm import tqdm

from load_sas import (
    TEXT_EXTENSIONS,
    VALID_FILE_TYPES,
    VALID_IF_EXISTS,
    _count_partitions,
    _is_text_file,
    _merge_partition_trees,
    apply_column_filter,
    assert_schema_compatible,
    connect,
    copy_dataframes,
    create_indexes,
    create_table,
    discover_partition_values_chunked,
    extract_union_metadata,
    infer_schema,
    iter_sas_chunks,
    read_sas_metadata,
    read_sas_preview,
    render_create_indexes,
    render_create_table,
    render_partition_ddl,
    union_column_types,
)

SAS_EXTENSIONS = (".sas7bdat", ".xpt", ".xport")
SUPPORTED_EXTENSIONS = SAS_EXTENSIONS + TEXT_EXTENSIONS


# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------


@dataclass
class ClusterSpec:
    """Resolved per-cluster load settings.

    ``partition_by``, ``max_partitions``, and ``indexes`` are resolved from
    the folder defaults and any per-cluster overrides during
    :func:`discover_clusters`. ``column_types`` holds the effective type
    overrides for this cluster: user-supplied YAML entries merged on top
    of the auto-union result computed during pre-scan (see :func:`main`).
    The same dict is threaded through to workers so every file in the
    cluster infers the same schema.

    Text-file config (``file_type``, ``delimiter``, ``text_encoding``,
    ``quotechar``) is propagated from the folder config during
    :func:`discover_clusters` so that reader functions receive the correct
    parameters for delimited text files.
    """

    tablename: str
    files: List[Path]
    if_exists: str
    include: Optional[List[str]]
    exclude: Optional[List[str]]
    source: str  # "explicit" or "auto"
    pattern: Optional[str] = None
    partition_by: List[str] = field(default_factory=list)
    max_partitions: int = 10_000
    indexes: List[str] = field(default_factory=list)
    column_types: Dict[str, str] = field(default_factory=dict)
    all_nullable: bool = False
    file_type: str = "sas"
    delimiter: str = ","
    text_encoding: str = "utf-8"
    quotechar: str = '"'


@dataclass
class _ExplicitPattern:
    """Parsed form of a single ``clusters[*]`` YAML entry.

    ``partition_by`` defaults to ``None`` meaning "inherit from folder level".
    An explicit empty list ``[]`` means "disable partitioning for this cluster".
    ``max_partitions``, ``indexes``, and ``all_nullable`` also default to
    ``None`` meaning "inherit from folder level"; an explicit ``[]`` for
    ``indexes`` disables indexing for this cluster.
    ``column_types`` defaults to ``None`` meaning "inherit from folder level";
    an explicit ``{}`` adds no cluster-level overrides (folder-level entries
    still apply).
    """

    pattern: re.Pattern
    raw_pattern: str
    tablename: str
    if_exists: Optional[str] = None
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    partition_by: Optional[List[str]] = None
    max_partitions: Optional[int] = None
    indexes: Optional[List[str]] = None
    column_types: Optional[Dict[str, str]] = None
    all_nullable: Optional[bool] = None


@dataclass
class FolderConfig:
    """Folder-level configuration parsed from YAML.

    ``partition_by``, ``max_partitions``, ``indexes``, and ``all_nullable``
    serve as defaults for every cluster unless overridden at the cluster
    level. ``column_types`` is a ``{column_name: postgres_type_str}`` map of
    user-supplied type overrides that win over the auto-union computed
    during pre-scan.

    ``file_type`` controls which file extensions are discovered. The
    text-file settings (``delimiter``, ``text_encoding``, ``quotechar``)
    control how delimited text files are read and are ignored when
    ``file_type`` is ``"sas"`` (the default).
    """

    folder: Path
    schemaname: str
    if_exists: str = "fail"
    auto_detect: bool = True
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    explicit: List[_ExplicitPattern] = field(default_factory=list)
    partition_by: List[str] = field(default_factory=list)
    max_partitions: int = 10_000
    indexes: List[str] = field(default_factory=list)
    column_types: Dict[str, str] = field(default_factory=dict)
    all_nullable: bool = False
    file_type: str = "sas"
    delimiter: str = ","
    text_encoding: str = "utf-8"
    quotechar: str = '"'


# ---------------------------------------------------------------------------
# Config loading
# ---------------------------------------------------------------------------


def _validate_if_exists(value: Any, where: str) -> str:
    s = str(value).lower()
    if s not in VALID_IF_EXISTS:
        raise ValueError(
            f"{where}: if_exists={value!r} is not one of {VALID_IF_EXISTS}"
        )
    return s


def _parse_columns_filter(
    raw: Dict[str, Any], where: str
) -> Tuple[Optional[List[str]], Optional[List[str]]]:
    include = raw.get("include")
    exclude = raw.get("exclude")
    if include is not None and exclude is not None:
        raise ValueError(f"{where}: 'include' and 'exclude' are mutually exclusive.")
    if include is not None and not isinstance(include, list):
        raise ValueError(f"{where}: 'include' must be a list of column names.")
    if exclude is not None and not isinstance(exclude, list):
        raise ValueError(f"{where}: 'exclude' must be a list of column names.")
    include_out = [str(c) for c in include] if include is not None else None
    exclude_out = [str(c) for c in exclude] if exclude is not None else None
    return include_out, exclude_out


def _parse_partition_by(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[List[str]]:
    """Parse a ``partition_by`` value from YAML.

    Returns a list of non-empty, unique column name strings. When
    ``allow_none`` is True (used for per-cluster entries), an omitted key
    returns ``None`` to signal "inherit from folder level". An explicit
    empty list ``[]`` always returns ``[]``.
    """
    if raw_value is None:
        return None if allow_none else []
    if isinstance(raw_value, str):
        if not raw_value.strip():
            raise ValueError(f"{where}: 'partition_by' string must be non-empty.")
        return [raw_value.strip()]
    if isinstance(raw_value, list):
        if len(raw_value) == 0:
            return []
        result: List[str] = []
        for i, item in enumerate(raw_value):
            if not isinstance(item, str) or not item.strip():
                raise ValueError(
                    f"{where}: 'partition_by[{i}]' must be a non-empty string."
                )
            result.append(str(item).strip())
        if len(result) != len(set(result)):
            raise ValueError(
                f"{where}: 'partition_by' contains duplicate column names."
            )
        return result
    raise ValueError(
        f"{where}: 'partition_by' must be a string or list of strings."
    )


def _parse_max_partitions(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[int]:
    """Parse a ``max_partitions`` value from YAML.

    Returns a positive integer. When ``allow_none`` is True (used for
    per-cluster entries), an omitted key returns ``None`` to signal
    "inherit from folder level".
    """
    if raw_value is None:
        return None if allow_none else 10_000
    try:
        value = int(raw_value)
    except (TypeError, ValueError):
        raise ValueError(
            f"{where}: 'max_partitions' must be a positive integer, "
            f"got {raw_value!r}"
        )
    if value <= 0:
        raise ValueError(
            f"{where}: 'max_partitions' must be a positive integer, "
            f"got {value}"
        )
    return value


def _validate_partition_vs_columns(
    partition_by: List[str],
    exclude: Optional[List[str]],
    where: str,
) -> None:
    """Raise if any ``partition_by`` column is in the ``exclude`` list."""
    if not partition_by or exclude is None:
        return
    excluded_parts = [c for c in partition_by if c in exclude]
    if excluded_parts:
        raise ValueError(
            f"{where}: 'exclude' removes partition_by columns: {excluded_parts}"
        )


def _parse_indexes(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[List[str]]:
    """Parse an ``indexes`` value from YAML.

    Returns a list of non-empty, unique column name strings. When
    ``allow_none`` is True (used for per-cluster entries), an omitted key
    returns ``None`` to signal "inherit from folder level". An explicit
    empty list ``[]`` always returns ``[]``.
    """
    if raw_value is None:
        return None if allow_none else []
    if isinstance(raw_value, str):
        if not raw_value.strip():
            raise ValueError(f"{where}: 'indexes' string must be non-empty.")
        return [raw_value.strip()]
    if isinstance(raw_value, list):
        if len(raw_value) == 0:
            return []
        result: List[str] = []
        for i, item in enumerate(raw_value):
            if not isinstance(item, str) or not item.strip():
                raise ValueError(
                    f"{where}: 'indexes[{i}]' must be a non-empty string."
                )
            result.append(str(item).strip())
        if len(result) != len(set(result)):
            raise ValueError(
                f"{where}: 'indexes' contains duplicate column names."
            )
        return result
    raise ValueError(
        f"{where}: 'indexes' must be a string or list of strings."
    )


def _validate_indexes_vs_columns(
    indexes: List[str],
    exclude: Optional[List[str]],
    where: str,
) -> None:
    """Raise if any ``indexes`` column is in the ``exclude`` list."""
    if not indexes or exclude is None:
        return
    excluded_idx = [c for c in indexes if c in exclude]
    if excluded_idx:
        raise ValueError(
            f"{where}: 'exclude' removes index columns: {excluded_idx}"
        )


def _parse_column_types(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[Dict[str, str]]:
    """Parse a ``column_types`` mapping from YAML.

    The value must be a mapping ``{column_name: pg_type_str}``. Keys and
    values are whitespace-stripped strings; empty strings raise. When
    ``allow_none`` is True (used for per-cluster entries), an omitted key
    returns ``None`` to mean "inherit from folder level"; an explicit
    empty mapping returns ``{}`` (no cluster-level overrides; folder-level
    entries still apply).
    """
    if raw_value is None:
        return None if allow_none else {}
    if not isinstance(raw_value, dict):
        raise ValueError(
            f"{where}: 'column_types' must be a mapping of "
            f"{{column_name: postgres_type}}."
        )
    out: Dict[str, str] = {}
    for k, v in raw_value.items():
        key = str(k).strip()
        if not key:
            raise ValueError(
                f"{where}: 'column_types' contains an empty column name."
            )
        if not isinstance(v, str) or not v.strip():
            raise ValueError(
                f"{where}: 'column_types[{key}]' must be a non-empty "
                f"Postgres type string (got {v!r})."
            )
        out[key] = v.strip()
    return out


def load_folder_config(path: Path) -> FolderConfig:
    """Parse and validate the folder-level YAML config at ``path``.

    Supports optional ``if_exists``, ``include`` / ``exclude``,
    ``partition_by``, ``max_partitions``, ``indexes``, ``column_types``,
    and ``all_nullable`` at both the folder level (defaults for all
    clusters) and per explicit cluster entry (overrides the folder
    default). A relative ``folder`` is resolved against the config file's
    directory when that path exists; otherwise it is left relative to the
    current working directory.
    """
    path = Path(path)
    with path.open("r", encoding="utf-8") as f:
        raw = yaml.safe_load(f)
    if not isinstance(raw, dict):
        raise ValueError(f"Config at {path} must be a YAML mapping at the top level.")
    missing = [k for k in ("folder", "schemaname") if k not in raw]
    if missing:
        raise ValueError(f"Config {path} missing required keys: {', '.join(missing)}")
    folder = Path(raw["folder"])
    if not folder.is_absolute():
        candidate = (path.parent / folder).resolve()
        folder = candidate if candidate.exists() else folder
    schemaname = str(raw["schemaname"])
    if_exists = _validate_if_exists(raw.get("if_exists", "fail"), f"Config {path}")
    auto_detect = bool(raw.get("auto_detect", True))
    include, exclude = _parse_columns_filter(raw, f"Config {path}")

    # -- folder-level partition settings ------------------------------------
    partition_by = _parse_partition_by(
        raw.get("partition_by"), f"Config {path}"
    )
    max_partitions = _parse_max_partitions(
        raw.get("max_partitions"), f"Config {path}"
    )
    _validate_partition_vs_columns(partition_by, exclude, f"Config {path}")

    # -- folder-level index settings ----------------------------------------
    indexes = _parse_indexes(raw.get("indexes"), f"Config {path}")
    _validate_indexes_vs_columns(indexes, exclude, f"Config {path}")

    # -- folder-level column_types overrides --------------------------------
    column_types = _parse_column_types(
        raw.get("column_types"), f"Config {path}"
    )

    # -- folder-level all_nullable -----------------------------------------
    # Sets the default for every cluster. Per-cluster ``all_nullable`` wins
    # when present; the CLI ``--all-nullable`` flag trumps both.
    raw_an_folder = raw.get("all_nullable", False)
    if not isinstance(raw_an_folder, bool):
        raise ValueError(
            f"Config {path}: 'all_nullable' must be a boolean "
            f"(got {raw_an_folder!r})."
        )
    all_nullable_default = bool(raw_an_folder)

    # -- file_type ----------------------------------------------------------
    file_type = str(raw.get("file_type", "sas")).lower()
    if file_type not in VALID_FILE_TYPES:
        raise ValueError(
            f"Config {path}: file_type={file_type!r} is not one of "
            f"{VALID_FILE_TYPES}"
        )

    # -- text-file-specific fields ------------------------------------------
    # ``delimiter`` also accepts the aliases "tab" (or a literal
    # backslash-t) and "pipe".
    raw_delim = raw.get("delimiter", ",")
    if isinstance(raw_delim, str):
        delim_lower = raw_delim.lower().strip()
        if delim_lower in ("tab", "\\t"):
            delimiter = "\t"
        elif delim_lower in ("pipe", "|"):
            delimiter = "|"
        else:
            delimiter = raw_delim
    else:
        delimiter = str(raw_delim)
    text_encoding = str(raw.get("text_encoding", "utf-8"))
    quotechar = str(raw.get("quotechar", '"'))

    explicit: List[_ExplicitPattern] = []
    clusters_raw = raw.get("clusters") or []
    if not isinstance(clusters_raw, list):
        raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
    for i, entry in enumerate(clusters_raw):
        where = f"Config {path} clusters[{i}]"
        if not isinstance(entry, dict):
            raise ValueError(f"{where} must be a mapping.")
        if "pattern" not in entry or "tablename" not in entry:
            raise ValueError(f"{where} must include 'pattern' and 'tablename'.")
        raw_pat = str(entry["pattern"])
        try:
            compiled = re.compile(raw_pat)
        except re.error as e:
            raise ValueError(f"{where}: invalid regex {raw_pat!r}: {e}") from e
        c_if_exists = (
            _validate_if_exists(entry["if_exists"], where)
            if "if_exists" in entry
            else None
        )
        c_include, c_exclude = _parse_columns_filter(entry, where)

        # -- per-cluster partition settings ---------------------------------
        c_partition_by = _parse_partition_by(
            entry.get("partition_by"), where, allow_none=True
        )
        c_max_partitions = _parse_max_partitions(
            entry.get("max_partitions"), where, allow_none=True
        )
        # Validate partition_by vs the effective exclude for this cluster.
        effective_exclude = c_exclude if c_exclude is not None else exclude
        effective_pb = c_partition_by if c_partition_by is not None else partition_by
        _validate_partition_vs_columns(effective_pb, effective_exclude, where)

        # -- per-cluster index settings -------------------------------------
        c_indexes = _parse_indexes(
            entry.get("indexes"), where, allow_none=True
        )
        effective_idx = c_indexes if c_indexes is not None else indexes
        _validate_indexes_vs_columns(effective_idx, effective_exclude, where)

        # -- per-cluster column_types overrides -----------------------------
        c_column_types = _parse_column_types(
            entry.get("column_types"), where, allow_none=True
        )

        # -- per-cluster all_nullable ---------------------------------------
        c_all_nullable: Optional[bool]
        if "all_nullable" in entry:
            raw_c_an = entry["all_nullable"]
            if not isinstance(raw_c_an, bool):
                raise ValueError(
                    f"{where}: 'all_nullable' must be a boolean "
                    f"(got {raw_c_an!r})."
                )
            c_all_nullable = bool(raw_c_an)
        else:
            c_all_nullable = None

        explicit.append(
            _ExplicitPattern(
                pattern=compiled,
                raw_pattern=raw_pat,
                tablename=str(entry["tablename"]),
                if_exists=c_if_exists,
                include=c_include,
                exclude=c_exclude,
                partition_by=c_partition_by,
                max_partitions=c_max_partitions,
                indexes=c_indexes,
                column_types=c_column_types,
                all_nullable=c_all_nullable,
            )
        )

    return FolderConfig(
        folder=folder,
        schemaname=schemaname,
        if_exists=if_exists,
        auto_detect=auto_detect,
        include=include,
        exclude=exclude,
        explicit=explicit,
        partition_by=partition_by,
        max_partitions=max_partitions,
        indexes=indexes,
        column_types=column_types or {},
        all_nullable=all_nullable_default,
        file_type=file_type,
        delimiter=delimiter,
        text_encoding=text_encoding,
        quotechar=quotechar,
    )


# ---------------------------------------------------------------------------
# Cluster discovery
# ---------------------------------------------------------------------------

_TRAILING_DIGIT_RE = re.compile(r"\d+$")
_DIGIT_GROUP_RE = re.compile(r"\d+")


def _auto_prefix(stem: str) -> str:
    """Derive the cluster key for a file stem.

    Strip trailing digits and then any trailing separators, so
    ``group_a1`` / ``group_a_2`` / ``group_a-3`` all land in the same
    ``group_a`` bucket. If stripping leaves an empty string (an all-digit
    stem such as ``2020``), the stem itself is the key.
    """
    stripped = _TRAILING_DIGIT_RE.sub("", stem)
    stripped = stripped.rstrip("_-")
    return stripped or stem


def _cluster_sort_key(path: Path) -> Tuple[int, str]:
    """Sort key for ordering files within a cluster.

    Sorts numerically by the LAST digit group in the stem so ``_9`` comes
    before ``_10`` / ``_40`` regardless of width, and so a file named
    ``foo_9_detail`` lands before ``foo_40_detail``. The first file under
    this order is the one whose schema is inferred and used to create the
    target table; sorting numerically keeps that choice stable as the file
    set grows. Files with no digits fall to ``-1`` so they sort before
    numbered files; the stem is a tiebreaker for reproducibility.
    """
    digits = _DIGIT_GROUP_RE.findall(path.stem)
    n = int(digits[-1]) if digits else -1
    return (n, path.stem)


def _list_data_files(folder: Path, file_type: str = "sas") -> List[Path]:
    """List data files in ``folder`` filtered by ``file_type``.

    When ``file_type`` is ``"text"``, only text extensions are matched.
    When ``file_type`` is ``"sas"`` (the default), only SAS extensions are
    matched. This keeps SAS and text file pools separate so a folder
    containing both types doesn't accidentally mix them.
    """
    if file_type == "text":
        extensions = TEXT_EXTENSIONS
    else:
        extensions = SAS_EXTENSIONS
    files: List[Path] = []
    for p in sorted(folder.iterdir()):
        if p.is_file() and p.suffix.lower() in extensions:
            files.append(p)
    return files


def _list_sas_files(folder: Path) -> List[Path]:
    """Backward-compatible wrapper around :func:`_list_data_files`."""
    return _list_data_files(folder, file_type="sas")


def _build_text_kw(cluster: ClusterSpec) -> Dict[str, Any]:
    """Build the text-file keyword arguments dict from a cluster's config.

    Returns a dict suitable for spreading into :func:`read_sas_preview`,
    :func:`read_sas_metadata`, :func:`iter_sas_chunks`, etc. For SAS
    file_type clusters the dict still carries the defaults, which the
    reader functions ignore for SAS extensions.
    """
    return dict(
        delimiter=cluster.delimiter,
        text_encoding=cluster.text_encoding,
        quotechar=cluster.quotechar,
    )


def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
    """Enumerate ``cfg.folder`` and bucket files into ``ClusterSpec`` objects.

    IO-light: the only filesystem access is checking and listing
    ``cfg.folder``; no data file is opened here. Explicit patterns are
    applied first, in config order; files matched by an earlier pattern
    are removed from the pool before the next pattern runs. A file
    matching two patterns triggers a hard error (that's almost always a
    config bug). An explicit pattern that matches nothing still yields a
    cluster with an empty ``files`` list so callers can report it.

    Settings are resolved per cluster:

    * For explicit clusters, ``if_exists``, ``include`` / ``exclude``,
      ``partition_by``, ``max_partitions``, ``indexes``, and
      ``all_nullable`` from the cluster entry override the folder defaults
      when present. ``None`` means "inherit"; an explicit ``[]`` for
      ``partition_by`` or ``indexes`` disables that feature.
      ``column_types`` entries are merged on top of the folder map.
    * For auto-detected clusters, folder defaults are inherited directly.

    Text-file config (``file_type``, ``delimiter``, ``text_encoding``,
    ``quotechar``) is propagated from the folder config to every cluster.
    """
    if not cfg.folder.exists() or not cfg.folder.is_dir():
        raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}")

    pool = _list_data_files(cfg.folder, file_type=cfg.file_type)
    clusters: List[ClusterSpec] = []

    # Text-file kwargs to propagate to every cluster.
    _text_fields = dict(
        file_type=cfg.file_type,
        delimiter=cfg.delimiter,
        text_encoding=cfg.text_encoding,
        quotechar=cfg.quotechar,
    )

    # Detect cross-pattern overlap up front for a clearer error message.
    for i, p_i in enumerate(cfg.explicit):
        for j in range(i + 1, len(cfg.explicit)):
            p_j = cfg.explicit[j]
            for f in pool:
                if p_i.pattern.search(f.name) and p_j.pattern.search(f.name):
                    raise ValueError(
                        f"File {f.name!r} matches multiple explicit patterns: "
                        f"{p_i.raw_pattern!r} and {p_j.raw_pattern!r}"
                    )

    remaining = list(pool)
    for patt in cfg.explicit:
        # Resolve partition_by: None = inherit folder, [] = disable, list = override
        resolved_pb = (
            patt.partition_by if patt.partition_by is not None
            else cfg.partition_by
        )
        resolved_mp = (
            patt.max_partitions if patt.max_partitions is not None
            else cfg.max_partitions
        )
        # Resolve indexes: None = inherit folder, [] = disable, list = override
        resolved_idx = (
            patt.indexes if patt.indexes is not None
            else cfg.indexes
        )
        # Resolve column_types: user overrides only. The auto-union adds
        # more entries later (in :func:`main`) after the metadata pre-scan.
        # None = inherit folder, {} = no cluster-level overrides, dict =
        # cluster-level overrides that win over folder-level entries.
        if patt.column_types is None:
            resolved_ct: Dict[str, str] = dict(cfg.column_types)
        else:
            resolved_ct = {**cfg.column_types, **patt.column_types}
        # Resolve all_nullable: None = inherit folder, bool = override.
        resolved_an = (
            patt.all_nullable if patt.all_nullable is not None
            else cfg.all_nullable
        )

        # No matches is not an error: the folder might legitimately not
        # contain files for this pattern on a given run. The cluster is
        # still emitted (with an empty ``files`` list) so the CLI can note it.
        matched = [f for f in remaining if patt.pattern.search(f.name)]
        matched_set = set(matched)
        remaining = [f for f in remaining if f not in matched_set]
        clusters.append(
            ClusterSpec(
                tablename=patt.tablename,
                files=sorted(matched, key=_cluster_sort_key),
                if_exists=patt.if_exists or cfg.if_exists,
                include=patt.include if patt.include is not None else cfg.include,
                exclude=patt.exclude if patt.exclude is not None else cfg.exclude,
                source="explicit",
                pattern=patt.raw_pattern,
                partition_by=resolved_pb,
                max_partitions=resolved_mp,
                indexes=resolved_idx,
                column_types=dict(resolved_ct),
                all_nullable=resolved_an,
                **_text_fields,
            )
        )

    if cfg.auto_detect and remaining:
        buckets: Dict[str, List[Path]] = {}
        for f in remaining:
            key = _auto_prefix(f.stem)
            buckets.setdefault(key, []).append(f)
        for key in sorted(buckets):
            clusters.append(
                ClusterSpec(
                    tablename=key,
                    files=sorted(buckets[key], key=_cluster_sort_key),
                    if_exists=cfg.if_exists,
                    include=cfg.include,
                    exclude=cfg.exclude,
                    source="auto",
                    partition_by=cfg.partition_by,
                    max_partitions=cfg.max_partitions,
                    indexes=cfg.indexes,
                    column_types=dict(cfg.column_types),
                    all_nullable=cfg.all_nullable,
                    **_text_fields,
                )
            )
    return clusters
# ---------------------------------------------------------------------------
# Per-cluster load
# ---------------------------------------------------------------------------
def _infer_cluster_schema(
path: Path,
include,
exclude,
*,
column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
2026-04-22 01:05:26 +00:00
text_kw: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict, Optional[int]]:
2026-04-22 01:05:26 +00:00
"""Infer the Postgres column schema from a data file preview.
Returns ``(columns, total_rows)``. ``total_rows`` comes from the
2026-04-22 01:05:26 +00:00
file metadata (the file's declared row count) and is threaded
through to :func:`_stream_file` so the tqdm progress bar has a real
denominator instead of an indeterminate spinner. ``column_types``
lets the caller pin specific columns to a chosen Postgres type
(typically the merged auto-union + YAML overrides for the cluster).
``force_nullable`` stamps every column nullable regardless of what
the preview shows - see :func:`load_sas.infer_schema`.
2026-04-22 01:05:26 +00:00
``text_kw`` carries ``delimiter``, ``text_encoding``, ``quotechar``
through to :func:`read_sas_preview` for text file dispatch.
"""
2026-04-22 01:05:26 +00:00
_tkw = text_kw or {}
preview_df, meta = read_sas_preview(path, **_tkw)
preview_df = apply_column_filter(preview_df, include, exclude)
total_rows = getattr(meta, "number_rows", None)
columns = infer_schema(
preview_df, meta,
total_rows=total_rows,
column_types=column_types,
force_nullable=force_nullable,
)
return columns, total_rows
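
# Illustrative sketch (hypothetical, not the load_sas.infer_schema API) of
# how the two knobs documented in _infer_cluster_schema combine with a
# preview-based guess. The shapes are assumptions for this example only:
# ``inferred`` maps column -> (pg_type, nullable) and ``column_types``
# maps column -> pinned Postgres type.
def _sketch_finalize_columns(inferred, column_types=None, force_nullable=False):
    """Hypothetical helper: apply type pins and the force_nullable override."""
    pinned = column_types or {}
    out = {}
    for name, (pg_type, nullable) in inferred.items():
        # A pinned type always replaces the preview-derived one.
        final_type = pinned.get(name, pg_type)
        # force_nullable drops NOT NULL even when the preview saw no nulls.
        out[name] = (final_type, force_nullable or nullable)
    return out
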
def _discover_cluster_partitions(
cluster: ClusterSpec,
columns: Dict,
) -> dict:
"""Scan ALL files in ``cluster`` to discover partition values.
Returns a nested partition-value tree suitable for passing to
:func:`load_sas.render_partition_ddl` and :func:`load_sas.create_table`.
Each file is scanned chunk-by-chunk so the full dataset is never
materialized in memory.
"""
tkw = _build_text_kw(cluster)
merged: dict = {}
for path in cluster.files:
def _filtered_chunks(p=path):
for chunk_df, _chunk_meta in iter_sas_chunks(p, **tkw):
yield apply_column_filter(
chunk_df, cluster.include, cluster.exclude
)
file_tree = discover_partition_values_chunked(
_filtered_chunks(), cluster.partition_by, columns,
)
_merge_partition_trees(merged, file_tree)
return merged
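
# Illustrative sketch of merging per-file partition-value trees, the job
# _merge_partition_trees does inside _discover_cluster_partitions. The
# tree shape assumed here is hypothetical: each level maps a partition
# value to the dict for the next level, and the last level maps to {}.
# The real structure from discover_partition_values_chunked may differ.
def _sketch_merge_trees(dst, src):
    """Hypothetical: recursively fold ``src`` into ``dst`` in place; return ``dst``."""
    for value, subtree in src.items():
        if value in dst:
            _sketch_merge_trees(dst[value], subtree)
        else:
            # Copy so later merges into dst never mutate the caller's src.
            dst[value] = _sketch_copy_tree(subtree)
    return dst


def _sketch_copy_tree(tree):
    """Hypothetical: deep-copy a nested dict-of-dicts tree."""
    return {k: _sketch_copy_tree(v) for k, v in tree.items()}
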
def load_cluster(
conn,
cluster: ClusterSpec,
schemaname: str,
*,
workers: int = 1,
progress_queue: Any = None,
db_overrides: Optional[Dict[str, Optional[str]]] = None,
abort_on_first_failure: bool = False,
) -> int:
"""Load every file in ``cluster`` into one table. Returns total rows loaded.
When ``cluster.partition_by`` is non-empty, partition values are
discovered across ALL files before table creation so the full partition
tree exists before any data is copied.
Commits happen per chunk inside :func:`load_sas.copy_dataframes`. If a
file mid-cluster fails, earlier chunks - including chunks from earlier
files in the cluster - stay committed; only the in-flight chunk is
rolled back by :func:`main`.
``workers`` controls parallelism for streaming. With ``workers == 1``
every file streams on ``conn`` in sequence. With ``workers > 1`` the
main connection only does ``CREATE TABLE`` (and, for partitioned
clusters, partition discovery + pre-creation), commits, then dispatches
*every* file - including the first - to a ``ProcessPoolExecutor``. Each
worker opens its own psycopg2 connection, re-infers the per-file schema,
runs the same :func:`load_sas.assert_schema_compatible` check the serial
path uses, and streams chunks via COPY. Workers report per-chunk row
counts to ``progress_queue`` so the caller can drive a single aggregated
tqdm bar regardless of how many workers are in flight.
``db_overrides`` carries ``{"user", "password"}`` into workers when the
caller prompted for credentials interactively; leave ``None`` to let
workers read the standard libpq environment variables on their own.
"""
if not cluster.files:
return 0
tkw = _build_text_kw(cluster)
first, *rest = cluster.files
first_columns, first_total_rows = _infer_cluster_schema(
first, cluster.include, cluster.exclude,
column_types=cluster.column_types,
force_nullable=cluster.all_nullable,
text_kw=tkw,
)
# -- Validate index columns early ---------------------------------------
if cluster.indexes:
missing_icols = [
c for c in cluster.indexes if c not in first_columns
]
if missing_icols:
raise ValueError(
f"cluster {cluster.tablename!r}: indexes references "
f"columns not present in the inferred schema: {missing_icols}"
)
# -- Partition support --------------------------------------------------
partition_values: Optional[dict] = None
if cluster.partition_by:
# Validate that all partition_by columns exist in the inferred schema.
missing_pcols = [
c for c in cluster.partition_by if c not in first_columns
]
if missing_pcols:
raise ValueError(
f"cluster {cluster.tablename!r}: partition_by references "
f"columns not present in the inferred schema: {missing_pcols}"
)
# Discover partition values across ALL files in the cluster.
# In append mode the partitions already exist, so skip the scan.
if cluster.if_exists == "append":
print(
" [info] append mode: skipping partition discovery "
"(partitions assumed to exist)",
file=sys.stderr,
)
else:
print(
f" discovering partition values across "
f"{len(cluster.files)} file(s)...",
file=sys.stderr,
)
partition_values = _discover_cluster_partitions(
cluster, first_columns,
)
total_parts = _count_partitions(partition_values)
print(
f" discovered {total_parts:,} partition table(s) "
f"across {len(cluster.partition_by)} level(s)",
file=sys.stderr,
)
create_table(
conn, schemaname, cluster.tablename, first_columns, cluster.if_exists,
partition_by=cluster.partition_by or None,
partition_values=partition_values,
max_partitions=cluster.max_partitions,
)
total = 0
if workers > 1:
# Parallel path: commit the (empty) table now so worker subprocesses'
# ``assert_schema_compatible`` probes can actually see it via
# ``information_schema``, then dispatch *every* file (first +
# rest) to the pool. The previous design streamed the first file
# on the main connection before spawning workers, which made the
# serial first-file phase the long pole on big-file clusters
# (e.g. 52 × 5-50 GB). Now ``CREATE TABLE`` is the only serial
# work and it takes milliseconds.
conn.commit()
total += _load_remaining_files_parallel(
cluster.files,
schemaname,
cluster.tablename,
cluster.include,
cluster.exclude,
workers=workers,
progress_queue=progress_queue,
db_overrides=db_overrides,
column_types=cluster.column_types,
force_nullable=cluster.all_nullable,
abort_on_first_failure=abort_on_first_failure,
text_kw=tkw,
)
else:
# Serial path: stream the first file on the main connection, then
# iterate the rest. Worth keeping separate from the parallel path
# because spawning a single-worker pool just to load files in
# series would be pure overhead.
total += _stream_file(
conn, schemaname, cluster.tablename, first, first_columns,
cluster.include, cluster.exclude,
total_rows=first_total_rows,
progress_queue=progress_queue,
text_kw=tkw,
)
conn.commit()
for path in rest:
columns, path_total_rows = _infer_cluster_schema(
path, cluster.include, cluster.exclude,
column_types=cluster.column_types,
force_nullable=cluster.all_nullable,
text_kw=tkw,
)
# Uses the same check that if_exists=append runs. A type
# mismatch or missing column aborts the cluster; because
# chunks commit as they load, earlier chunks in the
# cluster remain in the table.
assert_schema_compatible(
conn, schemaname, cluster.tablename, columns
)
total += _stream_file(
conn, schemaname, cluster.tablename, path, columns,
cluster.include, cluster.exclude,
total_rows=path_total_rows,
progress_queue=progress_queue,
text_kw=tkw,
)
# -- Index support ------------------------------------------------------
if cluster.indexes:
create_indexes(conn, schemaname, cluster.tablename, cluster.indexes)
return total
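
# Illustrative sketch of the kind of check assert_schema_compatible runs
# before each append (hypothetical; the real function reads
# information_schema and its exact rules may differ). Both arguments map
# column name -> Postgres type name; ``existing`` is the target table and
# ``incoming`` is the freshly inferred per-file schema. Per-file drift such
# as INTEGER vs BIGINT surfaces here as a mismatch.
def _sketch_check_compatible(existing, incoming):
    """Hypothetical: raise ValueError on missing columns or type drift."""
    missing = sorted(c for c in incoming if c not in existing)
    if missing:
        raise ValueError(f"columns not in target table: {missing}")
    mismatched = {
        c: (existing[c], t)
        for c, t in incoming.items()
        if existing[c].upper() != t.upper()
    }
    if mismatched:
        raise ValueError(f"type mismatch (table, file): {mismatched}")
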
def _stream_file(
conn,
schemaname: str,
tablename: str,
path: Path,
columns,
include,
exclude,
*,
total_rows: Optional[int] = None,
progress_queue: Any = None,
text_kw: Optional[Dict[str, Any]] = None,
) -> int:
"""Stream ``path`` into an existing table chunk by chunk.
When ``progress_queue`` is provided, each chunk's row count is published
to the queue as ``("rows", n)`` tuples instead of being rendered to a
per-file tqdm bar. That lets :func:`main` drive a single folder-wide
progress bar from a background drainer thread, which keeps the progress
view coherent when the folder loader runs files in parallel workers.
``text_kw`` carries ``delimiter``, ``text_encoding``, ``quotechar``
through to :func:`iter_sas_chunks` for text file dispatch.
"""
_tkw = text_kw or {}
def _chunks():
if progress_queue is not None:
for chunk_df, _chunk_meta in iter_sas_chunks(path, **_tkw):
chunk_df = apply_column_filter(chunk_df, include, exclude)
progress_queue.put(("rows", len(chunk_df)))
yield chunk_df
return
pbar = tqdm(
total=total_rows,
unit="row",
unit_scale=True,
desc=f" {path.name}",
file=sys.stderr,
dynamic_ncols=True,
)
try:
for chunk_df, _chunk_meta in iter_sas_chunks(path, **_tkw):
chunk_df = apply_column_filter(chunk_df, include, exclude)
pbar.update(len(chunk_df))
yield chunk_df
finally:
pbar.close()
return copy_dataframes(conn, schemaname, tablename, _chunks(), columns)
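
# Illustrative sketch of the progress plumbing described in _stream_file:
# the producer publishes ("rows", n) per chunk and a daemon drainer thread
# folds the events into one counter (a stand-in for pbar.update). Plain
# lists stand in for DataFrames, and the function name is hypothetical;
# main()'s real drainer uses a stop Event and a tqdm bar instead.
def _sketch_progress_roundtrip(chunks):
    """Hypothetical: return the total row count seen by the drainer thread."""
    import queue
    import threading

    q = queue.Queue()
    done = threading.Event()
    seen = [0]  # mutable cell the drainer thread updates

    def drainer():
        # Exit only once the producer is finished AND the queue is empty.
        while not (done.is_set() and q.empty()):
            try:
                kind, n = q.get(timeout=0.05)
            except queue.Empty:
                continue
            if kind == "rows":
                seen[0] += n

    t = threading.Thread(target=drainer, daemon=True)
    t.start()
    for chunk in chunks:
        q.put(("rows", len(chunk)))
    done.set()
    t.join()
    return seen[0]
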
# ---------------------------------------------------------------------------
# Parallel append workers
# ---------------------------------------------------------------------------
def _worker_load_append_file(
path_str: str,
schemaname: str,
tablename: str,
include: Optional[List[str]],
exclude: Optional[List[str]],
progress_queue: Any,
db_overrides: Optional[Dict[str, Optional[str]]],
column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
text_kw: Optional[Dict[str, Any]] = None,
) -> Tuple[str, int, Optional[str]]:
"""Worker process: load one data file in append mode.
Runs in a subprocess spawned by :func:`_load_remaining_files_parallel`.
Opens its own psycopg2 connection, re-infers the per-file schema (so
per-file ``INTEGER`` vs ``BIGINT`` drift is caught by the existing
schema-compat check just like in the serial path), and streams chunks
via ``COPY``. Row counts are published to the shared queue for the
main process's global tqdm bar.
``text_kw`` carries ``delimiter``, ``text_encoding``, ``quotechar``
through to the reader functions for text file dispatch.
Returns ``(path_str, rows_loaded, error_or_None)`` - failures are
returned rather than raised so the parent can aggregate results
across workers without losing partial progress.
"""
from pathlib import Path as _Path
from dotenv import load_dotenv as _load_dotenv
import ctypes
import ctypes.util
import gc
from load_sas import (
apply_column_filter as _apply_column_filter,
assert_schema_compatible as _assert_schema_compatible,
connect as _connect,
copy_dataframes as _copy_dataframes,
infer_schema as _infer_schema,
iter_sas_chunks as _iter_sas_chunks,
read_sas_preview as _read_sas_preview,
)
_load_dotenv()
_tkw = text_kw or {}
path = _Path(path_str)
try:
preview_df, meta = _read_sas_preview(path, **_tkw)
preview_df = _apply_column_filter(preview_df, include, exclude)
total_rows = getattr(meta, "number_rows", None)
columns = _infer_schema(
preview_df, meta,
total_rows=total_rows,
column_types=column_types,
force_nullable=force_nullable,
)
# Drop the preview ASAP - on a 2M-row wide file it's hundreds of MB
# and we never need it again after schema inference.
del preview_df, meta
user = db_overrides.get("user") if db_overrides else None
password = db_overrides.get("password") if db_overrides else None
conn = _connect(user=user, password=password)
conn.autocommit = False
try:
_assert_schema_compatible(conn, schemaname, tablename, columns)
def _chunks():
for chunk_df, _chunk_meta in _iter_sas_chunks(path, **_tkw):
chunk_df = _apply_column_filter(chunk_df, include, exclude)
if progress_queue is not None:
progress_queue.put(("rows", len(chunk_df)))
yield chunk_df
rows = _copy_dataframes(
conn, schemaname, tablename, _chunks(), columns
)
conn.commit()
return (path_str, rows, None)
finally:
conn.close()
except Exception as e:
import traceback as _traceback
tb = _traceback.format_exc()
# Keep the one-line summary (what the tqdm [FAIL] print uses) but
# tack on the full traceback so the final cluster-failure block
# shows the file/line that crashed. Without this, ``ProcessPool``
# workers lose every frame of context - you get "FloatingPointError:
# overflow encountered in multiply" with no hint of where inside
# the pandas/numpy/pyarrow stack it happened.
return (path_str, 0, f"{type(e).__name__}: {e}\n{tb}")
finally:
# Hand memory back to the OS before the worker is recycled (or before
# ``max_tasks_per_child`` rotates this process). Three layers, each
# of which independently retains memory across calls:
#
# 1. pyarrow's memory pool aggressively reuses buffers; calling
# ``release_unused()`` explicitly returns them to the allocator.
# 2. Python's GC: cyclic refs from pandas/pyarrow chains aren't
# collected until a generation tick; force one now.
# 3. glibc's ptmalloc keeps freed heap in per-thread arenas instead
# of munmap'ing it back. ``malloc_trim(0)`` is the explicit ask.
# No-op (silently) on platforms without the symbol (macOS, etc).
try:
import pyarrow as _pa
_pa.default_memory_pool().release_unused()
except Exception:
pass
gc.collect()
try:
_libc_name = ctypes.util.find_library("c")
if _libc_name:
_libc = ctypes.CDLL(_libc_name)
if hasattr(_libc, "malloc_trim"):
_libc.malloc_trim(0)
except Exception:
pass
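
# Illustrative sketch of the worker's error contract: failures come back as
# ``(path, 0, "Type: message\n<traceback>")`` instead of propagating, so the
# parent can aggregate results across processes without losing partial
# progress. ``load_fn`` is a hypothetical stand-in for the real load body.
def _sketch_run_reporting_errors(path_str, load_fn):
    """Hypothetical: run ``load_fn`` and fold any exception into the result."""
    import traceback
    try:
        return (path_str, load_fn(path_str), None)
    except Exception as e:
        # format_exc() keeps the frames a bare str(e) would lose.
        return (path_str, 0, f"{type(e).__name__}: {e}\n{traceback.format_exc()}")
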
def _load_remaining_files_parallel(
files: List[Path],
schemaname: str,
tablename: str,
include: Optional[List[str]],
exclude: Optional[List[str]],
*,
workers: int,
progress_queue: Any,
db_overrides: Optional[Dict[str, Optional[str]]],
column_types: Optional[Dict[str, str]] = None,
force_nullable: bool = False,
abort_on_first_failure: bool = False,
text_kw: Optional[Dict[str, Any]] = None,
) -> int:
"""Run append-mode loads for ``files`` across a process pool.
Each file is an independent unit of work submitted to
``ProcessPoolExecutor``. Workers infer schema, validate compatibility,
and stream via COPY just like the serial path. The table itself must
already exist (and be committed) before this is called - the worker
schema-compat probes read ``information_schema``, which won't see an
uncommitted ``CREATE TABLE``. Failures are surfaced two ways:
1. **Live stderr feed.** Every worker's outcome - success or failure
- is written to stderr the moment ``as_completed`` hands it back,
via ``tqdm.write`` so the active progress bar stays intact. This
turns the previous "wait 30 minutes for the last worker to drain
before you find out the first 31 failed at minute 2" footgun into
an immediate notification, which matters most when one bad file
(e.g. schema drift, OOM, bad data) torpedoes most of the pool.
2. **Final aggregate raise.** Errors are still collected and raised as
one ``RuntimeError`` after the pool drains so the per-cluster
success/fail summary in :func:`main` stays accurate. On
``KeyboardInterrupt`` we re-raise after wrapping the partial
error list into the same ``RuntimeError`` shape, so Ctrl-C still
prints what failed up to that point instead of silently
discarding it.
"""
total = 0
errors: List[Tuple[str, str]] = []
completed = 0
n_files = len(files)
# ``max_tasks_per_child=1`` recycles each worker process after every
# file. Without this, glibc/pyarrow/pyreadstat all retain peak-water
# memory inside long-lived workers; over a multi-hour run the sum
# across workers grows monotonically even though individual chunks
# have been freed at the Python level. Recycling per file gives the
# OS the memory back unconditionally. The cost is one fresh worker
# process per file (setting ``max_tasks_per_child`` makes the pool
# default to the ``spawn`` start method, so that means a new Python
# interpreter, ~1-2 s), which is noise next to multi-GB sas7bdat
# reads. The option needs Python 3.11+; older interpreters keep
# long-lived workers.
pool_kwargs: Dict[str, Any] = {"max_workers": workers}
if sys.version_info >= (3, 11):
pool_kwargs["max_tasks_per_child"] = 1
with ProcessPoolExecutor(**pool_kwargs) as pool:
futures = [
pool.submit(
_worker_load_append_file,
str(p),
schemaname,
tablename,
include,
exclude,
progress_queue,
db_overrides,
column_types,
force_nullable,
text_kw,
)
for p in files
]
aborted = False
try:
for fut in as_completed(futures):
# ``--abort-on-first-failure`` cancels still-pending
# futures; ``as_completed`` yields them anyway and
# ``.result()`` raises ``CancelledError``. Skip those
# quietly - the abort log already accounted for the
# cancellation count.
try:
path_str, rows, err = fut.result()
except CancelledError:
continue
completed += 1
name = Path(path_str).name
if err is not None:
errors.append((path_str, err))
# ``tqdm.write`` writes through the bar without
# garbling it; bare ``print`` would interleave with
# the rendered progress line.
tqdm.write(
f"[FAIL {completed}/{n_files}] {name}: {err}",
file=sys.stderr,
)
if abort_on_first_failure and not aborted:
# Cancel anything still queued. Already-running
# workers can't be interrupted portably, so they
# keep going and their rows still count toward
# ``total``; we just stop dispatching new files.
# Set the flag once so simultaneous failures
# don't trigger a cancel storm.
aborted = True
cancelled = 0
for f in futures:
if not f.done() and f.cancel():
cancelled += 1
tqdm.write(
f"[abort] --abort-on-first-failure: cancelled "
f"{cancelled} pending file(s); waiting for "
f"{n_files - completed - cancelled} in-flight "
f"worker(s) to finish.",
file=sys.stderr,
)
else:
tqdm.write(
f"[done {completed}/{n_files}] {name}: "
f"{rows:,} row(s)",
file=sys.stderr,
)
total += rows
except KeyboardInterrupt:
# Cancel anything still queued so we exit promptly. Already-
# running workers will run to completion (Python can't kill
# a child mid-syscall portably) but at least pending tasks
# don't keep firing. We re-raise as the same RuntimeError
# shape used below so the caller's per-cluster summary path
# still sees the partial failure list instead of an opaque
# KeyboardInterrupt with no context about which workers
# had already failed.
for f in futures:
if not f.done():
f.cancel()
tqdm.write(
f"[interrupt] Ctrl-C after {completed}/{n_files} file(s); "
f"{len(errors)} failure(s) collected so far.",
file=sys.stderr,
)
if errors:
joined = "\n".join(f" {p}: {e}" for p, e in errors)
raise RuntimeError(
f"interrupted after {len(errors)} worker(s) failed "
f"while appending to {schemaname}.{tablename}:\n{joined}"
)
raise
if errors:
joined = "\n".join(f" {p}: {e}" for p, e in errors)
raise RuntimeError(
f"{len(errors)} worker(s) failed while appending to "
f"{schemaname}.{tablename}:\n{joined}"
)
return total
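
# Illustrative sketch of the abort-on-first-failure pattern used by
# _load_remaining_files_parallel. A ThreadPoolExecutor is swapped in for
# ProcessPoolExecutor so it runs anywhere, and ``job(item) -> (item, rows,
# err)`` is a hypothetical callable. On the first error every still-pending
# future is cancelled; cancelled futures still come out of as_completed and
# are skipped, while in-flight jobs finish and their rows still count.
def _sketch_run_with_abort(items, job, workers=2):
    """Hypothetical: run ``job`` over ``items``; raise after draining on errors."""
    from concurrent.futures import CancelledError, ThreadPoolExecutor, as_completed

    total, errors, aborted = 0, [], False
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(job, it) for it in items]
        for fut in as_completed(futures):
            try:
                item, rows, err = fut.result()
            except CancelledError:
                continue
            if err is not None:
                errors.append((item, err))
                if not aborted:
                    aborted = True
                    for f in futures:
                        f.cancel()  # no-op for running or finished futures
            else:
                total += rows
    if errors:
        raise RuntimeError(f"{len(errors)} job(s) failed: {errors}")
    return total
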
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description=(
"Load every data file (SAS or delimited text) in a folder into "
"Postgres, grouping files into clusters that each become one table."
),
)
p.add_argument("--config", required=True, type=Path, help="Path to YAML config")
p.add_argument(
"--dry-run",
action="store_true",
help=(
"Print discovered clusters and the inferred DDL for each "
"(CREATE TABLE plus partition DDL when applicable). For "
"partitioned clusters all files are scanned to discover "
"partition values. The database is never touched."
),
)
p.add_argument(
"--fail-fast",
action="store_true",
help=(
"Abort on the first cluster failure. By default the failed "
"cluster's uncommitted chunk is rolled back (chunks already "
"committed stay in the table) and loading continues with the "
"next cluster."
),
)
p.add_argument(
"--abort-on-first-failure",
action="store_true",
help=(
"Within a single cluster's parallel append, cancel every "
"still-pending worker the instant one worker fails. Use when "
"you know the failure is systemic (schema drift, bad creds, "
"OOM) and don't want to wait for the slow files to drain "
"before getting your prompt back. Already-running workers "
"still finish their current file - Python can't kill children "
"mid-syscall - but new files won't be dispatched. Orthogonal "
"to --fail-fast, which controls what happens between clusters."
),
)
p.add_argument(
"--dbcreds",
action="store_true",
help=(
"Prompt for database username and password instead of reading "
"PGUSER / PGPASSWORD from the environment or .env file."
),
)
p.add_argument(
"--chunk-rows",
type=int,
default=None,
metavar="N",
help=(
"Per-chunk row target for streaming and COPY. "
"Overrides both the GENERIC_LOADER_CHUNK_ROWS env var and the "
"auto-scaling applied when --workers > 1. Peak memory per "
"worker is roughly 4 × N × avg_row_bytes; with wide data "
"files (~4 KB/row) and 32 workers, N=100000 is a safe starting "
"point on a 128 GB box."
),
)
p.add_argument(
"--no-prescan",
action="store_true",
help=(
"Skip the per-file metadata scan that populates the folder-wide "
"tqdm ETA and the cluster-wide column-type auto-union. Useful "
"when the folder is large (half-hour+ pre-scan) or when you're "
"iterating quickly on a failure. Without the pre-scan the "
"progress bar still shows rows loaded, rate, and elapsed time "
"but cannot estimate remaining time, and only YAML column_types "
"overrides apply."
)
p.add_argument(
"--all-nullable",
action="store_true",
help=(
"Stamp every column nullable in the generated schema, bypassing "
"NOT NULL inference for every cluster. Use when sampled rows "
"wrongly suggest a column has no nulls and COPY fails mid-load "
"on the first null it hits. Overrides the per-cluster and "
"folder-level ``all_nullable`` YAML settings when set."
),
)
p.add_argument(
"--workers",
type=int,
default=1,
metavar="N",
help=(
"Number of worker processes for loading. With N=1 "
"(default) files load serially on the main connection. With "
"N>1 the main connection only creates each cluster's table "
"(plus partitions when configured), then every file, including "
"the first, loads in parallel across N processes, each with its "
"own psycopg2 connection. On a big box try N close to your core "
"count. When N>1 and neither --chunk-rows nor "
"GENERIC_LOADER_CHUNK_ROWS is set, the per-chunk row target "
"auto-scales to 3,200,000 / N, clamped to 50,000-500,000, so "
"peak memory stays bounded."
),
)
return p
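
# Illustrative sketch of the per-chunk row precedence that main() applies:
# the --chunk-rows flag, then GENERIC_LOADER_CHUNK_ROWS, then auto-scaling
# when workers > 1. Written as a pure function over a hypothetical ``env``
# mapping so it can be tested without touching os.environ; None means
# "leave the load_sas default alone".
def _sketch_resolve_chunk_rows(cli_rows, env, workers):
    """Hypothetical: mirror main()'s chunk-row precedence as a pure function."""
    if cli_rows is not None:
        return int(cli_rows)
    if "GENERIC_LOADER_CHUNK_ROWS" in env:
        return int(env["GENERIC_LOADER_CHUNK_ROWS"])
    if workers > 1:
        # Scale inversely with workers, clamped to [50k, 500k] rows.
        return max(50_000, min(500_000, 3_200_000 // workers))
    return None

# With 32 workers this yields 100,000 rows per chunk; at ~4 KB/row and the
# "roughly 4 x N x avg_row_bytes" rule from --chunk-rows, that is about
# 1.6 GB per worker, or ~51 GB across the pool.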
def _describe_cluster(cluster: ClusterSpec) -> str:
src = f"{cluster.source}"
if cluster.pattern:
src += f" pattern={cluster.pattern!r}"
files = ", ".join(f.name for f in cluster.files) or "(no matching files)"
parts = ""
if cluster.partition_by:
parts = f"\n partition_by: {cluster.partition_by}"
idx = ""
if cluster.indexes:
idx = f"\n indexes: {cluster.indexes}"
return (
f"cluster {cluster.tablename!r} [{src}] if_exists={cluster.if_exists}\n"
f" files: {files}{parts}{idx}"
)
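
# Illustrative sketch of the cluster-wide auto-union that main() performs
# after the pre-scan (hypothetical rules; not load_sas.union_column_types).
# Each per-file map is column -> (readstat_type, sas_format). Assumed rules:
# a column stored as "string" in any file becomes TEXT, otherwise DOUBLE
# PRECISION. User overrides win, mirroring ``{**auto_types, **user_overrides}``.
def _sketch_union_types(per_file, user_overrides=None):
    """Hypothetical: union per-file column metadata into one type map."""
    seen = {}
    for file_meta in per_file:
        for col, (rtype, _fmt) in file_meta.items():
            seen.setdefault(col, set()).add(rtype)
    auto = {
        col: "TEXT" if "string" in kinds else "DOUBLE PRECISION"
        for col, kinds in seen.items()
    }
    return {**auto, **(user_overrides or {})}
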
def main(argv: Optional[List[str]] = None) -> int:
args = _build_argparser().parse_args(argv)
load_dotenv()
cfg = load_folder_config(args.config)
if not cfg.folder.exists() or not cfg.folder.is_dir():
print(f"error: folder not found: {cfg.folder}", file=sys.stderr)
return 2
clusters = discover_clusters(cfg)
# CLI override: --all-nullable trumps both folder-level and per-cluster
# YAML ``all_nullable`` settings. Applied here (before any schema work)
# so every downstream path - dry-run, pre-scan, worker dispatch - sees
# the same flag on the ClusterSpec.
if args.all_nullable:
for c in clusters:
c.all_nullable = True
print(
"[info] --all-nullable set: stamping every column nullable "
"across all clusters (NOT NULL inference disabled).",
file=sys.stderr,
)
loadable = [c for c in clusters if c.files]
if not loadable:
if cfg.file_type == "text":
ext_label = ", ".join(TEXT_EXTENSIONS)
kind = "text"
else:
ext_label = ", ".join(SAS_EXTENSIONS)
kind = "SAS"
print(
f"error: no {kind} files found in {cfg.folder} "
f"(looked for {ext_label})",
file=sys.stderr,
)
return 2
print(f"discovered {len(loadable)} cluster(s) in {cfg.folder}:")
for c in clusters:
print(_describe_cluster(c))
if args.dry_run:
print()
for c in loadable:
print(f"--- DDL for cluster {c.tablename!r} ---")
# Dry-run skips the pre-scan (so no auto-union) but user-supplied
# ``column_types`` from YAML are already baked into ``c.column_types``
# by ``discover_clusters`` - honor them here so the previewed DDL
# matches what a real load would produce on a single-file cluster.
_dry_tkw = _build_text_kw(c)
columns, _ = _infer_cluster_schema(
c.files[0], c.include, c.exclude,
column_types=c.column_types,
force_nullable=c.all_nullable,
text_kw=_dry_tkw,
)
# Print parent CREATE TABLE (with PARTITION BY if applicable).
print(
render_create_table(
cfg.schemaname, c.tablename, columns,
partition_by=c.partition_by or None,
)
)
# Print child partition DDL when the cluster is partitioned.
if c.partition_by:
# Validate partition columns exist in the schema.
missing_pcols = [
col for col in c.partition_by if col not in columns
]
if missing_pcols:
print(
f" [error] partition_by references columns not in "
f"schema: {missing_pcols}",
file=sys.stderr,
)
else:
print(
f" discovering partition values across "
f"{len(c.files)} file(s)...",
file=sys.stderr,
)
partition_values = _discover_cluster_partitions(
c, columns,
)
total_parts = _count_partitions(partition_values)
print(
f" discovered {total_parts:,} partition table(s) "
f"across {len(c.partition_by)} level(s)",
file=sys.stderr,
)
child_stmts = render_partition_ddl(
cfg.schemaname, c.tablename, c.partition_by,
partition_values, columns,
max_partitions=c.max_partitions,
)
for stmt in child_stmts:
print()
print(stmt)
# Print CREATE INDEX DDL when the cluster has indexes.
if c.indexes:
missing_icols = [
col for col in c.indexes if col not in columns
]
if missing_icols:
print(
f" [error] indexes references columns not in "
f"schema: {missing_icols}",
file=sys.stderr,
)
else:
idx_stmts = render_create_indexes(
cfg.schemaname, c.tablename, c.indexes,
)
for stmt in idx_stmts:
print()
print(stmt)
print()
return 0
db_user = db_password = None
if args.dbcreds:
db_user = input("Database username: ")
db_password = getpass.getpass("Database password: ")
db_overrides: Optional[Dict[str, Optional[str]]] = (
{"user": db_user, "password": db_password} if args.dbcreds else None
)
workers = max(1, int(args.workers))
# Per-worker peak memory ~= chunk_rows × avg_row_bytes × ~4 (the original
# pyreadstat DataFrame, the type-coerced ``prepared`` copy, the pyarrow
# table, and the serialized CSV buffer can all be alive simultaneously).
# With 32 workers and 500k rows × wide sas7bdat that's easily >128 GB -
# the default the loader shipped with OOM'd on a c6i.32xlarge box. Scale
# the auto target inversely with worker count so total memory stays
# roughly flat regardless of how many workers you pick. Floor of 50k
# keeps per-chunk overhead amortized; ceiling of 500k is where pyarrow
# / pyreadstat buffer spikes start to dominate.
#
# Order of precedence (first match wins):
# 1. ``--chunk-rows N`` CLI flag (if provided)
# 2. ``GENERIC_LOADER_CHUNK_ROWS`` env var (if already set)
# 3. Auto-pick based on ``workers``
if args.chunk_rows is not None:
os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(int(args.chunk_rows))
print(
f"[info] --chunk-rows {args.chunk_rows:,}: pinning per-chunk "
f"row target (overrides auto-scaling).",
file=sys.stderr,
)
elif "GENERIC_LOADER_CHUNK_ROWS" in os.environ:
print(
f"[info] honoring GENERIC_LOADER_CHUNK_ROWS="
f"{os.environ['GENERIC_LOADER_CHUNK_ROWS']} from environment.",
file=sys.stderr,
)
elif workers > 1:
auto_rows = max(50_000, min(500_000, 3_200_000 // workers))
os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(auto_rows)
print(
f"[info] parallel mode (workers={workers}): auto-scaled "
f"per-chunk rows to {auto_rows:,}. "
f"Use --chunk-rows N to override if you have RAM headroom.",
file=sys.stderr,
)
# -- Metadata pre-scan -----------------------------------------------------
# Sum ``number_rows`` across every file so the tqdm bar has a real
# denominator, AND collect the per-column (readstat_type, sas_format)
# tuples so we can union schemas across files in a cluster before any
# CREATE TABLE runs. ``read_sas_metadata`` uses pyreadstat's
# ``metadataonly=True`` fast path, but on multi-GB sas7bdat files
# that still reads tens of MB of scattered subheader pages per file -
# sequentially that's minutes for a 52-file folder. pyreadstat
# releases the GIL during I/O and C decoding, so a ThreadPool gives
# near-linear scaling until the disk saturates. ``--no-prescan``
# bypasses the scan entirely; the progress bar then runs without an
# ETA *and* the auto-union is skipped (user overrides from YAML
# still apply).
all_files: List[Path] = [p for c in loadable for p in c.files]
grand_total: Optional[int] = 0
file_meta_by_path: Dict[str, Dict[str, Tuple[str, Optional[str]]]] = {}
if args.no_prescan:
grand_total = None
print(
f"[info] --no-prescan set: skipping row-count pre-scan for "
f"{len(all_files)} file(s); progress bar will show rate + "
f"elapsed but no ETA. Cluster-wide schema auto-union is also "
f"disabled; only user-specified column_types overrides apply.",
file=sys.stderr,
)
else:
prescan_workers = min(16, max(1, len(all_files)))
print(
f"pre-scanning row counts + per-column metadata for "
f"{len(all_files)} file(s) across {prescan_workers} thread(s)...",
file=sys.stderr,
)
def _scan_one(
p: Path,
) -> Tuple[
Path,
Optional[int],
Optional[Dict[str, Tuple[str, Optional[str]]]],
Optional[str],
]:
try:
_prescan_tkw = dict(
delimiter=cfg.delimiter,
text_encoding=cfg.text_encoding,
quotechar=cfg.quotechar,
)
meta = read_sas_metadata(p, **_prescan_tkw)
n = getattr(meta, "number_rows", None)
col_meta = extract_union_metadata(meta)
return (
p,
int(n) if n is not None else None,
col_meta,
None,
)
except Exception as e:
return (p, None, None, str(e))
unknown_total_files: List[str] = []
running_total = 0
with ThreadPoolExecutor(max_workers=prescan_workers) as tpool:
prescan_bar = tqdm(
total=len(all_files),
unit="file",
desc=" prescanning",
file=sys.stderr,
dynamic_ncols=True,
)
try:
for p, n, col_meta, err in tpool.map(_scan_one, all_files):
prescan_bar.update(1)
if err is not None:
unknown_total_files.append(f"{p.name} ({err})")
elif n is None:
unknown_total_files.append(p.name)
else:
running_total += n
if col_meta is not None:
file_meta_by_path[str(p)] = col_meta
finally:
prescan_bar.close()
if unknown_total_files:
print(
f"[warn] could not read row count from "
f"{len(unknown_total_files)} file(s); progress bar ETA will "
f"be approximate.",
file=sys.stderr,
)
print(
f" total rows across folder: {running_total:,}",
file=sys.stderr,
)
grand_total = running_total
# -- Cluster-wide schema auto-union ---------------------------------------
# For each cluster, compute ``auto_types`` from the union of every
# file's metadata (see :func:`load_sas.union_column_types`). Merge with
# any user-supplied YAML overrides (user wins) and attach the result
# back onto the cluster so every later read - first-file inference,
# worker inference, schema-compat check - sees the same frozen schema.
# With ``--no-prescan`` the file_meta_by_path dict is empty and
# ``auto_types`` resolves to {}, so only the YAML overrides survive.
for c in loadable:
per_file = [
file_meta_by_path[str(p)]
for p in c.files
if str(p) in file_meta_by_path
]
auto_types = union_column_types(per_file) if per_file else {}
user_overrides = dict(c.column_types) # already merged folder+cluster
# User-supplied overrides win over the auto-union.
merged = {**auto_types, **user_overrides}
c.column_types = merged
if auto_types:
# Ideally we'd call out only the columns where the auto-union
# *changed* something relative to the default "first file
# wins" inference, but that inference isn't available at this
# point, so log the full resolved map instead. It's bounded by
# column count, and users asked for visibility into what got
# overridden.
shown = auto_types
if user_overrides:
# Distinguish the user-forced entries in the log so it's
# obvious which types came from YAML.
shown = {
col: (
f"{user_overrides[col]} (user override)"
if col in user_overrides
else pg
)
for col, pg in merged.items()
}
print(
f"[info] cluster {c.tablename!r}: auto-union derived "
f"{len(auto_types)} column type(s) across "
f"{len(per_file)} file(s): {shown}",
file=sys.stderr,
)
elif user_overrides and args.no_prescan:
print(
f"[info] cluster {c.tablename!r}: using {len(user_overrides)} "
f"user-supplied column_types override(s); auto-union "
f"disabled by --no-prescan.",
file=sys.stderr,
)

    # -- Shared progress plumbing ---------------------------------------------
    # The queue crosses process boundaries when workers > 1 (managed proxy)
    # and is a plain in-process queue otherwise; the put/get contract is
    # identical either way. A daemon thread drains it and advances the
    # single tqdm bar that spans the whole folder load.
    manager: Optional[Any] = None
    progress_queue: Any
    if workers > 1:
        manager = mp.Manager()
        progress_queue = manager.Queue()
    else:
        progress_queue = _queue_mod.Queue()
    pbar = tqdm(
        total=grand_total or None,
        unit="row",
        unit_scale=True,
        desc=f"{cfg.folder.name}",
        file=sys.stderr,
        dynamic_ncols=True,
    )
    stop_drainer = threading.Event()

    def _drainer() -> None:
        # Keep reading until a stop has been requested *and* the queue has
        # gone quiet, so "rows" events posted just before shutdown still
        # reach the bar instead of being dropped when the stop flag is set.
        while True:
            try:
                event = progress_queue.get(timeout=0.1)
            except _queue_mod.Empty:
                if stop_drainer.is_set():
                    return
                continue
            except (EOFError, OSError):
                # The manager proxy has gone away; nothing more to read.
                return
            if event and event[0] == "rows":
                pbar.update(event[1])

    drainer_thread = threading.Thread(target=_drainer, daemon=True)
    drainer_thread.start()

    conn = connect(user=db_user, password=db_password)
    conn.autocommit = False
    failures: List[Tuple[str, Exception]] = []
    totals: List[Tuple[str, int, int]] = []  # (tablename, files, rows)
    try:
        for cluster in loadable:
            print(
                f"\n>>> loading cluster {cluster.tablename!r} "
                f"({len(cluster.files)} file(s)) "
                f"[workers={workers}]"
            )
            try:
                rows = load_cluster(
                    conn,
                    cluster,
                    cfg.schemaname,
                    workers=workers,
                    progress_queue=progress_queue,
                    db_overrides=db_overrides,
                    abort_on_first_failure=args.abort_on_first_failure,
                )
                conn.commit()
                totals.append((cluster.tablename, len(cluster.files), rows))
                print(
                    f" -> loaded {rows:,} row(s) into "
                    f"{cfg.schemaname}.{cluster.tablename}"
                )
            except Exception as e:
                conn.rollback()
                failures.append((cluster.tablename, e))
                print(
                    f" !! cluster {cluster.tablename!r} failed: {e}",
                    file=sys.stderr,
                )
                if args.fail_fast:
                    break
    finally:
        # Drain any pending progress events before shutting the bar down so
        # the final rendered total matches what actually landed.
        stop_drainer.set()
        drainer_thread.join(timeout=2.0)
        pbar.close()
        conn.close()
        if manager is not None:
            manager.shutdown()

    print("\n=== summary ===")
    for name, fcount, rows in totals:
        print(f" ok {name}: {fcount} file(s), {rows:,} row(s)")
    for name, err in failures:
        print(f" FAIL {name}: {err}", file=sys.stderr)
    return 1 if failures else 0


if __name__ == "__main__":
    sys.exit(main())