2010 lines
77 KiB
Python
2010 lines
77 KiB
Python
"""Folder-level data-to-Postgres loader.
|
||
|
||
Wraps :mod:`load_sas` so an entire directory of data files (SAS or delimited
|
||
text) can be ingested in one invocation. A directory often contains several
|
||
*clusters* of files that share a schema (e.g. ``group_a1.sas7bdat``,
|
||
``group_a2.sas7bdat``, ... or ``group_a1.csv``, ``group_a2.csv``, ...). Each
|
||
cluster becomes one Postgres table; files inside a cluster are appended to it.
|
||
|
||
-------------------------------------------------------------------------------
|
||
USAGE
|
||
-------------------------------------------------------------------------------
|
||
|
||
1. YAML config
|
||
--------------
|
||
::
|
||
|
||
folder: samples/folder_test # required; relative paths resolve against
|
||
# the config file's directory
|
||
schemaname: public # required
|
||
|
||
# Optional. One of: fail | replace | append. Default: fail.
|
||
# Applied to the first file of each cluster (subsequent files in the
|
||
# cluster always run through the append-mode compatibility check).
|
||
if_exists: fail
|
||
|
||
# Optional. Default: true. When true, files that don't match any explicit
|
||
# pattern below are grouped by their common prefix (trailing digits, and
|
||
# optional trailing separators, are stripped from each file stem).
|
||
auto_detect: true
|
||
|
||
# Optional. Columns to force-include or force-exclude across every file.
|
||
# include and exclude are mutually exclusive.
|
||
# include: [ID, INTCOL]
|
||
# exclude: [ALLNULL]
|
||
|
||
# Optional folder default for explicit column type overrides. These
|
||
# win over the cluster-wide auto-union computed during pre-scan; set
|
||
# them when a column's SAS-level type varies across files (e.g. phone
|
||
# IDs stored as CHAR in some years and NUM in others) and you want to
|
||
# pin the Postgres type yourself rather than accept the auto-derived
|
||
# one. Per-cluster column_types inside each clusters[*] entry are
|
||
# merged on top of this map.
|
||
# column_types:
|
||
# RESP_PH_PREFIX_ID: TEXT
|
||
# SOME_BIGINT_COL: BIGINT
|
||
|
||
# Optional folder default for LIST partitioning. Omit or set [] for no
|
||
# partitioning. Accepts a single string or a list of column names.
|
||
# partition_by:
|
||
# - state
|
||
# - zip
|
||
|
||
# Optional folder default threshold. Default: 10000.
|
||
# max_partitions: 10000
|
||
|
||
# Optional explicit cluster patterns. Each pattern is matched against the
|
||
# file *basename*. Matched files are pulled out of the auto-detect pool.
|
||
# Per-cluster if_exists/include/exclude/partition_by/max_partitions/
|
||
# column_types override the folder-level defaults.
|
||
clusters:
|
||
- pattern: '^group_a\\d+\\.sas7bdat$'
|
||
tablename: group_a
|
||
- pattern: '^group_b\\d+\\.sas7bdat$'
|
||
tablename: group_b
|
||
if_exists: replace
|
||
column_types:
|
||
PHONE_PREFIX: TEXT
|
||
|
||
2. Command-line interface
|
||
-------------------------
|
||
::
|
||
|
||
python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast]
|
||
[--dbcreds]
|
||
|
||
Flags:
|
||
--config PATH Required. Path to the YAML config above.
|
||
--dry-run Print the discovered clusters and the inferred DDL for
|
||
each (CREATE TABLE plus partition DDL when applicable).
|
||
For partitioned clusters all files are scanned to
|
||
discover partition values. The database is never touched.
|
||
--fail-fast Abort the whole run on the first cluster failure.
|
||
Default is to log the failure, roll that cluster back,
|
||
and keep going.
|
||
--dbcreds Prompt interactively for the database username and
|
||
password instead of reading ``PGUSER`` / ``PGPASSWORD``
|
||
from the environment or ``.env`` file. The password
|
||
prompt does not echo. Has no effect with ``--dry-run``
|
||
(no connection is opened).
|
||
|
||
Exit codes:
|
||
0 - every cluster loaded successfully (or dry-run completed)
|
||
1 - at least one cluster failed (details on stderr)
|
||
2 - folder does not exist / contains no data files
|
||
|
||
3. Discovery rules
|
||
------------------
|
||
* Supported SAS extensions: ``.sas7bdat``, ``.xpt``, ``.xport``.
|
||
Supported text extensions: ``.txt``, ``.csv``, ``.tsv``.
|
||
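The ``file_type`` config key controls which set is used (default ``sas``; ``text`` selects the text extensions and enables the optional ``delimiter``, ``text_encoding`` and ``quotechar`` keys).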
|
||
The folder is not scanned recursively.
|
||
* Explicit patterns are tried in order, and a file matched by one pattern is
removed from the pool before the next pattern runs. Overlap between patterns
is nevertheless rejected: a file matching two patterns makes
:func:`discover_clusters` raise ``ValueError`` when the folder is scanned
(not when the YAML is parsed), because that is almost always a config bug.
|
||
* Auto-detect groups remaining files by ``re.sub(r'\\d+$', '', stem)`` with
|
||
any trailing ``_`` / ``-`` stripped afterward. Stems without trailing
|
||
digits become singleton clusters named after the stem.
|
||
* Within a cluster, files are sorted **numerically** by the last digit
|
||
group in the stem, so ``..._9_...`` comes before ``..._10_...`` /
|
||
``..._40_...`` regardless of zero-padding. The first file in that
|
||
order drives schema inference; the rest are checked against that
|
||
schema via :func:`load_sas.assert_schema_compatible`. Gaps in the
|
||
numeric sequence (missing ``3``, ``7``, ``14``, ...) are irrelevant -
|
||
whatever files are present get loaded in numeric order.
|
||
* Auto-detect only recognizes *trailing* digit runs. File names where
|
||
the varying number sits in the middle of the stem (surrounded by
|
||
other name components) are not grouped by auto-detect - each becomes
|
||
its own singleton cluster. Use an explicit pattern to bucket them::
|
||
|
||
clusters:
|
||
- pattern: '^year2020_regionA_\\d+_detail\\.sas7bdat$'
|
||
tablename: year2020_regionA_detail
|
||
|
||
The regex still matches any digit width, so numbers like ``9`` and
|
||
``40`` both land in the same cluster and the numeric sort above puts
|
||
``9`` before ``40``.
|
||
|
||
4. Library usage
|
||
----------------
|
||
::
|
||
|
||
from load_folder import load_folder_config, discover_clusters, load_cluster
|
||
from load_sas import connect
|
||
|
||
cfg = load_folder_config("folder_config.yaml")
|
||
clusters = discover_clusters(cfg)
|
||
|
||
conn = connect()
|
||
try:
|
||
for cluster in clusters:
|
||
load_cluster(conn, cluster, cfg.schemaname)
|
||
finally:
|
||
conn.close()
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import getpass
|
||
import multiprocessing as mp
|
||
import os
|
||
import queue as _queue_mod
|
||
import re
|
||
import sys
|
||
import threading
|
||
from concurrent.futures import (
|
||
CancelledError,
|
||
ProcessPoolExecutor,
|
||
ThreadPoolExecutor,
|
||
as_completed,
|
||
)
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import yaml
|
||
from dotenv import load_dotenv
|
||
from tqdm import tqdm
|
||
|
||
from load_sas import (
|
||
TEXT_EXTENSIONS,
|
||
VALID_FILE_TYPES,
|
||
VALID_IF_EXISTS,
|
||
_count_partitions,
|
||
_is_text_file,
|
||
_merge_partition_trees,
|
||
apply_column_filter,
|
||
assert_schema_compatible,
|
||
connect,
|
||
copy_dataframes,
|
||
create_indexes,
|
||
create_table,
|
||
discover_partition_values_chunked,
|
||
extract_union_metadata,
|
||
infer_schema,
|
||
iter_sas_chunks,
|
||
read_sas_metadata,
|
||
read_sas_preview,
|
||
render_create_indexes,
|
||
render_create_table,
|
||
render_partition_ddl,
|
||
union_column_types,
|
||
)
|
||
|
||
|
||
# Extensions treated as SAS datasets. ``_list_data_files`` compares them
# against the lower-cased file suffix.
SAS_EXTENSIONS = (
    ".sas7bdat",
    ".xpt",
    ".xport",
)

# Every extension this loader knows about: the SAS set above followed by
# ``load_sas.TEXT_EXTENSIONS`` for delimited text files.
SUPPORTED_EXTENSIONS = SAS_EXTENSIONS + TEXT_EXTENSIONS
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Dataclasses
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@dataclass
class ClusterSpec:
    """One cluster of same-schema files plus the settings used to load it.

    Instances are produced by :func:`discover_clusters`, which resolves each
    setting from the folder-level defaults and any per-cluster YAML
    overrides (``partition_by``, ``max_partitions``, ``indexes``, ...).

    ``column_types`` starts out as the user's YAML overrides; the pre-scan in
    :func:`main` merges the cluster-wide auto-union into it, and the same
    mapping is passed to every worker so all files in the cluster infer one
    schema.

    The text-reader settings (``file_type``, ``delimiter``,
    ``text_encoding``, ``quotechar``) are copied from the folder config so
    reader functions get the right parameters for delimited text files.
    """

    # -- required (positional order is part of the constructor signature) --
    # Target Postgres table for every file in the cluster.
    tablename: str
    # Files to load; discover_clusters orders them with _cluster_sort_key.
    files: List[Path]
    # One of load_sas.VALID_IF_EXISTS (fail / replace / append).
    if_exists: str
    # Column allow-list, or None when no include filter applies.
    include: Optional[List[str]]
    # Column deny-list, or None when no exclude filter applies.
    exclude: Optional[List[str]]
    # "explicit" (matched a clusters[*] pattern) or "auto" (auto-detected).
    source: str

    # -- optional -----------------------------------------------------------
    # Raw regex of the explicit pattern; None for auto-detected clusters.
    pattern: Optional[str] = field(default=None)
    # LIST-partition columns; an empty list means no partitioning.
    partition_by: List[str] = field(default_factory=list)
    # Partition-count threshold.
    max_partitions: int = field(default=10_000)
    # Columns to index after loading.
    indexes: List[str] = field(default_factory=list)
    # {column_name: postgres_type} pins applied during schema inference.
    column_types: Dict[str, str] = field(default_factory=dict)
    # When True every inferred column is made nullable.
    all_nullable: bool = field(default=False)
    # Text-reader settings copied from the folder config.
    file_type: str = field(default="sas")
    delimiter: str = field(default=",")
    text_encoding: str = field(default="utf-8")
    quotechar: str = field(default='"')
|
||
|
||
|
||
@dataclass
class _ExplicitPattern:
    """Validated form of one ``clusters[*]`` entry from the YAML config.

    Every optional field uses ``None`` to mean "not set here - inherit the
    folder-level value". An explicit empty value is different: ``[]`` for
    ``partition_by`` disables partitioning for this cluster, and ``{}`` for
    ``column_types`` means "no user overrides for this cluster".
    """

    # Compiled regex, matched against each file's basename.
    pattern: re.Pattern
    # The regex source text, kept for error messages and ClusterSpec.pattern.
    raw_pattern: str
    # Target table for the files this pattern matches.
    tablename: str
    # Per-cluster overrides; None = inherit from the folder level.
    if_exists: Optional[str] = field(default=None)
    include: Optional[List[str]] = field(default=None)
    exclude: Optional[List[str]] = field(default=None)
    partition_by: Optional[List[str]] = field(default=None)
    max_partitions: Optional[int] = field(default=None)
    indexes: Optional[List[str]] = field(default=None)
    column_types: Optional[Dict[str, str]] = field(default=None)
    all_nullable: Optional[bool] = field(default=None)
|
||
|
||
|
||
@dataclass
class FolderConfig:
    """Folder-wide settings parsed from the YAML config.

    ``partition_by``, ``max_partitions``, ``indexes`` (and the other column
    settings) act as defaults for every cluster unless a ``clusters[*]``
    entry overrides them. ``column_types`` maps ``column_name`` to a
    Postgres type string; these user pins win over the auto-union computed
    during the pre-scan.

    The text-reader settings (``file_type``, ``delimiter``,
    ``text_encoding``, ``quotechar``) decide which files are discovered and
    how delimited text is read; they are ignored when ``file_type`` is
    ``"sas"`` (the default).
    """

    # -- required (positional order is part of the constructor signature) --
    # Directory whose data files are loaded (not scanned recursively).
    folder: Path
    # Target Postgres schema.
    schemaname: str

    # -- optional -----------------------------------------------------------
    # Default if_exists mode for every cluster.
    if_exists: str = field(default="fail")
    # Group leftover files by common prefix when True.
    auto_detect: bool = field(default=True)
    # Folder-wide column filter (mutually exclusive).
    include: Optional[List[str]] = field(default=None)
    exclude: Optional[List[str]] = field(default=None)
    # Parsed clusters[*] entries, in config order.
    explicit: List[_ExplicitPattern] = field(default_factory=list)
    # Folder-level defaults inherited by clusters.
    partition_by: List[str] = field(default_factory=list)
    max_partitions: int = field(default=10_000)
    indexes: List[str] = field(default_factory=list)
    column_types: Dict[str, str] = field(default_factory=dict)
    all_nullable: bool = field(default=False)
    # Text-reader settings.
    file_type: str = field(default="sas")
    delimiter: str = field(default=",")
    text_encoding: str = field(default="utf-8")
    quotechar: str = field(default='"')
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config loading
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _validate_if_exists(value: Any, where: str) -> str:
    """Normalise an ``if_exists`` setting and reject unknown modes.

    The raw value is stringified and lower-cased, then checked against
    ``load_sas.VALID_IF_EXISTS``.

    Args:
        value: Raw YAML value.
        where: Prefix for the error message (config path / cluster index).

    Returns:
        The lower-cased mode string.

    Raises:
        ValueError: if the normalised value is not an allowed mode.
    """
    mode = str(value).lower()
    if mode in VALID_IF_EXISTS:
        return mode
    raise ValueError(
        f"{where}: if_exists={value!r} is not one of {VALID_IF_EXISTS}"
    )
|
||
|
||
|
||
def _parse_columns_filter(
    raw: Dict[str, Any], where: str
) -> Tuple[Optional[List[str]], Optional[List[str]]]:
    """Extract the optional ``include`` / ``exclude`` column lists from ``raw``.

    Each list item is converted with :func:`str`, so numeric YAML column
    names such as ``1990`` are accepted as ``"1990"``. Names are not stripped.

    Args:
        raw: The YAML mapping (folder level or one ``clusters[*]`` entry).
        where: Prefix for error messages.

    Returns:
        ``(include, exclude)``; each is ``None`` when the key is absent.

    Raises:
        ValueError: if both keys are given, if either is not a list, or if a
            list item is null or blank. A stray ``-`` in a YAML list yields
            ``None``, which would otherwise become a column literally named
            ``"None"``.
    """
    include = raw.get("include")
    exclude = raw.get("exclude")
    if include is not None and exclude is not None:
        raise ValueError(f"{where}: 'include' and 'exclude' are mutually exclusive.")

    def _as_names(key: str, values: Any) -> Optional[List[str]]:
        # Validate one filter list; None means "key not present".
        if values is None:
            return None
        if not isinstance(values, list):
            raise ValueError(f"{where}: '{key}' must be a list of column names.")
        names: List[str] = []
        for i, item in enumerate(values):
            name = "" if item is None else str(item)
            if not name.strip():
                raise ValueError(
                    f"{where}: '{key}[{i}]' must be a non-empty column name."
                )
            names.append(name)
        return names

    return _as_names("include", include), _as_names("exclude", exclude)
|
||
|
||
|
||
def _parse_partition_by(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[List[str]]:
    """Normalise a YAML ``partition_by`` setting into a list of column names.

    Accepted shapes:

    * missing / null: ``None`` when ``allow_none`` is set (per-cluster
      entries, meaning "inherit the folder default"), otherwise ``[]``;
    * a non-blank string: a one-element list holding the stripped string;
    * a list of non-blank strings: the stripped strings, which must be unique
      after stripping. ``[]`` disables partitioning.

    Raises:
        ValueError: for a blank string, a non-string or blank list item,
            duplicate names, or any other value type.
    """
    if raw_value is None:
        return None if allow_none else []

    if isinstance(raw_value, str):
        column = raw_value.strip()
        if not column:
            raise ValueError(f"{where}: 'partition_by' string must be non-empty.")
        return [column]

    if not isinstance(raw_value, list):
        raise ValueError(
            f"{where}: 'partition_by' must be a string or list of strings."
        )

    columns: List[str] = []
    for pos, entry in enumerate(raw_value):
        if isinstance(entry, str) and entry.strip():
            columns.append(entry.strip())
            continue
        raise ValueError(
            f"{where}: 'partition_by[{pos}]' must be a non-empty string."
        )

    if len(set(columns)) != len(columns):
        raise ValueError(
            f"{where}: 'partition_by' contains duplicate column names."
        )
    return columns
|
||
|
||
|
||
def _parse_max_partitions(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[int]:
    """Parse a ``max_partitions`` value from YAML.

    Accepts an ``int``, an integral ``float`` (``8.0``) or anything
    :func:`int` can parse, such as the string ``"42"``. Booleans and
    non-integral floats are rejected rather than silently coerced:
    ``True`` would otherwise become ``1`` and ``2.5`` would be truncated to
    ``2``.

    Args:
        raw_value: Raw YAML value, or ``None`` when the key is absent.
        where: Prefix for error messages (config path / cluster index).
        allow_none: When True (per-cluster entries), a missing value returns
            ``None`` to signal "inherit from folder level"; otherwise the
            default ``10_000`` is returned.

    Returns:
        A positive integer, or ``None`` as described above.

    Raises:
        ValueError: if the value is not a positive integer.
    """
    if raw_value is None:
        return None if allow_none else 10_000

    not_int_msg = (
        f"{where}: 'max_partitions' must be a positive integer, "
        f"got {raw_value!r}"
    )
    # bool is a subclass of int, so it must be rejected before conversion.
    if isinstance(raw_value, bool):
        raise ValueError(not_int_msg)
    if isinstance(raw_value, float):
        # is_integer() is False for nan/inf too; int(inf) would otherwise
        # raise an uncaught OverflowError.
        if not raw_value.is_integer():
            raise ValueError(not_int_msg)
        value = int(raw_value)
    else:
        try:
            value = int(raw_value)
        except (TypeError, ValueError, OverflowError):
            # The message already shows the offending value; drop the noisy
            # "during handling of the above exception" context.
            raise ValueError(not_int_msg) from None

    if value <= 0:
        raise ValueError(
            f"{where}: 'max_partitions' must be a positive integer, "
            f"got {value}"
        )
    return value
|
||
|
||
|
||
def _validate_partition_vs_columns(
    partition_by: List[str],
    exclude: Optional[List[str]],
    where: str,
) -> None:
    """Reject a config whose ``exclude`` list drops a ``partition_by`` column.

    Excluding a column that is also a partition key is treated as a config
    error. Nothing is checked when ``partition_by`` is empty or ``exclude``
    is ``None``.

    Raises:
        ValueError: listing the clashing columns in ``partition_by`` order.
    """
    if partition_by and exclude is not None:
        dropped = [col for col in partition_by if col in exclude]
        if dropped:
            raise ValueError(
                f"{where}: 'exclude' removes partition_by columns: {dropped}"
            )
|
||
|
||
|
||
def _parse_indexes(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[List[str]]:
    """Normalise a YAML ``indexes`` setting into a list of column names.

    * missing / null: ``None`` when ``allow_none`` is set (per-cluster
      entries, meaning "inherit the folder default"), otherwise ``[]``;
    * a non-blank string: ``[stripped_string]``;
    * a list: every item must be a non-blank string. Items are stripped and
      must be unique afterwards. ``[]`` means "no indexes".

    Raises:
        ValueError: for a blank string, a bad list item, duplicates, or any
            other value type.
    """
    if raw_value is None:
        return None if allow_none else []

    if isinstance(raw_value, list):
        names: List[str] = []
        for pos, entry in enumerate(raw_value):
            cleaned = entry.strip() if isinstance(entry, str) else ""
            if not cleaned:
                raise ValueError(
                    f"{where}: 'indexes[{pos}]' must be a non-empty string."
                )
            names.append(cleaned)
        if len(set(names)) != len(names):
            raise ValueError(
                f"{where}: 'indexes' contains duplicate column names."
            )
        return names

    if isinstance(raw_value, str):
        single = raw_value.strip()
        if not single:
            raise ValueError(f"{where}: 'indexes' string must be non-empty.")
        return [single]

    raise ValueError(
        f"{where}: 'indexes' must be a string or list of strings."
    )
|
||
|
||
|
||
def _validate_indexes_vs_columns(
    indexes: List[str],
    exclude: Optional[List[str]],
    where: str,
) -> None:
    """Reject a config whose ``exclude`` list drops a column slated for indexing.

    Nothing is checked when ``indexes`` is empty or ``exclude`` is ``None``.

    Raises:
        ValueError: listing the clashing columns in ``indexes`` order.
    """
    if not indexes or exclude is None:
        return
    clashes = [name for name in indexes if name in exclude]
    if not clashes:
        return
    raise ValueError(
        f"{where}: 'exclude' removes index columns: {clashes}"
    )
|
||
|
||
|
||
def _parse_column_types(
    raw_value: Any, where: str, *, allow_none: bool = False
) -> Optional[Dict[str, str]]:
    """Turn a YAML ``column_types`` mapping into ``{column: postgres_type}``.

    Keys are converted with :func:`str` and stripped; values must be
    strings and are stripped. A blank key or a non-string / blank value is
    an error.

    Args:
        raw_value: Raw YAML value, or ``None`` when the key is absent.
        where: Prefix for error messages.
        allow_none: When True (per-cluster entries), a missing value returns
            ``None`` ("inherit from folder level"); otherwise ``{}``. An
            explicit empty mapping always returns ``{}``.

    Raises:
        ValueError: if the value is not a mapping or contains a bad entry.
    """
    if raw_value is None:
        return None if allow_none else {}
    if not isinstance(raw_value, dict):
        raise ValueError(
            f"{where}: 'column_types' must be a mapping of "
            f"{{column_name: postgres_type}}."
        )

    overrides: Dict[str, str] = {}
    for raw_name, raw_type in raw_value.items():
        name = str(raw_name).strip()
        if not name:
            raise ValueError(
                f"{where}: 'column_types' contains an empty column name."
            )
        pg_type = raw_type.strip() if isinstance(raw_type, str) else ""
        if not pg_type:
            raise ValueError(
                f"{where}: 'column_types[{name}]' must be a non-empty "
                f"Postgres type string (got {raw_type!r})."
            )
        overrides[name] = pg_type
    return overrides
|
||
|
||
|
||
def _cfg_resolve_folder(folder: Path, config_dir: Path) -> Path:
    """Resolve the YAML ``folder`` value to the directory to scan.

    Absolute paths are returned unchanged. A relative path is resolved
    against ``config_dir`` (the config file's directory). If that location
    does not exist but the raw path does (i.e. it is valid relative to the
    current working directory), the raw path is kept for backward
    compatibility. When neither exists, the config-relative location is
    returned so the eventual "not found" error names the documented place.
    """
    if folder.is_absolute():
        return folder
    candidate = (config_dir / folder).resolve()
    if candidate.exists() or not folder.exists():
        return candidate
    return folder


def _cfg_parse_bool(raw_value: Any, key: str, where: str) -> bool:
    """Return ``raw_value`` if it is a real boolean, else raise ``ValueError``.

    Non-bool values (strings, integers, null) are rejected so a typo cannot
    silently flip the setting.
    """
    if not isinstance(raw_value, bool):
        raise ValueError(
            f"{where}: '{key}' must be a boolean "
            f"(got {raw_value!r})."
        )
    return raw_value


def _cfg_parse_delimiter(raw_value: Any) -> str:
    """Normalise the YAML ``delimiter`` value.

    ``None`` (key absent or left blank) yields the default ``","``. The
    case-insensitive aliases ``tab`` and the two-character text ``\\t`` map
    to a tab character; ``pipe`` and ``|`` map to ``"|"``. Any other string
    is returned verbatim (whitespace is not stripped, so a literal tab or
    space delimiter survives). Non-strings are converted with :func:`str`.
    """
    if raw_value is None:
        return ","
    if not isinstance(raw_value, str):
        return str(raw_value)
    alias = raw_value.lower().strip()
    if alias in ("tab", "\\t"):
        return "\t"
    if alias in ("pipe", "|"):
        return "|"
    return raw_value


def _cfg_optional_str(raw: Dict[str, Any], key: str, default: str) -> str:
    """Return ``str(raw[key])``, or ``default`` when the key is absent or null."""
    value = raw.get(key)
    return default if value is None else str(value)


def _cfg_parse_cluster_entry(
    entry: Any,
    where: str,
    *,
    folder_exclude: Optional[List[str]],
    folder_partition_by: List[str],
    folder_indexes: List[str],
) -> _ExplicitPattern:
    """Validate one ``clusters[*]`` YAML entry and convert it to a pattern.

    Keys that are absent come back as ``None`` ("inherit from folder
    level"). ``partition_by`` and ``indexes`` are checked against the
    *effective* exclude list: the cluster's own if set, else
    ``folder_exclude``.

    Raises:
        ValueError: for a non-mapping entry, missing ``pattern`` /
            ``tablename``, an invalid regex, or any invalid override.
    """
    if not isinstance(entry, dict):
        raise ValueError(f"{where} must be a mapping.")
    if "pattern" not in entry or "tablename" not in entry:
        raise ValueError(f"{where} must include 'pattern' and 'tablename'.")

    raw_pat = str(entry["pattern"])
    try:
        compiled = re.compile(raw_pat)
    except re.error as e:
        raise ValueError(f"{where}: invalid regex {raw_pat!r}: {e}") from e

    c_if_exists = (
        _validate_if_exists(entry["if_exists"], where)
        if "if_exists" in entry
        else None
    )
    c_include, c_exclude = _parse_columns_filter(entry, where)

    # -- partition settings, validated against the effective exclude ------
    c_partition_by = _parse_partition_by(
        entry.get("partition_by"), where, allow_none=True
    )
    c_max_partitions = _parse_max_partitions(
        entry.get("max_partitions"), where, allow_none=True
    )
    effective_exclude = c_exclude if c_exclude is not None else folder_exclude
    _validate_partition_vs_columns(
        c_partition_by if c_partition_by is not None else folder_partition_by,
        effective_exclude,
        where,
    )

    # -- index settings ---------------------------------------------------
    c_indexes = _parse_indexes(entry.get("indexes"), where, allow_none=True)
    _validate_indexes_vs_columns(
        c_indexes if c_indexes is not None else folder_indexes,
        effective_exclude,
        where,
    )

    # -- column_types / all_nullable overrides ----------------------------
    c_column_types = _parse_column_types(
        entry.get("column_types"), where, allow_none=True
    )
    c_all_nullable = (
        _cfg_parse_bool(entry["all_nullable"], "all_nullable", where)
        if "all_nullable" in entry
        else None
    )

    return _ExplicitPattern(
        pattern=compiled,
        raw_pattern=raw_pat,
        tablename=str(entry["tablename"]),
        if_exists=c_if_exists,
        include=c_include,
        exclude=c_exclude,
        partition_by=c_partition_by,
        max_partitions=c_max_partitions,
        indexes=c_indexes,
        column_types=c_column_types,
        all_nullable=c_all_nullable,
    )


def load_folder_config(path: Path) -> FolderConfig:
    """Parse and validate the folder-level YAML config at ``path``.

    Top-level keys (``folder`` and ``schemaname`` are required):

    * ``folder``: the directory of data files. A relative path is resolved
      against the config file's directory; if that does not exist but the
      path exists relative to the current working directory, the latter is
      used. If neither exists, the config-relative location is kept.
    * ``schemaname``, ``if_exists``, ``auto_detect`` (coerced with
      :func:`bool`), ``include`` / ``exclude``, ``partition_by``,
      ``max_partitions``, ``indexes``, ``column_types``, ``all_nullable``:
      folder-level defaults for every cluster.
    * ``file_type``, ``delimiter``, ``text_encoding``, ``quotechar``:
      text-reader settings. ``delimiter`` accepts the aliases ``tab`` /
      ``\\t`` and ``pipe``. A null value for any of the last three falls
      back to its default instead of becoming the string ``"None"``.
    * ``clusters``: an optional list of explicit pattern entries. Each entry
      may override ``if_exists``, ``include`` / ``exclude``,
      ``partition_by``, ``max_partitions``, ``indexes``, ``column_types`` and
      ``all_nullable``.

    The file is parsed with :func:`yaml.safe_load`.

    Raises:
        ValueError: for any structural or value error in the config.
        OSError: if ``path`` cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    path = Path(path)
    with path.open("r", encoding="utf-8") as f:
        # safe_load: the config is user-supplied; never use yaml.load here.
        raw = yaml.safe_load(f)

    if not isinstance(raw, dict):
        raise ValueError(f"Config at {path} must be a YAML mapping at the top level.")

    missing = [k for k in ("folder", "schemaname") if k not in raw]
    if missing:
        raise ValueError(f"Config {path} missing required keys: {', '.join(missing)}")

    where = f"Config {path}"

    folder = _cfg_resolve_folder(Path(raw["folder"]), path.parent)
    schemaname = str(raw["schemaname"])
    if_exists = _validate_if_exists(raw.get("if_exists", "fail"), where)
    auto_detect = bool(raw.get("auto_detect", True))
    include, exclude = _parse_columns_filter(raw, where)

    # -- folder-level partition settings ------------------------------------
    partition_by = _parse_partition_by(raw.get("partition_by"), where)
    max_partitions = _parse_max_partitions(raw.get("max_partitions"), where)
    _validate_partition_vs_columns(partition_by, exclude, where)

    # -- folder-level index settings ----------------------------------------
    indexes = _parse_indexes(raw.get("indexes"), where)
    _validate_indexes_vs_columns(indexes, exclude, where)

    # -- folder-level column_types overrides --------------------------------
    column_types = _parse_column_types(raw.get("column_types"), where)

    # -- folder-level all_nullable -----------------------------------------
    # Sets the default for every cluster. Per-cluster ``all_nullable`` wins
    # when present; the CLI ``--all-nullable`` flag trumps both.
    all_nullable_default = _cfg_parse_bool(
        raw.get("all_nullable", False), "all_nullable", where
    )

    # -- file_type ----------------------------------------------------------
    file_type = str(raw.get("file_type", "sas")).lower()
    if file_type not in VALID_FILE_TYPES:
        raise ValueError(
            f"Config {path}: file_type={file_type!r} is not one of "
            f"{VALID_FILE_TYPES}"
        )

    # -- text-file-specific fields ------------------------------------------
    delimiter = _cfg_parse_delimiter(raw.get("delimiter"))
    text_encoding = _cfg_optional_str(raw, "text_encoding", "utf-8")
    quotechar = _cfg_optional_str(raw, "quotechar", '"')

    # -- explicit cluster patterns ------------------------------------------
    clusters_raw = raw.get("clusters") or []
    if not isinstance(clusters_raw, list):
        raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
    explicit: List[_ExplicitPattern] = [
        _cfg_parse_cluster_entry(
            entry,
            f"{where} clusters[{i}]",
            folder_exclude=exclude,
            folder_partition_by=partition_by,
            folder_indexes=indexes,
        )
        for i, entry in enumerate(clusters_raw)
    ]

    return FolderConfig(
        folder=folder,
        schemaname=schemaname,
        if_exists=if_exists,
        auto_detect=auto_detect,
        include=include,
        exclude=exclude,
        explicit=explicit,
        partition_by=partition_by,
        max_partitions=max_partitions,
        indexes=indexes,
        column_types=column_types or {},
        all_nullable=all_nullable_default,
        file_type=file_type,
        delimiter=delimiter,
        text_encoding=text_encoding,
        quotechar=quotechar,
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cluster discovery
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
# A run of digits anchored at the end of a file stem; _auto_prefix removes it
# to derive the auto-detect cluster key (e.g. "group_a12" -> "group_a").
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
# Every run of digits in a stem; _cluster_sort_key uses the last one as the
# numeric sort key within a cluster.
_DIGIT_GROUP_RE = re.compile(r"\d+")
|
||
|
||
|
||
def _auto_prefix(stem: str) -> str:
    """Compute the auto-detect cluster key for a file stem.

    A trailing run of digits is removed, then any trailing ``_`` / ``-``
    characters. The separators are stripped even when no digits were
    removed. As a result ``group_a1``, ``group_a_2`` and ``group_a-3`` all
    map to ``group_a``. If nothing is left (for example an all-digit stem),
    the original stem is used as its own key.
    """
    base = _TRAILING_DIGIT_RE.sub("", stem).rstrip("_-")
    if base:
        return base
    return stem
|
||
|
||
|
||
def _cluster_sort_key(path: Path) -> Tuple[int, str]:
    """Order files inside a cluster by the value of their LAST digit run.

    The comparison is numeric, not lexical, so ``_9`` sorts before ``_10`` /
    ``_40`` regardless of zero-padding. Likewise ``foo_9_detail`` sorts
    before ``foo_40_detail``. The first file in this order is the one whose
    schema is inferred to create the target table, and a numeric order keeps
    that choice stable as the file set grows.

    Stems without any digits get ``-1`` and so sort ahead of numbered files.
    The stem string breaks ties, which keeps the order reproducible.
    """
    stem = path.stem
    runs = _DIGIT_GROUP_RE.findall(stem)
    if not runs:
        return (-1, stem)
    return (int(runs[-1]), stem)
|
||
|
||
|
||
def _list_data_files(folder: Path, file_type: str = "sas") -> List[Path]:
    """Return the data files directly inside ``folder`` for one file family.

    ``file_type == "text"`` selects ``load_sas.TEXT_EXTENSIONS``; any other
    value selects :data:`SAS_EXTENSIONS`. Keeping the families apart means a
    folder holding both SAS and text files never mixes them in one pool.

    Suffixes are compared case-insensitively. Only regular files are kept,
    so sub-directories are not descended into. The result follows sorted
    path order.
    """
    wanted = TEXT_EXTENSIONS if file_type == "text" else SAS_EXTENSIONS
    return [
        entry
        for entry in sorted(folder.iterdir())
        if entry.is_file() and entry.suffix.lower() in wanted
    ]
|
||
|
||
|
||
def _list_sas_files(folder: Path) -> List[Path]:
    """Return only the SAS data files in ``folder``.

    This is a backward-compatible shim that delegates to
    :func:`_list_data_files` with the ``"sas"`` file family.
    """
    sas_only = "sas"
    return _list_data_files(folder, file_type=sas_only)
|
||
|
||
|
||
def _build_text_kw(cluster: ClusterSpec) -> Dict[str, Any]:
    """Collect the text-reader keyword arguments stored on ``cluster``.

    The returned mapping (``delimiter``, ``text_encoding``, ``quotechar``) is
    meant to be splatted into :func:`read_sas_preview`,
    :func:`read_sas_metadata`, :func:`iter_sas_chunks` and similar readers.
    SAS clusters still carry the default values, which the readers ignore
    for SAS extensions.
    """
    return {
        "delimiter": cluster.delimiter,
        "text_encoding": cluster.text_encoding,
        "quotechar": cluster.quotechar,
    }
|
||
|
||
|
||
def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
    """Enumerate ``cfg.folder`` and bucket its data files into clusters.

    Only the folder listing is touched; no data file is opened here.

    Explicit ``clusters`` patterns are applied first, in config order, and
    each one claims the files it matches. A file matching two patterns
    raises ``ValueError`` (almost always a config bug). A pattern that
    matches nothing still yields a :class:`ClusterSpec` with an empty
    ``files`` list so callers can report it; that is not an error.

    If ``cfg.auto_detect`` is set, leftover files are grouped by
    :func:`_auto_prefix`. Each group becomes an ``"auto"`` cluster that
    inherits the folder defaults. Auto clusters come after explicit ones
    and are sorted by key.

    Settings for explicit clusters follow these rules:

    * a ``None`` override means "inherit the folder default";
    * for ``partition_by`` / ``indexes`` an explicit ``[]`` disables the
      feature;
    * ``column_types`` overrides are merged on top of the folder-level map
      (the auto-union from the pre-scan is added later).

    Text-reader settings are copied from the folder config to every cluster.
    Every list / dict setting is copied per cluster, so mutating one spec
    never leaks into another spec or into ``cfg``. Files within each cluster
    are ordered by :func:`_cluster_sort_key`.

    Raises:
        FileNotFoundError: if ``cfg.folder`` is missing or not a directory.
        ValueError: if a file matches more than one explicit pattern.
    """
    if not cfg.folder.exists() or not cfg.folder.is_dir():
        raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}")

    pool = _list_data_files(cfg.folder, file_type=cfg.file_type)
    explicit = cfg.explicit

    # Text-file kwargs propagated to every cluster.
    text_fields = dict(
        file_type=cfg.file_type,
        delimiter=cfg.delimiter,
        text_encoding=cfg.text_encoding,
        quotechar=cfg.quotechar,
    )

    def _make_spec(
        tablename: str,
        files: List[Path],
        *,
        source: str,
        pattern: Optional[str],
        if_exists: str,
        include: Optional[List[str]],
        exclude: Optional[List[str]],
        partition_by: List[str],
        max_partitions: int,
        indexes: List[str],
        column_types: Dict[str, str],
        all_nullable: bool,
    ) -> ClusterSpec:
        # Build one spec with sorted files and private copies of containers.
        return ClusterSpec(
            tablename=tablename,
            files=sorted(files, key=_cluster_sort_key),
            if_exists=if_exists,
            include=list(include) if include is not None else None,
            exclude=list(exclude) if exclude is not None else None,
            source=source,
            pattern=pattern,
            partition_by=list(partition_by),
            max_partitions=max_partitions,
            indexes=list(indexes),
            column_types=dict(column_types),
            all_nullable=all_nullable,
            **text_fields,
        )

    # matches[i][k] is True when explicit pattern i matches pool[k]. Each
    # regex runs once per file instead of once per pattern pair.
    matches = [[bool(p.pattern.search(f.name)) for f in pool] for p in explicit]

    # Detect cross-pattern overlap up front for a clearer error message.
    for i, p_i in enumerate(explicit):
        for j in range(i + 1, len(explicit)):
            for k, f in enumerate(pool):
                if matches[i][k] and matches[j][k]:
                    raise ValueError(
                        f"File {f.name!r} matches multiple explicit patterns: "
                        f"{p_i.raw_pattern!r} and {explicit[j].raw_pattern!r}"
                    )

    clusters: List[ClusterSpec] = []
    for idx, patt in enumerate(explicit):
        # Overlap was rejected above, so no earlier pattern claimed these.
        matched = [f for f, hit in zip(pool, matches[idx]) if hit]
        if patt.column_types is None:
            resolved_ct: Dict[str, str] = cfg.column_types
        else:
            resolved_ct = {**cfg.column_types, **patt.column_types}
        clusters.append(
            _make_spec(
                patt.tablename,
                matched,
                source="explicit",
                pattern=patt.raw_pattern,
                if_exists=patt.if_exists or cfg.if_exists,
                include=patt.include if patt.include is not None else cfg.include,
                exclude=patt.exclude if patt.exclude is not None else cfg.exclude,
                partition_by=(
                    patt.partition_by if patt.partition_by is not None
                    else cfg.partition_by
                ),
                max_partitions=(
                    patt.max_partitions if patt.max_partitions is not None
                    else cfg.max_partitions
                ),
                indexes=patt.indexes if patt.indexes is not None else cfg.indexes,
                column_types=resolved_ct,
                all_nullable=(
                    patt.all_nullable if patt.all_nullable is not None
                    else cfg.all_nullable
                ),
            )
        )

    # Files no explicit pattern claimed, in pool order.
    remaining = [
        f for k, f in enumerate(pool)
        if not any(hits[k] for hits in matches)
    ]

    if cfg.auto_detect and remaining:
        buckets: Dict[str, List[Path]] = {}
        for f in remaining:
            buckets.setdefault(_auto_prefix(f.stem), []).append(f)
        for key in sorted(buckets):
            clusters.append(
                _make_spec(
                    key,
                    buckets[key],
                    source="auto",
                    pattern=None,
                    if_exists=cfg.if_exists,
                    include=cfg.include,
                    exclude=cfg.exclude,
                    partition_by=cfg.partition_by,
                    max_partitions=cfg.max_partitions,
                    indexes=cfg.indexes,
                    column_types=cfg.column_types,
                    all_nullable=cfg.all_nullable,
                )
            )

    return clusters
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-cluster load
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _infer_cluster_schema(
    path: Path,
    include,
    exclude,
    *,
    column_types: Optional[Dict[str, str]] = None,
    force_nullable: bool = False,
    text_kw: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict, Optional[int]]:
    """Infer the Postgres column schema for ``path`` from a preview read.

    The preview is filtered with ``include`` / ``exclude`` before
    :func:`load_sas.infer_schema` sees it. ``column_types`` pins specific
    columns to a chosen Postgres type (typically the cluster's merged
    auto-union plus YAML overrides), and ``force_nullable`` marks every
    column nullable regardless of what the preview shows.

    ``text_kw`` (``delimiter``, ``text_encoding``, ``quotechar``) is passed
    to :func:`read_sas_preview` so delimited-text files are read correctly.

    Returns ``(columns, total_rows)``. ``total_rows`` is the row count the
    file metadata declares (``None`` when the metadata has none); callers
    hand it to :func:`_stream_file` as the progress-bar denominator.
    """
    reader_kw = text_kw if text_kw else {}
    sample, file_meta = read_sas_preview(path, **reader_kw)
    sample = apply_column_filter(sample, include, exclude)
    declared_rows = getattr(file_meta, "number_rows", None)
    schema = infer_schema(
        sample,
        file_meta,
        total_rows=declared_rows,
        column_types=column_types,
        force_nullable=force_nullable,
    )
    return schema, declared_rows
|
||
|
||
|
||
def _discover_cluster_partitions(
    cluster: ClusterSpec,
    columns: Dict,
) -> dict:
    """Collect partition values from every file in ``cluster``.

    Each file is read lazily, chunk by chunk, with the cluster's
    include/exclude filter applied, so no file is ever fully materialized.
    The per-file trees from :func:`discover_partition_values_chunked` are
    folded into one nested partition-value tree. That tree is what
    :func:`load_sas.render_partition_ddl` and :func:`load_sas.create_table`
    expect.
    """
    reader_kw = _build_text_kw(cluster)

    def _filtered(src):
        # Lazily yield filtered chunks of ``src``; nothing is read until the
        # discovery helper starts iterating.
        for frame, _frame_meta in iter_sas_chunks(src, **reader_kw):
            yield apply_column_filter(frame, cluster.include, cluster.exclude)

    combined: dict = {}
    for src in cluster.files:
        per_file = discover_partition_values_chunked(
            _filtered(src), cluster.partition_by, columns,
        )
        _merge_partition_trees(combined, per_file)
    return combined
|
||
|
||
|
||
def load_cluster(
    conn,
    cluster: ClusterSpec,
    schemaname: str,
    *,
    workers: int = 1,
    progress_queue: Any = None,
    db_overrides: Optional[Dict[str, Optional[str]]] = None,
    abort_on_first_failure: bool = False,
) -> int:
    """Load every file in ``cluster`` into a single table; return rows loaded.

    The table schema is inferred from the cluster's first file, with the
    cluster's ``column_types`` and ``all_nullable`` settings applied.
    ``indexes`` and ``partition_by`` columns are checked against that schema
    before any DDL runs; a missing column raises ``ValueError``. A cluster
    with no files returns 0 without touching the database.

    Partitioned clusters: unless ``if_exists`` is ``"append"`` (where the
    partitions are assumed to exist already), partition values are
    discovered across ALL files first. :func:`load_sas.create_table` can
    then build the full partition tree before any data is copied.

    Transactions: :func:`load_sas.copy_dataframes` commits per chunk. If a
    file fails mid-cluster, every chunk already copied stays committed,
    including chunks from earlier files. Only the in-flight chunk is rolled
    back by :func:`main`.

    Parallelism:

    - ``workers == 1``: files stream one after another on ``conn``. Each file
      after the first is checked with
      :func:`load_sas.assert_schema_compatible` before it is copied.
    - ``workers > 1``: the main connection only creates and commits the
      table (plus partitions for partitioned clusters). It then hands
      *every* file, including the first, to
      :func:`_load_remaining_files_parallel`. Each worker process opens its
      own connection, re-infers the per-file schema, runs the same
      compatibility check and streams via COPY. Per-chunk row counts go to
      ``progress_queue`` so the caller can drive one aggregated progress
      bar.

    ``db_overrides`` forwards interactively prompted ``{"user", "password"}``
    credentials to the workers. Leave it ``None`` so workers read the
    standard libpq environment variables. ``abort_on_first_failure`` is
    forwarded to the parallel path only.

    When ``cluster.indexes`` is set, the indexes are created after all files
    have loaded.
    """
    if not cluster.files:
        return 0

    reader_kw = _build_text_kw(cluster)

    head, *tail = cluster.files
    head_columns, head_total_rows = _infer_cluster_schema(
        head,
        cluster.include,
        cluster.exclude,
        column_types=cluster.column_types,
        force_nullable=cluster.all_nullable,
        text_kw=reader_kw,
    )

    def _not_in_schema(wanted):
        # Names from ``wanted`` that the head file's inferred schema lacks.
        return [name for name in wanted if name not in head_columns]

    # -- Validate index columns before any DDL runs --------------------------
    if cluster.indexes:
        absent = _not_in_schema(cluster.indexes)
        if absent:
            raise ValueError(
                f"cluster {cluster.tablename!r}: indexes references "
                f"columns not present in the inferred schema: {absent}"
            )

    # -- Partition support ---------------------------------------------------
    partition_values: Optional[dict] = None
    if cluster.partition_by:
        absent = _not_in_schema(cluster.partition_by)
        if absent:
            raise ValueError(
                f"cluster {cluster.tablename!r}: partition_by references "
                f"columns not present in the inferred schema: {absent}"
            )
        if cluster.if_exists != "append":
            # Scan every file so the whole partition tree is known up front.
            print(
                f" discovering partition values across "
                f"{len(cluster.files)} file(s)...",
                file=sys.stderr,
            )
            partition_values = _discover_cluster_partitions(
                cluster, head_columns,
            )
            print(
                f" discovered {_count_partitions(partition_values):,} "
                f"partition table(s) "
                f"across {len(cluster.partition_by)} level(s)",
                file=sys.stderr,
            )
        else:
            # Append mode: the partitions already exist, so skip the scan.
            print(
                " [info] append mode: skipping partition discovery "
                "(partitions assumed to exist)",
                file=sys.stderr,
            )

    create_table(
        conn,
        schemaname,
        cluster.tablename,
        head_columns,
        cluster.if_exists,
        partition_by=cluster.partition_by or None,
        partition_values=partition_values,
        max_partitions=cluster.max_partitions,
    )

    loaded = 0
    if workers > 1:
        # Commit the freshly created (empty) table first. The workers'
        # ``assert_schema_compatible`` probes read ``information_schema``
        # over their own connections and cannot see an uncommitted
        # CREATE TABLE. After that every file goes to the pool, so DDL is
        # the only serial work.
        conn.commit()
        loaded += _load_remaining_files_parallel(
            cluster.files,
            schemaname,
            cluster.tablename,
            cluster.include,
            cluster.exclude,
            workers=workers,
            progress_queue=progress_queue,
            db_overrides=db_overrides,
            column_types=cluster.column_types,
            force_nullable=cluster.all_nullable,
            abort_on_first_failure=abort_on_first_failure,
            text_kw=reader_kw,
        )
    else:
        # Serial path: a one-worker pool would only add overhead, so stream
        # on ``conn`` directly. The head file reuses the schema from above.
        loaded += _stream_file(
            conn, schemaname, cluster.tablename, head, head_columns,
            cluster.include, cluster.exclude,
            total_rows=head_total_rows,
            progress_queue=progress_queue,
            text_kw=reader_kw,
        )
        conn.commit()
        for follower in tail:
            follower_columns, follower_rows = _infer_cluster_schema(
                follower, cluster.include, cluster.exclude,
                column_types=cluster.column_types,
                force_nullable=cluster.all_nullable,
                text_kw=reader_kw,
            )
            # Same check that if_exists=append runs. A type mismatch or
            # missing column aborts the cluster, but chunks that have
            # already committed stay in the table.
            assert_schema_compatible(
                conn, schemaname, cluster.tablename, follower_columns
            )
            loaded += _stream_file(
                conn, schemaname, cluster.tablename, follower, follower_columns,
                cluster.include, cluster.exclude,
                total_rows=follower_rows,
                progress_queue=progress_queue,
                text_kw=reader_kw,
            )

    # -- Index support -------------------------------------------------------
    if cluster.indexes:
        create_indexes(conn, schemaname, cluster.tablename, cluster.indexes)

    return loaded
|
||
|
||
|
||
def _stream_file(
    conn,
    schemaname: str,
    tablename: str,
    path: Path,
    columns,
    include,
    exclude,
    *,
    total_rows: Optional[int] = None,
    progress_queue: Any = None,
    text_kw: Optional[Dict[str, Any]] = None,
) -> int:
    """COPY ``path`` into an existing table, one filtered chunk at a time.

    Progress is reported in one of two ways:

    - With a ``progress_queue``, each chunk's row count is pushed as a
      ``("rows", n)`` tuple. :func:`main` can then aggregate every file
      (including files loaded by parallel workers) into one folder-wide
      progress bar from a background drainer thread.
    - Without one, a per-file tqdm bar is drawn on stderr, using
      ``total_rows`` (when known) as its denominator.

    ``text_kw`` (``delimiter``, ``text_encoding``, ``quotechar``) is passed
    to :func:`iter_sas_chunks` so delimited-text files are read correctly.

    Returns whatever :func:`copy_dataframes` reports as rows copied.
    """
    reader_kw = text_kw or {}

    def _filtered_chunks():
        # The reporting mode is decided once per file. A local bar exists
        # only when no shared queue was supplied.
        bar = None
        if progress_queue is None:
            bar = tqdm(
                total=total_rows,
                unit="row",
                unit_scale=True,
                desc=f" {path.name}",
                file=sys.stderr,
                dynamic_ncols=True,
            )
        try:
            for frame, _frame_meta in iter_sas_chunks(path, **reader_kw):
                frame = apply_column_filter(frame, include, exclude)
                if bar is None:
                    progress_queue.put(("rows", len(frame)))
                else:
                    bar.update(len(frame))
                yield frame
        finally:
            if bar is not None:
                bar.close()

    return copy_dataframes(
        conn, schemaname, tablename, _filtered_chunks(), columns
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Parallel append workers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _worker_load_append_file(
    path_str: str,
    schemaname: str,
    tablename: str,
    include: Optional[List[str]],
    exclude: Optional[List[str]],
    progress_queue: Any,
    db_overrides: Optional[Dict[str, Optional[str]]],
    column_types: Optional[Dict[str, str]] = None,
    force_nullable: bool = False,
    text_kw: Optional[Dict[str, Any]] = None,
) -> Tuple[str, int, Optional[str]]:
    """Worker process: load one data file in append mode.

    Runs in a subprocess spawned by :func:`_load_remaining_files_parallel`.
    The worker:

    1. Re-infers the per-file schema. Per-file ``INTEGER`` vs ``BIGINT``
       drift is then caught by the same schema-compat check the serial path
       uses.
    2. Opens its own psycopg2 connection (``db_overrides`` supplies
       ``user`` / ``password`` when given).
    3. Runs :func:`load_sas.assert_schema_compatible`.
    4. Streams chunks via ``COPY`` and commits.

    Each chunk's row count is pushed to ``progress_queue`` (when not
    ``None``) as ``("rows", n)`` for the main process's global tqdm bar.

    ``text_kw`` (``delimiter``, ``text_encoding``, ``quotechar``) is passed
    to the reader functions for text-file dispatch.

    Returns ``(path_str, rows_loaded, error_or_None)``. Every failure inside
    the worker is returned rather than raised. That includes failing to
    import ``dotenv`` / ``load_sas`` in the child process and failing to
    load ``.env``. The parent can therefore aggregate results across
    workers without losing partial progress.
    """
    # Stdlib-only imports stay outside the ``try``: the ``finally`` below
    # depends on them.
    import ctypes
    import ctypes.util
    import gc
    import traceback as _traceback
    from pathlib import Path as _Path

    _tkw = text_kw or {}
    path = _Path(path_str)
    try:
        # Third-party/project imports and .env loading live inside the
        # ``try``. Import or environment failures in the child are then
        # reported like any other per-file error instead of escaping
        # through ``Future.result()`` in the parent.
        from dotenv import load_dotenv as _load_dotenv

        from load_sas import (
            apply_column_filter as _apply_column_filter,
            assert_schema_compatible as _assert_schema_compatible,
            connect as _connect,
            copy_dataframes as _copy_dataframes,
            infer_schema as _infer_schema,
            iter_sas_chunks as _iter_sas_chunks,
            read_sas_preview as _read_sas_preview,
        )

        _load_dotenv()

        preview_df, meta = _read_sas_preview(path, **_tkw)
        preview_df = _apply_column_filter(preview_df, include, exclude)
        total_rows = getattr(meta, "number_rows", None)
        columns = _infer_schema(
            preview_df, meta,
            total_rows=total_rows,
            column_types=column_types,
            force_nullable=force_nullable,
        )
        # Drop the preview ASAP. On a 2M-row wide file it's hundreds of MB,
        # and it is never needed again after schema inference.
        del preview_df, meta

        user = db_overrides.get("user") if db_overrides else None
        password = db_overrides.get("password") if db_overrides else None
        conn = _connect(user=user, password=password)
        # Everything after ``connect`` sits inside the ``try`` so the
        # connection is always closed. Closing without a commit discards
        # any open transaction.
        try:
            conn.autocommit = False
            _assert_schema_compatible(conn, schemaname, tablename, columns)

            def _chunks():
                for chunk_df, _chunk_meta in _iter_sas_chunks(path, **_tkw):
                    chunk_df = _apply_column_filter(chunk_df, include, exclude)
                    if progress_queue is not None:
                        progress_queue.put(("rows", len(chunk_df)))
                    yield chunk_df

            rows = _copy_dataframes(
                conn, schemaname, tablename, _chunks(), columns
            )
            conn.commit()
            return (path_str, rows, None)
        finally:
            conn.close()
    except Exception as e:
        tb = _traceback.format_exc()
        # Lead with a one-line "Type: message" summary, then append the full
        # traceback. The parent prints this string verbatim (live [FAIL]
        # line and final cluster-failure block). Without the frames,
        # ProcessPool workers lose all context, e.g. a bare
        # "FloatingPointError: overflow encountered in multiply" with no
        # hint of where inside the pandas/numpy/pyarrow stack it happened.
        return (path_str, 0, f"{type(e).__name__}: {e}\n{tb}")
    finally:
        # Hand memory back to the OS before the worker is recycled (or
        # before ``max_tasks_per_child`` rotates this process). Three
        # layers, each of which can retain memory across calls:
        #
        # 1. pyarrow's memory pool reuses buffers; release_unused() returns
        #    them to the allocator.
        # 2. Python's GC: cyclic refs from pandas/pyarrow chains aren't
        #    collected until a generation tick, so force one now.
        # 3. glibc's malloc keeps freed heap in per-thread arenas instead
        #    of unmapping it. ``malloc_trim(0)`` is the explicit ask.
        #    Silently a no-op on platforms without the symbol (macOS, etc).
        try:
            import pyarrow as _pa

            _pa.default_memory_pool().release_unused()
        except Exception:
            pass
        gc.collect()
        try:
            _libc_name = ctypes.util.find_library("c")
            if _libc_name:
                _libc = ctypes.CDLL(_libc_name)
                if hasattr(_libc, "malloc_trim"):
                    _libc.malloc_trim(0)
        except Exception:
            pass
|
||
|
||
|
||
def _load_remaining_files_parallel(
    files: List[Path],
    schemaname: str,
    tablename: str,
    include: Optional[List[str]],
    exclude: Optional[List[str]],
    *,
    workers: int,
    progress_queue: Any,
    db_overrides: Optional[Dict[str, Optional[str]]],
    column_types: Optional[Dict[str, str]] = None,
    force_nullable: bool = False,
    abort_on_first_failure: bool = False,
    text_kw: Optional[Dict[str, Any]] = None,
) -> int:
    """Run append-mode loads for ``files`` across a process pool.

    Despite the name, :func:`load_cluster` passes every file of the cluster
    here, the first one included. Each file is an independent unit of work
    submitted to ``ProcessPoolExecutor`` and handled by
    :func:`_worker_load_append_file`. That worker infers the schema,
    validates compatibility and streams via COPY, just like the serial path.

    The table must already exist *and be committed*. The workers'
    schema-compat probes read ``information_schema`` over their own
    connections, which cannot see an uncommitted ``CREATE TABLE``.

    Failures are surfaced two ways:

    1. **Live stderr feed.** Every file's outcome is written via
       ``tqdm.write``, so an active progress bar stays intact. It is written
       the moment ``as_completed`` hands the result back. A systemic
       failure (schema drift, OOM, bad data) is therefore visible
       immediately instead of after the slowest worker drains.
    2. **Final aggregate raise.** Errors are collected and raised as one
       ``RuntimeError`` after the pool drains. This keeps the per-cluster
       success/fail summary in :func:`main` accurate. On
       ``KeyboardInterrupt`` pending futures are cancelled. If failures
       were already collected they are raised in the same ``RuntimeError``
       shape; otherwise the interrupt propagates.

    A future can raise instead of returning its result tuple, e.g.
    ``BrokenProcessPool`` when a worker process dies abruptly. Such a future
    is recorded as a failure of its own file. It no longer escapes the loop
    and discards the errors collected so far.

    With ``abort_on_first_failure``, the first failure cancels every
    still-pending future. Already-running workers still finish their file.

    Returns the total number of rows loaded when every file succeeds.
    """
    total = 0
    errors: List[Tuple[str, str]] = []
    completed = 0
    n_files = len(files)

    # ``max_tasks_per_child=1`` recycles each worker process after every
    # file. Without it, glibc/pyarrow/pyreadstat retain peak-water memory
    # inside long-lived workers. Over a multi-hour run the sum across
    # workers keeps growing even though individual chunks were freed at
    # the Python level. Recycling per file returns the memory to the OS.
    # The cost is one process start per file (~1-2 s), which is noise next
    # to multi-GB sas7bdat reads. The option only exists on Python 3.11+.
    pool_kwargs: Dict[str, Any] = {"max_workers": workers}
    if sys.version_info >= (3, 11):
        pool_kwargs["max_tasks_per_child"] = 1

    with ProcessPoolExecutor(**pool_kwargs) as pool:
        # Map each future back to its source file. A future that raises
        # instead of returning ``(path_str, rows, err)`` can then still be
        # attributed to the right file.
        future_to_path = {
            pool.submit(
                _worker_load_append_file,
                str(p),
                schemaname,
                tablename,
                include,
                exclude,
                progress_queue,
                db_overrides,
                column_types,
                force_nullable,
                text_kw,
            ): p
            for p in files
        }
        aborted = False
        try:
            for fut in as_completed(future_to_path):
                try:
                    path_str, rows, err = fut.result()
                except CancelledError:
                    # Cancelled by ``--abort-on-first-failure``. The abort
                    # log already accounted for the cancellation count.
                    continue
                except Exception as exc:
                    # The worker never returned its tuple. Typical causes:
                    # BrokenProcessPool after the process was killed (e.g.
                    # OOM), or an argument/result pickling failure.
                    path_str = str(future_to_path[fut])
                    rows = 0
                    err = f"{type(exc).__name__}: {exc}"
                completed += 1
                name = Path(path_str).name
                if err is not None:
                    errors.append((path_str, err))
                    # ``tqdm.write`` writes through the bar without garbling
                    # it; bare ``print`` would interleave with the rendered
                    # progress line.
                    tqdm.write(
                        f"[FAIL {completed}/{n_files}] {name}: {err}",
                        file=sys.stderr,
                    )
                    if abort_on_first_failure and not aborted:
                        # Cancel anything still queued. Already-running
                        # workers can't be interrupted portably, so they
                        # keep going; we only stop dispatching new files.
                        # Their rows still count toward ``total``, but this
                        # function raises at the end anyway because at
                        # least one file failed. The flag is set once so
                        # simultaneous failures don't trigger a cancel storm.
                        aborted = True
                        cancelled = 0
                        for f in future_to_path:
                            if not f.done() and f.cancel():
                                cancelled += 1
                        tqdm.write(
                            f"[abort] --abort-on-first-failure: cancelled "
                            f"{cancelled} pending file(s); waiting for "
                            f"{n_files - completed - cancelled} in-flight "
                            f"worker(s) to finish.",
                            file=sys.stderr,
                        )
                else:
                    tqdm.write(
                        f"[done {completed}/{n_files}] {name}: "
                        f"{rows:,} row(s)",
                        file=sys.stderr,
                    )
                    total += rows
        except KeyboardInterrupt:
            # Cancel anything still queued so we exit promptly. Running
            # workers finish (a child can't be killed mid-syscall portably),
            # but pending tasks don't keep firing. If failures were already
            # collected, re-raise them in the same RuntimeError shape used
            # below so the caller's summary still sees which files failed.
            for f in future_to_path:
                if not f.done():
                    f.cancel()
            tqdm.write(
                f"[interrupt] Ctrl-C after {completed}/{n_files} file(s); "
                f"{len(errors)} failure(s) collected so far.",
                file=sys.stderr,
            )
            if errors:
                joined = "\n".join(f" {p}: {e}" for p, e in errors)
                raise RuntimeError(
                    f"interrupted after {len(errors)} worker(s) failed "
                    f"while appending to {schemaname}.{tablename}:\n{joined}"
                )
            raise

    if errors:
        joined = "\n".join(f" {p}: {e}" for p, e in errors)
        raise RuntimeError(
            f"{len(errors)} worker(s) failed while appending to "
            f"{schemaname}.{tablename}:\n{joined}"
        )

    return total
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for the folder loader.

    ``--config`` (required) points at the YAML folder config. The remaining
    flags control:

    - dry-run output;
    - failure handling between and within clusters;
    - interactive credentials;
    - per-chunk row sizing;
    - the metadata pre-scan;
    - forced nullability;
    - worker-process parallelism.
    """

    def _positive_int(text: str) -> int:
        # ``--chunk-rows`` becomes the per-chunk row target. A zero or
        # negative chunk size is meaningless, so reject it at parse time
        # instead of exporting it to GENERIC_LOADER_CHUNK_ROWS.
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"expected a positive integer, got {text!r}"
            ) from None
        if value < 1:
            raise argparse.ArgumentTypeError(
                f"expected a positive integer, got {value}"
            )
        return value

    p = argparse.ArgumentParser(
        description=(
            "Load every data file (SAS or delimited text) in a folder into "
            "Postgres, grouping files into clusters that each become one table."
        ),
    )
    p.add_argument("--config", required=True, type=Path, help="Path to YAML config")
    p.add_argument(
        "--dry-run",
        action="store_true",
        help=(
            "Print discovered clusters and the inferred DDL for each "
            "(CREATE TABLE plus partition DDL when applicable). For "
            "partitioned clusters all files are scanned to discover "
            "partition values. The database is never touched."
        ),
    )
    p.add_argument(
        "--fail-fast",
        action="store_true",
        help=(
            "Abort on the first cluster failure. Default is to continue "
            "with the next cluster. Chunks commit as they load, so rows "
            "already copied for a failing cluster stay in the table; only "
            "the in-flight chunk is rolled back."
        ),
    )
    p.add_argument(
        "--abort-on-first-failure",
        action="store_true",
        help=(
            "Within a single cluster's parallel append, cancel every "
            "still-pending worker the instant one worker fails. Use when "
            "you know the failure is systemic (schema drift, bad creds, "
            "OOM) and don't want to wait for the slow files to drain "
            "before getting your prompt back. Already-running workers "
            "still finish their current file - Python can't kill children "
            "mid-syscall - but new files won't be dispatched. Orthogonal "
            "to --fail-fast, which controls what happens between clusters."
        ),
    )
    p.add_argument(
        "--dbcreds",
        action="store_true",
        help=(
            "Prompt for database username and password instead of reading "
            "PGUSER / PGPASSWORD from the environment or .env file."
        ),
    )
    p.add_argument(
        "--chunk-rows",
        type=_positive_int,
        default=None,
        metavar="N",
        help=(
            "Per-chunk row target for streaming and COPY. "
            "Overrides both the GENERIC_LOADER_CHUNK_ROWS env var and the "
            "auto-scaling applied when --workers > 1. Peak memory per "
            "worker is roughly 4 × N × avg_row_bytes; with wide data "
            "files (~4 KB/row) and 32 workers, N=100000 is a safe starting "
            "point on a 128 GB box. Must be a positive integer."
        ),
    )
    p.add_argument(
        "--no-prescan",
        action="store_true",
        help=(
            "Skip the per-file metadata scan that populates the folder-wide "
            "tqdm ETA. Useful when the folder is large (half-hour+ pre-scan) "
            "or when you're iterating quickly on a failure. Without the "
            "pre-scan the progress bar still shows rows loaded, rate, and "
            "elapsed time - it just can't estimate remaining time. It also "
            "disables the cluster-wide column-type auto-union; only "
            "user-specified column_types overrides from the YAML apply."
        ),
    )
    p.add_argument(
        "--all-nullable",
        action="store_true",
        help=(
            "Stamp every column nullable in the generated schema, bypassing "
            "NOT NULL inference for every cluster. Use when sampled rows "
            "wrongly suggest a column has no nulls and COPY fails mid-load "
            "on the first null it hits. Overrides the per-cluster and "
            "folder-level ``all_nullable`` YAML settings when set."
        ),
    )
    p.add_argument(
        "--workers",
        type=int,
        default=1,
        metavar="N",
        help=(
            "Number of worker processes used to load files. With N=1 "
            "(default) files load serially on the main connection. With "
            "N>1 the main connection only creates the table (and its "
            "partitions, when partition_by is set) and commits; every "
            "file of the cluster - including the first - then loads in "
            "parallel across N processes, each with its own psycopg2 "
            "connection. On a big box try N close to your core count. "
            "When N>1 the per-chunk row target is auto-scaled to "
            "3,200,000 // N rows (clamped to 50,000-500,000) unless you "
            "pinned it with --chunk-rows or GENERIC_LOADER_CHUNK_ROWS, so "
            "peak memory stays roughly flat."
        ),
    )
    return p
|
||
|
||
|
||
def _describe_cluster(cluster: ClusterSpec) -> str:
    """Return a human-readable, multi-line summary of ``cluster``.

    The first line names the table, its source (plus the match pattern when
    one is set) and its ``if_exists`` policy. The second lists the member
    file names, or a placeholder when none matched. ``partition_by`` and
    ``indexes`` lines are appended only when those settings are non-empty.
    """
    label = f"{cluster.source}"
    if cluster.pattern:
        label = label + f" pattern={cluster.pattern!r}"
    file_list = ", ".join(f.name for f in cluster.files) or "(no matching files)"

    extras = []
    if cluster.partition_by:
        extras.append(f"\n partition_by: {cluster.partition_by}")
    if cluster.indexes:
        extras.append(f"\n indexes: {cluster.indexes}")

    header = f"cluster {cluster.tablename!r} [{label}] if_exists={cluster.if_exists}\n"
    return header + f" files: {file_list}" + "".join(extras)
|
||
|
||
|
||
def main(argv: Optional[List[str]] = None) -> int:
|
||
args = _build_argparser().parse_args(argv)
|
||
|
||
load_dotenv()
|
||
|
||
cfg = load_folder_config(args.config)
|
||
|
||
if not cfg.folder.exists() or not cfg.folder.is_dir():
|
||
print(f"error: folder not found: {cfg.folder}", file=sys.stderr)
|
||
return 2
|
||
|
||
clusters = discover_clusters(cfg)
|
||
|
||
# CLI override: --all-nullable trumps both folder-level and per-cluster
|
||
# YAML ``all_nullable`` settings. Applied here (before any schema work)
|
||
# so every downstream path - dry-run, pre-scan, worker dispatch - sees
|
||
# the same flag on the ClusterSpec.
|
||
if args.all_nullable:
|
||
for c in clusters:
|
||
c.all_nullable = True
|
||
print(
|
||
"[info] --all-nullable set: stamping every column nullable "
|
||
"across all clusters (NOT NULL inference disabled).",
|
||
file=sys.stderr,
|
||
)
|
||
|
||
loadable = [c for c in clusters if c.files]
|
||
|
||
if not loadable:
|
||
if cfg.file_type == "text":
|
||
ext_label = ", ".join(TEXT_EXTENSIONS)
|
||
kind = "text"
|
||
else:
|
||
ext_label = ", ".join(SAS_EXTENSIONS)
|
||
kind = "SAS"
|
||
print(
|
||
f"error: no {kind} files found in {cfg.folder} "
|
||
f"(looked for {ext_label})",
|
||
file=sys.stderr,
|
||
)
|
||
return 2
|
||
|
||
print(f"discovered {len(loadable)} cluster(s) in {cfg.folder}:")
|
||
for c in clusters:
|
||
print(_describe_cluster(c))
|
||
|
||
if args.dry_run:
|
||
print()
|
||
for c in loadable:
|
||
print(f"--- DDL for cluster {c.tablename!r} ---")
|
||
# Dry-run skips the pre-scan (so no auto-union) but user-supplied
|
||
# ``column_types`` from YAML are already baked into ``c.column_types``
|
||
# by ``discover_clusters`` - honor them here so the previewed DDL
|
||
# matches what a real load would produce on a single-file cluster.
|
||
_dry_tkw = _build_text_kw(c)
|
||
columns, _ = _infer_cluster_schema(
|
||
c.files[0], c.include, c.exclude,
|
||
column_types=c.column_types,
|
||
force_nullable=c.all_nullable,
|
||
text_kw=_dry_tkw,
|
||
)
|
||
# Print parent CREATE TABLE (with PARTITION BY if applicable).
|
||
print(
|
||
render_create_table(
|
||
cfg.schemaname, c.tablename, columns,
|
||
partition_by=c.partition_by or None,
|
||
)
|
||
)
|
||
# Print child partition DDL when the cluster is partitioned.
|
||
if c.partition_by:
|
||
# Validate partition columns exist in the schema.
|
||
missing_pcols = [
|
||
col for col in c.partition_by if col not in columns
|
||
]
|
||
if missing_pcols:
|
||
print(
|
||
f" [error] partition_by references columns not in "
|
||
f"schema: {missing_pcols}",
|
||
file=sys.stderr,
|
||
)
|
||
else:
|
||
print(
|
||
f" discovering partition values across "
|
||
f"{len(c.files)} file(s)...",
|
||
file=sys.stderr,
|
||
)
|
||
partition_values = _discover_cluster_partitions(
|
||
c, columns,
|
||
)
|
||
total_parts = _count_partitions(partition_values)
|
||
print(
|
||
f" discovered {total_parts:,} partition table(s) "
|
||
f"across {len(c.partition_by)} level(s)",
|
||
file=sys.stderr,
|
||
)
|
||
child_stmts = render_partition_ddl(
|
||
cfg.schemaname, c.tablename, c.partition_by,
|
||
partition_values, columns,
|
||
max_partitions=c.max_partitions,
|
||
)
|
||
for stmt in child_stmts:
|
||
print()
|
||
print(stmt)
|
||
# Print CREATE INDEX DDL when the cluster has indexes.
|
||
if c.indexes:
|
||
missing_icols = [
|
||
col for col in c.indexes if col not in columns
|
||
]
|
||
if missing_icols:
|
||
print(
|
||
f" [error] indexes references columns not in "
|
||
f"schema: {missing_icols}",
|
||
file=sys.stderr,
|
||
)
|
||
else:
|
||
idx_stmts = render_create_indexes(
|
||
cfg.schemaname, c.tablename, c.indexes,
|
||
)
|
||
for stmt in idx_stmts:
|
||
print()
|
||
print(stmt)
|
||
print()
|
||
return 0
|
||
|
||
db_user = db_password = None
|
||
if args.dbcreds:
|
||
db_user = input("Database username: ")
|
||
db_password = getpass.getpass("Database password: ")
|
||
db_overrides: Optional[Dict[str, Optional[str]]] = (
|
||
{"user": db_user, "password": db_password} if args.dbcreds else None
|
||
)
|
||
|
||
workers = max(1, int(args.workers))
|
||
|
||
# Per-worker peak memory ~= chunk_rows × avg_row_bytes × ~4 (the original
|
||
# pyreadstat DataFrame, the type-coerced ``prepared`` copy, the pyarrow
|
||
# table, and the serialized CSV buffer can all be alive simultaneously).
|
||
# With 32 workers and 500k rows × wide sas7bdat that's easily >128 GB -
|
||
# the default the loader shipped with OOM'd on a c6i.32xlarge box. Scale
|
||
# the auto target inversely with worker count so total memory stays
|
||
# roughly flat regardless of how many workers you pick. Floor of 50k
|
||
# keeps per-chunk overhead amortized; ceiling of 500k is where pyarrow
|
||
# / pyreadstat buffer spikes start to dominate.
|
||
#
|
||
# Order of precedence (most wins):
|
||
# 1. ``--chunk-rows N`` CLI flag (if provided)
|
||
# 2. ``GENERIC_LOADER_CHUNK_ROWS`` env var (if already set)
|
||
# 3. Auto-pick based on ``workers``
|
||
if args.chunk_rows is not None:
|
||
os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(int(args.chunk_rows))
|
||
print(
|
||
f"[info] --chunk-rows {args.chunk_rows:,}: pinning per-chunk "
|
||
f"row target (overrides auto-scaling).",
|
||
file=sys.stderr,
|
||
)
|
||
elif "GENERIC_LOADER_CHUNK_ROWS" in os.environ:
|
||
print(
|
||
f"[info] honoring GENERIC_LOADER_CHUNK_ROWS="
|
||
f"{os.environ['GENERIC_LOADER_CHUNK_ROWS']} from environment.",
|
||
file=sys.stderr,
|
||
)
|
||
elif workers > 1:
|
||
auto_rows = max(50_000, min(500_000, 3_200_000 // workers))
|
||
os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(auto_rows)
|
||
print(
|
||
f"[info] parallel mode (workers={workers}): auto-scaled "
|
||
f"per-chunk rows to {auto_rows:,}. "
|
||
f"Use --chunk-rows N to override if you have RAM headroom.",
|
||
file=sys.stderr,
|
||
)
|
||
|
||
# -- Metadata pre-scan -----------------------------------------------------
|
||
# Sum ``number_rows`` across every file so the tqdm bar has a real
|
||
# denominator, AND collect the per-column (readstat_type, sas_format)
|
||
# tuples so we can union schemas across files in a cluster before any
|
||
# CREATE TABLE runs. ``read_sas_metadata`` uses pyreadstat's
|
||
# ``metadataonly=True`` fast path, but on multi-GB sas7bdat files
|
||
# that still reads tens of MB of scattered subheader pages per file -
|
||
# sequentially that's minutes for a 52-file folder. pyreadstat
|
||
# releases the GIL during I/O and C decoding, so a ThreadPool gives
|
||
# near-linear scaling until the disk saturates. ``--no-prescan``
|
||
# bypasses the scan entirely; the progress bar then runs without an
|
||
# ETA *and* the auto-union is skipped (user overrides from YAML
|
||
# still apply).
|
||
all_files: List[Path] = [p for c in loadable for p in c.files]
|
||
grand_total: Optional[int] = 0
|
||
file_meta_by_path: Dict[str, Dict[str, Tuple[str, Optional[str]]]] = {}
|
||
|
||
if args.no_prescan:
|
||
grand_total = None
|
||
print(
|
||
f"[info] --no-prescan set: skipping row-count pre-scan for "
|
||
f"{len(all_files)} file(s); progress bar will show rate + "
|
||
f"elapsed but no ETA. Cluster-wide schema auto-union is also "
|
||
f"disabled; only user-specified column_types overrides apply.",
|
||
file=sys.stderr,
|
||
)
|
||
else:
|
||
prescan_workers = min(16, max(1, len(all_files)))
|
||
print(
|
||
f"pre-scanning row counts + per-column metadata for "
|
||
f"{len(all_files)} file(s) across {prescan_workers} thread(s)...",
|
||
file=sys.stderr,
|
||
)
|
||
|
||
def _scan_one(
    p: Path,
) -> Tuple[
    Path,
    Optional[int],
    Optional[Dict[str, Tuple[str, Optional[str]]]],
    Optional[str],
]:
    """Read one file's metadata for the folder pre-scan.

    Returns ``(path, row_count, column_meta, error)``.

    On success ``error`` is ``None``. ``row_count`` is the reader's
    ``number_rows`` coerced to ``int``, or ``None`` when the metadata
    object does not report one. ``column_meta`` is whatever
    ``extract_union_metadata`` derives from the metadata.

    Any exception raised while reading or converting is caught. The
    function then returns ``(path, None, None, str(exc))`` instead of
    raising, so a single unreadable file cannot abort the thread-pool
    ``map`` that drives the pre-scan.
    """
    try:
        # Text-format knobs come from the folder config; SAS readers
        # presumably ignore them (TODO confirm in read_sas_metadata).
        metadata = read_sas_metadata(
            p,
            delimiter=cfg.delimiter,
            text_encoding=cfg.text_encoding,
            quotechar=cfg.quotechar,
        )
        raw_rows = getattr(metadata, "number_rows", None)
        columns = extract_union_metadata(metadata)
        rows = None if raw_rows is None else int(raw_rows)
    except Exception as exc:
        return p, None, None, str(exc)
    return p, rows, columns, None
|
||
|
||
unknown_total_files: List[str] = []
|
||
running_total = 0
|
||
with ThreadPoolExecutor(max_workers=prescan_workers) as tpool:
|
||
prescan_bar = tqdm(
|
||
total=len(all_files),
|
||
unit="file",
|
||
desc=" prescanning",
|
||
file=sys.stderr,
|
||
dynamic_ncols=True,
|
||
)
|
||
try:
|
||
for p, n, col_meta, err in tpool.map(_scan_one, all_files):
|
||
prescan_bar.update(1)
|
||
if err is not None:
|
||
unknown_total_files.append(f"{p.name} ({err})")
|
||
elif n is None:
|
||
unknown_total_files.append(p.name)
|
||
else:
|
||
running_total += n
|
||
if col_meta is not None:
|
||
file_meta_by_path[str(p)] = col_meta
|
||
finally:
|
||
prescan_bar.close()
|
||
|
||
if unknown_total_files:
|
||
print(
|
||
f"[warn] could not read row count from "
|
||
f"{len(unknown_total_files)} file(s); progress bar ETA will "
|
||
f"be approximate.",
|
||
file=sys.stderr,
|
||
)
|
||
print(
|
||
f" total rows across folder: {running_total:,}",
|
||
file=sys.stderr,
|
||
)
|
||
grand_total = running_total
|
||
|
||
# -- Cluster-wide schema auto-union ---------------------------------------
|
||
# For each cluster, compute ``auto_types`` from the union of every
|
||
# file's metadata (see :func:`load_sas.union_column_types`). Merge with
|
||
# any user-supplied YAML overrides (user wins) and attach the result
|
||
# back onto the cluster so every later read - first-file inference,
|
||
# worker inference, schema-compat check - sees the same frozen schema.
|
||
# With ``--no-prescan`` the file_meta_by_path dict is empty and
|
||
# ``auto_types`` resolves to {}, so only the YAML overrides survive.
|
||
for c in loadable:
|
||
per_file = [
|
||
file_meta_by_path[str(p)]
|
||
for p in c.files
|
||
if str(p) in file_meta_by_path
|
||
]
|
||
auto_types = union_column_types(per_file) if per_file else {}
|
||
user_overrides = dict(c.column_types) # already merged folder+cluster
|
||
# User-supplied overrides win over the auto-union.
|
||
merged = {**auto_types, **user_overrides}
|
||
c.column_types = merged
|
||
|
||
if auto_types:
|
||
# Only call out columns where auto-union *changed* something
|
||
# relative to the default "first file wins" inference. We
|
||
# don't have the default inference in hand at this point, so
|
||
# log the full resolved map at a debug-friendly level - it's
|
||
# bounded by column count and the user asked for visibility
|
||
# into what got overridden.
|
||
shown = auto_types
|
||
if user_overrides:
|
||
# Distinguish the user-forced entries in the log so it's
|
||
# obvious which types came from YAML.
|
||
shown = {
|
||
col: (
|
||
f"{user_overrides[col]} (user override)"
|
||
if col in user_overrides
|
||
else pg
|
||
)
|
||
for col, pg in merged.items()
|
||
}
|
||
print(
|
||
f"[info] cluster {c.tablename!r}: auto-union derived "
|
||
f"{len(auto_types)} column type(s) across "
|
||
f"{len(per_file)} file(s): {shown}",
|
||
file=sys.stderr,
|
||
)
|
||
elif user_overrides and args.no_prescan:
|
||
print(
|
||
f"[info] cluster {c.tablename!r}: using {len(user_overrides)} "
|
||
f"user-supplied column_types override(s); auto-union "
|
||
f"disabled by --no-prescan.",
|
||
file=sys.stderr,
|
||
)
|
||
|
||
# -- Shared progress plumbing ---------------------------------------------
|
||
# The queue crosses process boundaries when workers > 1 (managed proxy)
|
||
# and is a plain in-process queue otherwise; the put/get contract is
|
||
# identical either way. A daemon thread drains it and advances the one
|
||
# tqdm bar that spans the whole folder load.
|
||
manager: Optional[Any] = None
|
||
progress_queue: Any
|
||
if workers > 1:
|
||
manager = mp.Manager()
|
||
progress_queue = manager.Queue()
|
||
else:
|
||
progress_queue = _queue_mod.Queue()
|
||
|
||
pbar = tqdm(
|
||
total=grand_total or None,
|
||
unit="row",
|
||
unit_scale=True,
|
||
desc=f"{cfg.folder.name}",
|
||
file=sys.stderr,
|
||
dynamic_ncols=True,
|
||
)
|
||
stop_drainer = threading.Event()
|
||
|
||
def _drainer() -> None:
    """Forward progress events from ``progress_queue`` to ``pbar``.

    Intended to run on a background thread. Events are sequences whose
    first element names the event kind. Only ``("rows", n)`` events are
    acted on; they advance the bar by ``n``. Falsy events and unknown
    kinds are ignored.

    The stop signal (``stop_drainer``) is only honoured once ``get``
    times out with the queue empty. Events enqueued before shutdown are
    therefore still applied to the bar before the thread exits,
    instead of being dropped the moment the event is set.

    If the queue raises ``EOFError`` or ``OSError`` (presumably the
    managed queue's server going away), the thread exits immediately.
    """
    while True:
        try:
            event = progress_queue.get(timeout=0.1)
        except _queue_mod.Empty:
            # Nothing pending: this is the only safe point to stop, so a
            # final drain requested via stop_drainer is not cut short.
            if stop_drainer.is_set():
                return
            continue
        except (EOFError, OSError):
            return
        if not event:
            continue
        if event[0] == "rows":
            pbar.update(event[1])
|
||
|
||
drainer_thread = threading.Thread(target=_drainer, daemon=True)
|
||
drainer_thread.start()
|
||
|
||
conn = connect(user=db_user, password=db_password)
|
||
conn.autocommit = False
|
||
failures: List[Tuple[str, Exception]] = []
|
||
totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows)
|
||
try:
|
||
for cluster in loadable:
|
||
print(
|
||
f"\n>>> loading cluster {cluster.tablename!r} "
|
||
f"({len(cluster.files)} file(s)) "
|
||
f"[workers={workers}]"
|
||
)
|
||
try:
|
||
rows = load_cluster(
|
||
conn,
|
||
cluster,
|
||
cfg.schemaname,
|
||
workers=workers,
|
||
progress_queue=progress_queue,
|
||
db_overrides=db_overrides,
|
||
abort_on_first_failure=args.abort_on_first_failure,
|
||
)
|
||
conn.commit()
|
||
totals.append((cluster.tablename, len(cluster.files), rows))
|
||
print(
|
||
f" -> loaded {rows:,} row(s) into "
|
||
f"{cfg.schemaname}.{cluster.tablename}"
|
||
)
|
||
except Exception as e:
|
||
conn.rollback()
|
||
failures.append((cluster.tablename, e))
|
||
print(
|
||
f" !! cluster {cluster.tablename!r} failed: {e}",
|
||
file=sys.stderr,
|
||
)
|
||
if args.fail_fast:
|
||
break
|
||
finally:
|
||
# Drain any pending progress events before shutting the bar down so
|
||
# the final rendered total matches what actually landed.
|
||
stop_drainer.set()
|
||
drainer_thread.join(timeout=2.0)
|
||
pbar.close()
|
||
conn.close()
|
||
if manager is not None:
|
||
manager.shutdown()
|
||
|
||
print("\n=== summary ===")
|
||
for name, fcount, rows in totals:
|
||
print(f" ok {name}: {fcount} file(s), {rows:,} row(s)")
|
||
for name, err in failures:
|
||
print(f" FAIL {name}: {err}", file=sys.stderr)
|
||
|
||
return 1 if failures else 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit
    # status (equivalent to sys.exit(main())).
    raise SystemExit(main())
|