Compare commits
8 Commits
edb9146682
...
f101eacffd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f101eacffd | ||
|
|
1bbe0d4cd6 | ||
|
|
c1e1fec10b | ||
|
|
3b913b2ca6 | ||
|
|
5b48872dd7 | ||
|
|
5645ff5597 | ||
|
|
3a0537270c | ||
|
|
4f7ded09c6 |
185
generic_loader/generate_sample_folder.py
Normal file
185
generic_loader/generate_sample_folder.py
Normal file
@ -0,0 +1,185 @@
|
|||||||
|
"""Generate a folder of clustered SAS XPORT files for testing ``load_folder``.
|
||||||
|
|
||||||
|
Produces ``samples/folder_test/`` containing three clusters:
|
||||||
|
|
||||||
|
* ``group_a{1,2,3}.xpt`` - kitchen-sink schema (every column).
|
||||||
|
* ``group_b{1,2}.xpt`` - a *different* schema (drops ``BIGINT`` and
|
||||||
|
``TIMECOL``) so a schema-compat check would catch cross-cluster
|
||||||
|
contamination if the regex were wrong.
|
||||||
|
* ``standalone.xpt`` - singleton to exercise the no-cluster / singleton
|
||||||
|
auto-detect path.
|
||||||
|
|
||||||
|
Alongside the files, writes ``sample_folder_config.yaml`` that exercises
|
||||||
|
both code paths: ``group_a*`` via an explicit regex pattern, ``group_b*``
|
||||||
|
and ``standalone`` via auto-detect.
|
||||||
|
|
||||||
|
Finally, runs :func:`load_folder.discover_clusters` against the generated
|
||||||
|
folder and asserts the grouping is what we expect. This is a pure in-process
|
||||||
|
smoke test of the clustering logic; no Postgres connection is required.
|
||||||
|
|
||||||
|
Reuses ``generate_sample_sas.build_dataframe`` so data shape / dtypes match
|
||||||
|
the single-file loader tests. ``N_ROWS`` is temporarily shrunk on the
|
||||||
|
imported module for this run's duration so repeated invocations stay fast.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import pyreadstat
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
import generate_sample_sas as gss
|
||||||
|
from load_folder import discover_clusters, load_folder_config
|
||||||
|
|
||||||
|
|
||||||
|
FIXTURE_ROWS = 2_000
|
||||||
|
OUT_DIR = Path("samples/folder_test")
|
||||||
|
CONFIG_PATH = OUT_DIR / "folder_config.yaml"
|
||||||
|
|
||||||
|
GROUP_A_FILES = ["group_a1.xpt", "group_a2.xpt", "group_a3.xpt"]
|
||||||
|
GROUP_B_FILES = ["group_b1.xpt", "group_b2.xpt"]
|
||||||
|
STANDALONE_FILE = "standalone.xpt"
|
||||||
|
|
||||||
|
# Columns dropped from the group_b cluster so it has a genuinely different
|
||||||
|
# schema from the group_a cluster. If the regex accidentally pulled a group_b file
|
||||||
|
# into the group_a cluster (or vice versa), load_cluster's schema-compat check
|
||||||
|
# would fire on these differences.
|
||||||
|
GROUP_B_DROPPED_COLUMNS = ("BIGINT", "TIMECOL")
|
||||||
|
|
||||||
|
|
||||||
|
def _build_df(seed: int) -> pd.DataFrame:
|
||||||
|
"""Build a kitchen-sink DataFrame via the existing generator.
|
||||||
|
|
||||||
|
Temporarily shrinks ``generate_sample_sas.N_ROWS`` so each fixture file
|
||||||
|
is small enough to regenerate quickly. Restored afterward so importing
|
||||||
|
this module alongside the main generator stays side-effect free.
|
||||||
|
"""
|
||||||
|
saved = gss.N_ROWS
|
||||||
|
gss.N_ROWS = FIXTURE_ROWS
|
||||||
|
try:
|
||||||
|
rng = np.random.default_rng(seed)
|
||||||
|
return gss.build_dataframe(rng)
|
||||||
|
finally:
|
||||||
|
gss.N_ROWS = saved
|
||||||
|
|
||||||
|
|
||||||
|
def _write_xport(df: pd.DataFrame, path: Path, table_name: str) -> None:
|
||||||
|
# Only pass variable_format entries for columns that actually exist in
|
||||||
|
# this frame - write_xport errors on formats referencing missing cols.
|
||||||
|
variable_format = {
|
||||||
|
k: v for k, v in gss.VARIABLE_FORMATS.items() if k in df.columns
|
||||||
|
}
|
||||||
|
column_labels = {k: v for k, v in gss.COLUMN_LABELS.items() if k in df.columns}
|
||||||
|
|
||||||
|
pyreadstat.write_xport(
|
||||||
|
df,
|
||||||
|
str(path),
|
||||||
|
file_format_version=5,
|
||||||
|
table_name=table_name,
|
||||||
|
file_label=f"Folder-loader fixture ({path.name})",
|
||||||
|
column_labels=column_labels,
|
||||||
|
variable_format=variable_format,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_fixtures() -> None:
|
||||||
|
OUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
for i, name in enumerate(GROUP_A_FILES):
|
||||||
|
df = _build_df(seed=100 + i)
|
||||||
|
_write_xport(df, OUT_DIR / name, table_name=f"GRPA{i + 1}")
|
||||||
|
print(f" wrote {OUT_DIR / name} ({len(df):,} rows, {len(df.columns)} cols)")
|
||||||
|
|
||||||
|
for i, name in enumerate(GROUP_B_FILES):
|
||||||
|
df = _build_df(seed=200 + i)
|
||||||
|
df = df.drop(columns=list(GROUP_B_DROPPED_COLUMNS))
|
||||||
|
_write_xport(df, OUT_DIR / name, table_name=f"GRPB{i + 1}")
|
||||||
|
print(f" wrote {OUT_DIR / name} ({len(df):,} rows, {len(df.columns)} cols)")
|
||||||
|
|
||||||
|
df = _build_df(seed=300)
|
||||||
|
_write_xport(df, OUT_DIR / STANDALONE_FILE, table_name="STDALONE")
|
||||||
|
print(
|
||||||
|
f" wrote {OUT_DIR / STANDALONE_FILE} "
|
||||||
|
f"({len(df):,} rows, {len(df.columns)} cols)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def write_config() -> None:
|
||||||
|
cfg = {
|
||||||
|
"folder": ".", # config lives inside the target folder
|
||||||
|
"schemaname": "public",
|
||||||
|
"if_exists": "replace",
|
||||||
|
"auto_detect": True,
|
||||||
|
"clusters": [
|
||||||
|
{
|
||||||
|
"pattern": r"^group_a\d+\.xpt$",
|
||||||
|
"tablename": "group_a",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
with CONFIG_PATH.open("w", encoding="utf-8") as f:
|
||||||
|
# Top-of-file comment documents the intent of this generated config.
|
||||||
|
f.write(
|
||||||
|
"# Generated by generate_sample_folder.py. Demonstrates both\n"
|
||||||
|
"# explicit regex clustering (group_a*) and auto-detect\n"
|
||||||
|
"# (group_b* and standalone) working together.\n"
|
||||||
|
)
|
||||||
|
yaml.safe_dump(cfg, f, sort_keys=False)
|
||||||
|
print(f" wrote {CONFIG_PATH}")
|
||||||
|
|
||||||
|
|
||||||
|
def verify() -> None:
|
||||||
|
"""Smoke-test the clustering logic against the generated folder."""
|
||||||
|
cfg = load_folder_config(CONFIG_PATH)
|
||||||
|
clusters = discover_clusters(cfg)
|
||||||
|
|
||||||
|
by_name = {c.tablename: c for c in clusters}
|
||||||
|
|
||||||
|
expected_names = {"group_a", "group_b", "standalone"}
|
||||||
|
actual_names = set(by_name)
|
||||||
|
assert expected_names == actual_names, (
|
||||||
|
f"cluster set mismatch: expected {expected_names}, got {actual_names}"
|
||||||
|
)
|
||||||
|
|
||||||
|
group_a = by_name["group_a"]
|
||||||
|
assert group_a.source == "explicit", f"group_a source = {group_a.source!r}"
|
||||||
|
assert [f.name for f in group_a.files] == sorted(GROUP_A_FILES), (
|
||||||
|
f"group_a files = {[f.name for f in group_a.files]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
group_b = by_name["group_b"]
|
||||||
|
assert group_b.source == "auto", f"group_b source = {group_b.source!r}"
|
||||||
|
assert [f.name for f in group_b.files] == sorted(GROUP_B_FILES), (
|
||||||
|
f"group_b files = {[f.name for f in group_b.files]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
standalone = by_name["standalone"]
|
||||||
|
assert standalone.source == "auto", f"standalone source = {standalone.source!r}"
|
||||||
|
assert [f.name for f in standalone.files] == [STANDALONE_FILE], (
|
||||||
|
f"standalone files = {[f.name for f in standalone.files]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
print(" clustering verified:")
|
||||||
|
for c in clusters:
|
||||||
|
files = ", ".join(f.name for f in c.files)
|
||||||
|
print(f" {c.tablename} [{c.source}]: {files}")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
print(f"Writing fixture SAS files to {OUT_DIR}/")
|
||||||
|
generate_fixtures()
|
||||||
|
print(f"\nWriting folder config to {CONFIG_PATH}")
|
||||||
|
write_config()
|
||||||
|
print("\nVerifying discover_clusters() grouping...")
|
||||||
|
verify()
|
||||||
|
print("\nOK. Try:")
|
||||||
|
print(f" python load_folder.py --config {CONFIG_PATH} --dry-run")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
575
generic_loader/load_folder.py
Normal file
575
generic_loader/load_folder.py
Normal file
@ -0,0 +1,575 @@
|
|||||||
|
"""Folder-level SAS-to-Postgres loader.
|
||||||
|
|
||||||
|
Wraps :mod:`load_sas` so an entire directory of SAS files can be ingested in
|
||||||
|
one invocation. A directory often contains several *clusters* of files that
|
||||||
|
share a schema (e.g. ``group_a1.sas7bdat``, ``group_a2.sas7bdat``, ...). Each
|
||||||
|
cluster becomes one Postgres table; files inside a cluster are appended to it.
|
||||||
|
|
||||||
|
-------------------------------------------------------------------------------
|
||||||
|
USAGE
|
||||||
|
-------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
1. YAML config
|
||||||
|
--------------
|
||||||
|
::
|
||||||
|
|
||||||
|
folder: samples/folder_test # required; relative paths resolve against
|
||||||
|
# the config file's directory
|
||||||
|
schemaname: public # required
|
||||||
|
|
||||||
|
# Optional. One of: fail | replace | append. Default: fail.
|
||||||
|
# Applied to the first file of each cluster (subsequent files in the
|
||||||
|
# cluster always run through the append-mode compatibility check).
|
||||||
|
if_exists: fail
|
||||||
|
|
||||||
|
# Optional. Default: true. When true, files that don't match any explicit
|
||||||
|
# pattern below are grouped by their common prefix (trailing digits, and
|
||||||
|
# optional trailing separators, are stripped from each file stem).
|
||||||
|
auto_detect: true
|
||||||
|
|
||||||
|
# Optional. Columns to force-include or force-exclude across every file.
|
||||||
|
# include and exclude are mutually exclusive.
|
||||||
|
# include: [ID, INTCOL]
|
||||||
|
# exclude: [ALLNULL]
|
||||||
|
|
||||||
|
# Optional explicit cluster patterns. Each pattern is matched against the
|
||||||
|
# file *basename*. Matched files are pulled out of the auto-detect pool.
|
||||||
|
# Per-cluster if_exists/include/exclude override the folder-level defaults.
|
||||||
|
clusters:
|
||||||
|
- pattern: '^group_a\\d+\\.sas7bdat$'
|
||||||
|
tablename: group_a
|
||||||
|
- pattern: '^group_b\\d+\\.sas7bdat$'
|
||||||
|
tablename: group_b
|
||||||
|
if_exists: replace
|
||||||
|
|
||||||
|
2. Command-line interface
|
||||||
|
-------------------------
|
||||||
|
::
|
||||||
|
|
||||||
|
python load_folder.py --config folder_config.yaml [--dry-run] [--fail-fast]
|
||||||
|
[--dbcreds]
|
||||||
|
|
||||||
|
Flags:
|
||||||
|
--config PATH Required. Path to the YAML config above.
|
||||||
|
--dry-run Print the discovered clusters and the inferred CREATE
|
||||||
|
TABLE for each (schema from the first file of the
|
||||||
|
cluster). The database is never touched.
|
||||||
|
--fail-fast Abort the whole run on the first cluster failure.
|
||||||
|
Default is to log the failure, roll that cluster back,
|
||||||
|
and keep going.
|
||||||
|
--dbcreds Prompt interactively for the database username and
|
||||||
|
password instead of reading ``PGUSER`` / ``PGPASSWORD``
|
||||||
|
from the environment or ``.env`` file. The password
|
||||||
|
prompt does not echo. Has no effect with ``--dry-run``
|
||||||
|
(no connection is opened).
|
||||||
|
|
||||||
|
Exit codes:
|
||||||
|
0 - every cluster loaded successfully (or dry-run completed)
|
||||||
|
1 - at least one cluster failed (details on stderr)
|
||||||
|
2 - folder does not exist / contains no SAS files
|
||||||
|
|
||||||
|
3. Discovery rules
|
||||||
|
------------------
|
||||||
|
* Supported extensions: ``.sas7bdat``, ``.xpt``, ``.xport`` (matches
|
||||||
|
:mod:`load_sas`). The folder is not scanned recursively.
|
||||||
|
* Explicit patterns are tried in order. A file matched by one pattern is
|
||||||
|
removed from the pool before the next pattern runs, so earlier patterns
|
||||||
|
win in case of overlap. Overlap between patterns is flagged as an error
|
||||||
|
at config-parse time (a file matching two patterns is almost always a bug).
|
||||||
|
* Auto-detect groups remaining files by ``re.sub(r'\\d+$', '', stem)`` with
|
||||||
|
any trailing ``_`` / ``-`` stripped afterward. Stems without trailing
|
||||||
|
digits become singleton clusters named after the stem.
|
||||||
|
|
||||||
|
4. Library usage
|
||||||
|
----------------
|
||||||
|
::
|
||||||
|
|
||||||
|
from load_folder import load_folder_config, discover_clusters, load_cluster
|
||||||
|
from load_sas import connect
|
||||||
|
|
||||||
|
cfg = load_folder_config("folder_config.yaml")
|
||||||
|
clusters = discover_clusters(cfg)
|
||||||
|
|
||||||
|
conn = connect()
|
||||||
|
try:
|
||||||
|
for cluster in clusters:
|
||||||
|
load_cluster(conn, cluster, cfg.schemaname)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import getpass
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
from load_sas import (
|
||||||
|
VALID_IF_EXISTS,
|
||||||
|
apply_column_filter,
|
||||||
|
assert_schema_compatible,
|
||||||
|
connect,
|
||||||
|
copy_dataframes,
|
||||||
|
create_table,
|
||||||
|
infer_schema,
|
||||||
|
iter_sas_chunks,
|
||||||
|
read_sas_preview,
|
||||||
|
render_create_table,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SAS_EXTENSIONS = (".sas7bdat", ".xpt", ".xport")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Dataclasses
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ClusterSpec:
|
||||||
|
tablename: str
|
||||||
|
files: List[Path]
|
||||||
|
if_exists: str
|
||||||
|
include: Optional[List[str]]
|
||||||
|
exclude: Optional[List[str]]
|
||||||
|
source: str # "explicit" or "auto"
|
||||||
|
pattern: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _ExplicitPattern:
|
||||||
|
"""Parsed form of a single ``clusters[*]`` YAML entry."""
|
||||||
|
|
||||||
|
pattern: re.Pattern
|
||||||
|
raw_pattern: str
|
||||||
|
tablename: str
|
||||||
|
if_exists: Optional[str] = None
|
||||||
|
include: Optional[List[str]] = None
|
||||||
|
exclude: Optional[List[str]] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FolderConfig:
|
||||||
|
folder: Path
|
||||||
|
schemaname: str
|
||||||
|
if_exists: str = "fail"
|
||||||
|
auto_detect: bool = True
|
||||||
|
include: Optional[List[str]] = None
|
||||||
|
exclude: Optional[List[str]] = None
|
||||||
|
explicit: List[_ExplicitPattern] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Config loading
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_if_exists(value: Any, where: str) -> str:
|
||||||
|
s = str(value).lower()
|
||||||
|
if s not in VALID_IF_EXISTS:
|
||||||
|
raise ValueError(
|
||||||
|
f"{where}: if_exists={value!r} is not one of {VALID_IF_EXISTS}"
|
||||||
|
)
|
||||||
|
return s
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_columns_filter(
|
||||||
|
raw: Dict[str, Any], where: str
|
||||||
|
) -> Tuple[Optional[List[str]], Optional[List[str]]]:
|
||||||
|
include = raw.get("include")
|
||||||
|
exclude = raw.get("exclude")
|
||||||
|
if include is not None and exclude is not None:
|
||||||
|
raise ValueError(f"{where}: 'include' and 'exclude' are mutually exclusive.")
|
||||||
|
if include is not None and not isinstance(include, list):
|
||||||
|
raise ValueError(f"{where}: 'include' must be a list of column names.")
|
||||||
|
if exclude is not None and not isinstance(exclude, list):
|
||||||
|
raise ValueError(f"{where}: 'exclude' must be a list of column names.")
|
||||||
|
include_out = [str(c) for c in include] if include is not None else None
|
||||||
|
exclude_out = [str(c) for c in exclude] if exclude is not None else None
|
||||||
|
return include_out, exclude_out
|
||||||
|
|
||||||
|
|
||||||
|
def load_folder_config(path: Path) -> FolderConfig:
|
||||||
|
"""Parse and validate the folder-level YAML config at ``path``."""
|
||||||
|
path = Path(path)
|
||||||
|
with path.open("r", encoding="utf-8") as f:
|
||||||
|
raw = yaml.safe_load(f)
|
||||||
|
|
||||||
|
if not isinstance(raw, dict):
|
||||||
|
raise ValueError(f"Config at {path} must be a YAML mapping at the top level.")
|
||||||
|
|
||||||
|
missing = [k for k in ("folder", "schemaname") if k not in raw]
|
||||||
|
if missing:
|
||||||
|
raise ValueError(f"Config {path} missing required keys: {', '.join(missing)}")
|
||||||
|
|
||||||
|
folder = Path(raw["folder"])
|
||||||
|
if not folder.is_absolute():
|
||||||
|
candidate = (path.parent / folder).resolve()
|
||||||
|
folder = candidate if candidate.exists() else folder
|
||||||
|
|
||||||
|
schemaname = str(raw["schemaname"])
|
||||||
|
if_exists = _validate_if_exists(raw.get("if_exists", "fail"), f"Config {path}")
|
||||||
|
auto_detect = bool(raw.get("auto_detect", True))
|
||||||
|
|
||||||
|
include, exclude = _parse_columns_filter(raw, f"Config {path}")
|
||||||
|
|
||||||
|
explicit: List[_ExplicitPattern] = []
|
||||||
|
clusters_raw = raw.get("clusters") or []
|
||||||
|
if not isinstance(clusters_raw, list):
|
||||||
|
raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
|
||||||
|
for i, entry in enumerate(clusters_raw):
|
||||||
|
where = f"Config {path} clusters[{i}]"
|
||||||
|
if not isinstance(entry, dict):
|
||||||
|
raise ValueError(f"{where} must be a mapping.")
|
||||||
|
if "pattern" not in entry or "tablename" not in entry:
|
||||||
|
raise ValueError(f"{where} must include 'pattern' and 'tablename'.")
|
||||||
|
raw_pat = str(entry["pattern"])
|
||||||
|
try:
|
||||||
|
compiled = re.compile(raw_pat)
|
||||||
|
except re.error as e:
|
||||||
|
raise ValueError(f"{where}: invalid regex {raw_pat!r}: {e}") from e
|
||||||
|
c_if_exists = (
|
||||||
|
_validate_if_exists(entry["if_exists"], where)
|
||||||
|
if "if_exists" in entry
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
c_include, c_exclude = _parse_columns_filter(entry, where)
|
||||||
|
explicit.append(
|
||||||
|
_ExplicitPattern(
|
||||||
|
pattern=compiled,
|
||||||
|
raw_pattern=raw_pat,
|
||||||
|
tablename=str(entry["tablename"]),
|
||||||
|
if_exists=c_if_exists,
|
||||||
|
include=c_include,
|
||||||
|
exclude=c_exclude,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return FolderConfig(
|
||||||
|
folder=folder,
|
||||||
|
schemaname=schemaname,
|
||||||
|
if_exists=if_exists,
|
||||||
|
auto_detect=auto_detect,
|
||||||
|
include=include,
|
||||||
|
exclude=exclude,
|
||||||
|
explicit=explicit,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Cluster discovery
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
|
||||||
|
|
||||||
|
|
||||||
|
def _auto_prefix(stem: str) -> str:
|
||||||
|
"""Derive the cluster key for a file stem.
|
||||||
|
|
||||||
|
Strip trailing digits and any trailing separators so
|
||||||
|
``group_a1`` / ``group_a_2`` / ``group_a-3`` all land in the same
|
||||||
|
``group_a`` bucket. If nothing is stripped, the stem is its own key.
|
||||||
|
"""
|
||||||
|
stripped = _TRAILING_DIGIT_RE.sub("", stem)
|
||||||
|
stripped = stripped.rstrip("_-")
|
||||||
|
return stripped or stem
|
||||||
|
|
||||||
|
|
||||||
|
def _list_sas_files(folder: Path) -> List[Path]:
|
||||||
|
files: List[Path] = []
|
||||||
|
for p in sorted(folder.iterdir()):
|
||||||
|
if p.is_file() and p.suffix.lower() in SAS_EXTENSIONS:
|
||||||
|
files.append(p)
|
||||||
|
return files
|
||||||
|
|
||||||
|
|
||||||
|
def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
|
||||||
|
"""Enumerate ``cfg.folder`` and bucket files into ``ClusterSpec`` objects.
|
||||||
|
|
||||||
|
Pure/IO-bounded: the only filesystem access is listing ``cfg.folder``. No
|
||||||
|
SAS file is opened here. Explicit patterns are applied first, in config
|
||||||
|
order; files matched by an earlier pattern are removed from the pool
|
||||||
|
before the next pattern runs. A file matching two patterns triggers a
|
||||||
|
hard error (that's almost always a config bug).
|
||||||
|
"""
|
||||||
|
if not cfg.folder.exists() or not cfg.folder.is_dir():
|
||||||
|
raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}")
|
||||||
|
|
||||||
|
pool = _list_sas_files(cfg.folder)
|
||||||
|
clusters: List[ClusterSpec] = []
|
||||||
|
|
||||||
|
# Detect cross-pattern overlap up front for a clearer error message.
|
||||||
|
for i, p_i in enumerate(cfg.explicit):
|
||||||
|
for j in range(i + 1, len(cfg.explicit)):
|
||||||
|
p_j = cfg.explicit[j]
|
||||||
|
for f in pool:
|
||||||
|
if p_i.pattern.search(f.name) and p_j.pattern.search(f.name):
|
||||||
|
raise ValueError(
|
||||||
|
f"File {f.name!r} matches multiple explicit patterns: "
|
||||||
|
f"{p_i.raw_pattern!r} and {p_j.raw_pattern!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
remaining = list(pool)
|
||||||
|
for patt in cfg.explicit:
|
||||||
|
matched = [f for f in remaining if patt.pattern.search(f.name)]
|
||||||
|
if not matched:
|
||||||
|
# Not an error - the folder might legitimately not contain files
|
||||||
|
# for this pattern on a given run. Emit a note for the CLI.
|
||||||
|
clusters.append(
|
||||||
|
ClusterSpec(
|
||||||
|
tablename=patt.tablename,
|
||||||
|
files=[],
|
||||||
|
if_exists=patt.if_exists or cfg.if_exists,
|
||||||
|
include=patt.include if patt.include is not None else cfg.include,
|
||||||
|
exclude=patt.exclude if patt.exclude is not None else cfg.exclude,
|
||||||
|
source="explicit",
|
||||||
|
pattern=patt.raw_pattern,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
remaining = [f for f in remaining if f not in matched]
|
||||||
|
clusters.append(
|
||||||
|
ClusterSpec(
|
||||||
|
tablename=patt.tablename,
|
||||||
|
files=sorted(matched),
|
||||||
|
if_exists=patt.if_exists or cfg.if_exists,
|
||||||
|
include=patt.include if patt.include is not None else cfg.include,
|
||||||
|
exclude=patt.exclude if patt.exclude is not None else cfg.exclude,
|
||||||
|
source="explicit",
|
||||||
|
pattern=patt.raw_pattern,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if cfg.auto_detect and remaining:
|
||||||
|
buckets: Dict[str, List[Path]] = {}
|
||||||
|
for f in remaining:
|
||||||
|
key = _auto_prefix(f.stem)
|
||||||
|
buckets.setdefault(key, []).append(f)
|
||||||
|
for key in sorted(buckets):
|
||||||
|
clusters.append(
|
||||||
|
ClusterSpec(
|
||||||
|
tablename=key,
|
||||||
|
files=sorted(buckets[key]),
|
||||||
|
if_exists=cfg.if_exists,
|
||||||
|
include=cfg.include,
|
||||||
|
exclude=cfg.exclude,
|
||||||
|
source="auto",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return clusters
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Per-cluster load
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _infer_cluster_schema(path: Path, include, exclude):
|
||||||
|
preview_df, meta = read_sas_preview(path)
|
||||||
|
preview_df = apply_column_filter(preview_df, include, exclude)
|
||||||
|
total_rows = getattr(meta, "number_rows", None)
|
||||||
|
columns = infer_schema(preview_df, meta, total_rows=total_rows)
|
||||||
|
return columns
|
||||||
|
|
||||||
|
|
||||||
|
def load_cluster(conn, cluster: ClusterSpec, schemaname: str) -> int:
|
||||||
|
"""Load every file in ``cluster`` into one table. Returns total rows loaded.
|
||||||
|
|
||||||
|
The caller owns transaction boundaries. This function does NOT commit or
|
||||||
|
roll back - :func:`main` does that per cluster so one bad cluster
|
||||||
|
doesn't poison the rest of the run.
|
||||||
|
"""
|
||||||
|
if not cluster.files:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
first, *rest = cluster.files
|
||||||
|
first_columns = _infer_cluster_schema(first, cluster.include, cluster.exclude)
|
||||||
|
create_table(
|
||||||
|
conn, schemaname, cluster.tablename, first_columns, cluster.if_exists
|
||||||
|
)
|
||||||
|
|
||||||
|
total = 0
|
||||||
|
total += _stream_file(
|
||||||
|
conn, schemaname, cluster.tablename, first, first_columns,
|
||||||
|
cluster.include, cluster.exclude,
|
||||||
|
)
|
||||||
|
|
||||||
|
for path in rest:
|
||||||
|
columns = _infer_cluster_schema(path, cluster.include, cluster.exclude)
|
||||||
|
# Uses the same check that if_exists=append runs. A type mismatch or
|
||||||
|
# missing column aborts the cluster; the transaction rollback in
|
||||||
|
# main() keeps the table from ending up half-loaded.
|
||||||
|
assert_schema_compatible(conn, schemaname, cluster.tablename, columns)
|
||||||
|
total += _stream_file(
|
||||||
|
conn, schemaname, cluster.tablename, path, columns,
|
||||||
|
cluster.include, cluster.exclude,
|
||||||
|
)
|
||||||
|
|
||||||
|
return total
|
||||||
|
|
||||||
|
|
||||||
|
def _stream_file(
|
||||||
|
conn,
|
||||||
|
schemaname: str,
|
||||||
|
tablename: str,
|
||||||
|
path: Path,
|
||||||
|
columns,
|
||||||
|
include,
|
||||||
|
exclude,
|
||||||
|
) -> int:
|
||||||
|
def _chunks():
|
||||||
|
seen = 0
|
||||||
|
for chunk_df, _chunk_meta in iter_sas_chunks(path):
|
||||||
|
chunk_df = apply_column_filter(chunk_df, include, exclude)
|
||||||
|
seen += len(chunk_df)
|
||||||
|
print(
|
||||||
|
f" {path.name}: streaming... {seen:,} rows",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
yield chunk_df
|
||||||
|
|
||||||
|
return copy_dataframes(conn, schemaname, tablename, _chunks(), columns)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# CLI
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _build_argparser() -> argparse.ArgumentParser:
|
||||||
|
p = argparse.ArgumentParser(
|
||||||
|
description=(
|
||||||
|
"Load every SAS file in a folder into Postgres, grouping files "
|
||||||
|
"into clusters that each become one table."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
p.add_argument("--config", required=True, type=Path, help="Path to YAML config")
|
||||||
|
p.add_argument(
|
||||||
|
"--dry-run",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"Print discovered clusters and the inferred CREATE TABLE for "
|
||||||
|
"each; don't touch Postgres."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--fail-fast",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"Abort on the first cluster failure. Default is to roll that "
|
||||||
|
"cluster back and continue with the next one."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--dbcreds",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"Prompt for database username and password instead of reading "
|
||||||
|
"PGUSER / PGPASSWORD from the environment or .env file."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
def _describe_cluster(cluster: ClusterSpec) -> str:
|
||||||
|
src = f"{cluster.source}"
|
||||||
|
if cluster.pattern:
|
||||||
|
src += f" pattern={cluster.pattern!r}"
|
||||||
|
files = ", ".join(f.name for f in cluster.files) or "(no matching files)"
|
||||||
|
return (
|
||||||
|
f"cluster {cluster.tablename!r} [{src}] if_exists={cluster.if_exists}\n"
|
||||||
|
f" files: {files}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: Optional[List[str]] = None) -> int:
|
||||||
|
args = _build_argparser().parse_args(argv)
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
cfg = load_folder_config(args.config)
|
||||||
|
|
||||||
|
if not cfg.folder.exists() or not cfg.folder.is_dir():
|
||||||
|
print(f"error: folder not found: {cfg.folder}", file=sys.stderr)
|
||||||
|
return 2
|
||||||
|
|
||||||
|
clusters = discover_clusters(cfg)
|
||||||
|
loadable = [c for c in clusters if c.files]
|
||||||
|
|
||||||
|
if not loadable:
|
||||||
|
print(
|
||||||
|
f"error: no SAS files found in {cfg.folder} "
|
||||||
|
f"(looked for {', '.join(SAS_EXTENSIONS)})",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
return 2
|
||||||
|
|
||||||
|
print(f"discovered {len(loadable)} cluster(s) in {cfg.folder}:")
|
||||||
|
for c in clusters:
|
||||||
|
print(_describe_cluster(c))
|
||||||
|
|
||||||
|
if args.dry_run:
|
||||||
|
print()
|
||||||
|
for c in loadable:
|
||||||
|
print(f"--- CREATE TABLE for cluster {c.tablename!r} ---")
|
||||||
|
columns = _infer_cluster_schema(c.files[0], c.include, c.exclude)
|
||||||
|
print(render_create_table(cfg.schemaname, c.tablename, columns))
|
||||||
|
print()
|
||||||
|
return 0
|
||||||
|
|
||||||
|
db_user = db_password = None
|
||||||
|
if args.dbcreds:
|
||||||
|
db_user = input("Database username: ")
|
||||||
|
db_password = getpass.getpass("Database password: ")
|
||||||
|
|
||||||
|
conn = connect(user=db_user, password=db_password)
|
||||||
|
conn.autocommit = False
|
||||||
|
failures: List[Tuple[str, Exception]] = []
|
||||||
|
totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows)
|
||||||
|
try:
|
||||||
|
for cluster in loadable:
|
||||||
|
print(
|
||||||
|
f"\n>>> loading cluster {cluster.tablename!r} "
|
||||||
|
f"({len(cluster.files)} file(s))"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
rows = load_cluster(conn, cluster, cfg.schemaname)
|
||||||
|
conn.commit()
|
||||||
|
totals.append((cluster.tablename, len(cluster.files), rows))
|
||||||
|
print(
|
||||||
|
f" -> loaded {rows:,} row(s) into "
|
||||||
|
f"{cfg.schemaname}.{cluster.tablename}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
conn.rollback()
|
||||||
|
failures.append((cluster.tablename, e))
|
||||||
|
print(
|
||||||
|
f" !! cluster {cluster.tablename!r} failed: {e}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
if args.fail_fast:
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
print("\n=== summary ===")
|
||||||
|
for name, fcount, rows in totals:
|
||||||
|
print(f" ok {name}: {fcount} file(s), {rows:,} row(s)")
|
||||||
|
for name, err in failures:
|
||||||
|
print(f" FAIL {name}: {err}", file=sys.stderr)
|
||||||
|
|
||||||
|
return 1 if failures else 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
@ -8,19 +8,222 @@ Python 3.9 compatible (target is an air-gapped host that currently only has
|
|||||||
3.9). ``from __future__ import annotations`` lets us use PEP 585 generics
|
3.9). ``from __future__ import annotations`` lets us use PEP 585 generics
|
||||||
as annotations; runtime-resolved type uses (dataclass defaults, etc.) stick
|
as annotations; runtime-resolved type uses (dataclass defaults, etc.) stick
|
||||||
to ``typing``.
|
to ``typing``.
|
||||||
|
|
||||||
|
-------------------------------------------------------------------------------
|
||||||
|
USAGE
|
||||||
|
-------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
Supported inputs:
|
||||||
|
* ``.sas7bdat`` (read with ``encoding="latin-1"``)
|
||||||
|
* ``.xpt`` / ``.xport`` (SAS transport files)
|
||||||
|
|
||||||
|
1. YAML config
|
||||||
|
--------------
|
||||||
|
Every invocation is driven by a YAML file describing one SAS file to load::
|
||||||
|
|
||||||
|
filename: samples/sample_kitchensink.xpt # required; relative paths are
|
||||||
|
# resolved against the config
|
||||||
|
# file's directory when possible
|
||||||
|
schemaname: public # required
|
||||||
|
tablename: kitchensink # required
|
||||||
|
|
||||||
|
# Optional. One of: fail | replace | append. Default: fail.
|
||||||
|
# fail - error out if the target table already exists
|
||||||
|
# replace - DROP and recreate the table from the inferred schema
|
||||||
|
# append - keep the existing table; pre-flight a schema-compat check,
|
||||||
|
# then COPY the new rows in
|
||||||
|
if_exists: append
|
||||||
|
|
||||||
|
# Optional, mutually exclusive. Restrict which columns are loaded.
|
||||||
|
# include:
|
||||||
|
# - ID
|
||||||
|
# - INTCOL
|
||||||
|
# exclude:
|
||||||
|
# - ALLNULL
|
||||||
|
|
||||||
|
2. Database connection
|
||||||
|
----------------------
|
||||||
|
The loader uses standard libpq environment variables (read via ``os.environ``)::
|
||||||
|
|
||||||
|
PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE
|
||||||
|
|
||||||
|
The CLI calls ``python-dotenv``'s ``load_dotenv()`` at startup, so a local
|
||||||
|
``.env`` file is picked up automatically. Library callers are responsible for
|
||||||
|
populating the environment themselves (either call ``load_dotenv()`` or export
|
||||||
|
the vars) before calling :func:`connect`.
|
||||||
|
|
||||||
|
3. Command-line interface
|
||||||
|
-------------------------
|
||||||
|
::
|
||||||
|
|
||||||
|
python load_sas.py --config path/to/config.yaml [--validate] [--dry-run]
|
||||||
|
[--dbcreds]
|
||||||
|
|
||||||
|
Flags:
|
||||||
|
--config PATH Required. Path to the YAML config above.
|
||||||
|
--validate Compare the inferred schema against
|
||||||
|
``<sas-file-stem>.expected.json`` sitting next to the SAS
|
||||||
|
file. Exits nonzero on mismatch. Safe to combine with
|
||||||
|
``--dry-run``.
|
||||||
|
--dry-run Print the inferred ``CREATE TABLE`` SQL and stop. The
|
||||||
|
database is never touched (no connection is opened).
|
||||||
|
--dbcreds Prompt interactively for the database username and
|
||||||
|
password instead of reading ``PGUSER`` / ``PGPASSWORD``
|
||||||
|
from the environment or ``.env`` file. The password
|
||||||
|
prompt does not echo. Has no effect with ``--dry-run``
|
||||||
|
(no connection is opened).
|
||||||
|
|
||||||
|
Exit codes:
|
||||||
|
0 - success (load completed, or dry-run/validate passed)
|
||||||
|
1 - validation failure
|
||||||
|
2 - config references a SAS file that does not exist
|
||||||
|
Other nonzero - uncaught exception (traceback printed); the transaction
|
||||||
|
is rolled back before exit.
|
||||||
|
|
||||||
|
Typical invocations::
|
||||||
|
|
||||||
|
# Preview the inferred schema without connecting to Postgres.
|
||||||
|
python load_sas.py --config sample_config.yaml --dry-run
|
||||||
|
|
||||||
|
# Check the inferred schema against an expected-types manifest.
|
||||||
|
python load_sas.py --config sample_config.yaml --validate --dry-run
|
||||||
|
|
||||||
|
# Actually load the data.
|
||||||
|
python load_sas.py --config sample_config.yaml
|
||||||
|
|
||||||
|
# Load the data, prompting for credentials instead of using .env.
|
||||||
|
python load_sas.py --config sample_config.yaml --dbcreds
|
||||||
|
|
||||||
|
4. Expected-types manifest (``--validate``)
|
||||||
|
-------------------------------------------
|
||||||
|
``--validate`` looks for a JSON file named ``<sas-stem>.expected.json`` next
|
||||||
|
to the SAS file, e.g. ``samples/sample_kitchensink.xpt`` pairs with
|
||||||
|
``samples/sample_kitchensink.expected.json``. Each top-level key is a column
|
||||||
|
name; the value is an object with any of::
|
||||||
|
|
||||||
|
{
|
||||||
|
"postgres_type": "BIGINT", # exact expected type, OR
|
||||||
|
"acceptable_types": ["TEXT", # any-of list of acceptable types
|
||||||
|
"VARCHAR"],
|
||||||
|
"nullable": true, # default true; false = must be NOT NULL
|
||||||
|
"note": "free-form comment" # ignored by the loader
|
||||||
|
}
|
||||||
|
|
||||||
|
Type comparison ignores length/precision modifiers and normalizes synonyms
|
||||||
|
(e.g. ``INT`` == ``INTEGER`` == ``INT4``; ``VARCHAR(10)`` == ``VARCHAR``).
|
||||||
|
Nullability tightening (inferred NULL, manifest NOT NULL) is a hard failure;
|
||||||
|
loosening is not checked here because the append-mode check already covers it.
|
||||||
|
|
||||||
|
5. Library usage
|
||||||
|
----------------
|
||||||
|
The CLI is a thin wrapper around composable functions. The preferred pattern
|
||||||
|
infers the schema from a bounded preview and then streams the rest of the
|
||||||
|
file chunk-by-chunk into ``COPY`` - crucial for SAS files with hundreds of
|
||||||
|
millions of rows::
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from load_sas import (
|
||||||
|
load_config, read_sas_preview, iter_sas_chunks, apply_column_filter,
|
||||||
|
infer_schema, validate_against_manifest, render_create_table,
|
||||||
|
connect, create_table, copy_dataframes,
|
||||||
|
)
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
cfg = load_config("config.yaml")
|
||||||
|
|
||||||
|
# Schema from a preview slice (bounded by TYPE_INFERENCE_SAMPLE_ROWS).
|
||||||
|
preview_df, meta = read_sas_preview(cfg.filename)
|
||||||
|
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
|
||||||
|
total_rows = getattr(meta, "number_rows", None)
|
||||||
|
columns = infer_schema(preview_df, meta, total_rows=total_rows)
|
||||||
|
|
||||||
|
# Optional: preview DDL / validate against a manifest.
|
||||||
|
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
|
||||||
|
problems = validate_against_manifest(columns, Path("expected.json"))
|
||||||
|
assert not problems, problems
|
||||||
|
|
||||||
|
conn = connect()
|
||||||
|
conn.autocommit = False
|
||||||
|
try:
|
||||||
|
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
|
||||||
|
chunks = (
|
||||||
|
apply_column_filter(df, cfg.include, cfg.exclude)
|
||||||
|
for df, _ in iter_sas_chunks(cfg.filename)
|
||||||
|
)
|
||||||
|
rows = copy_dataframes(conn, cfg.schemaname, cfg.tablename, chunks, columns)
|
||||||
|
conn.commit()
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
For small files (or tests) the legacy one-shot API still works:
|
||||||
|
:func:`read_sas` returns the whole frame and :func:`copy_dataframe` copies it
|
||||||
|
in one round trip.
|
||||||
|
|
||||||
|
All functions are side-effect free except :func:`connect`, :func:`create_table`,
|
||||||
|
:func:`copy_dataframe`, and :func:`copy_dataframes`; schema inference
|
||||||
|
(:func:`infer_schema`) accepts a ``coerce_chars`` kwarg to override the
|
||||||
|
module-level ``COERCE_CHAR_COLUMNS`` without mutating global state.
|
||||||
|
|
||||||
|
6. Type inference summary
|
||||||
|
-------------------------
|
||||||
|
Priority order used by :func:`infer_schema`:
|
||||||
|
|
||||||
|
1. SAS format string (via ``meta.original_variable_types``):
|
||||||
|
``DATETIME*`` -> ``TIMESTAMP``, ``TIME*`` -> ``TIME``,
|
||||||
|
``DATE*`` / ``YYMMDD*`` / ``MMDDYY*`` / ``DDMMYY*`` / ``JULIAN*`` -> ``DATE``.
|
||||||
|
2. All-null column -> ``TEXT`` (with a note).
|
||||||
|
3. pandas datetime dtype -> ``TIMESTAMP``.
|
||||||
|
4. Object columns containing only ``datetime.date`` / ``datetime.datetime``
|
||||||
|
-> ``DATE`` or ``TIMESTAMP``.
|
||||||
|
5. Object columns of strings: if ``COERCE_CHAR_COLUMNS`` is True and at
|
||||||
|
least ``CHAR_INFERENCE_MIN_VALUES`` non-empty values parse cleanly, they
|
||||||
|
are promoted to ``INTEGER`` / ``BIGINT`` / ``DOUBLE PRECISION`` /
|
||||||
|
``DATE`` / ``TIMESTAMP``; otherwise ``TEXT``.
|
||||||
|
6. Numeric columns of whole numbers -> ``INTEGER`` (or ``BIGINT`` if any
|
||||||
|
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
|
||||||
|
``DOUBLE PRECISION``.
|
||||||
|
|
||||||
|
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for
|
||||||
|
performance on large files. The CLI enforces this at read time via
|
||||||
|
:func:`read_sas_preview`, so the whole file is never materialized just to pick
|
||||||
|
types. Sampled specs carry an ``inferred_from_sample`` marker and the usual
|
||||||
|
tradeoffs: if the first N rows fit ``INTEGER`` but a later row exceeds int32,
|
||||||
|
or a column had no nulls in the preview but does later in the file, ``COPY``
|
||||||
|
will fail mid-stream and the whole transaction rolls back. Set
|
||||||
|
``TYPE_INFERENCE_SAMPLE_ROWS = None`` to scan every row when exact typing
|
||||||
|
matters more than speed.
|
||||||
|
|
||||||
|
Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which
|
||||||
|
share one cursor and transaction so a failure mid-file rolls back the whole
|
||||||
|
load.
|
||||||
|
|
||||||
|
7. Tunables
|
||||||
|
-----------
|
||||||
|
Module-level knobs at the top of this file:
|
||||||
|
|
||||||
|
* ``COERCE_CHAR_COLUMNS`` - promote stringly-typed numerics / dates
|
||||||
|
(default True).
|
||||||
|
* ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before
|
||||||
|
char-column coercion is attempted.
|
||||||
|
* ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become
|
||||||
|
``BIGINT``.
|
||||||
|
* ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows read for type inference
|
||||||
|
(``None`` = scan the whole column).
|
||||||
|
* ``DEFAULT_CHUNK_ROWS`` - rows per streaming COPY chunk.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
|
import getpass
|
||||||
import io
|
import io
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, List, Optional, Tuple
|
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import psycopg2
|
import psycopg2
|
||||||
@ -45,6 +248,18 @@ values; too small a sample is easy to mis-infer."""
|
|||||||
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
|
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
|
||||||
"""INTEGER bounds; anything outside becomes BIGINT."""
|
"""INTEGER bounds; anything outside becomes BIGINT."""
|
||||||
|
|
||||||
|
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000
|
||||||
|
"""Cap on rows inspected during per-column type inference. Also governs how
|
||||||
|
many rows :func:`read_sas_preview` pulls from the file for dry-run / validate /
|
||||||
|
schema-inference flows. Set to ``None`` to scan every row (and read the whole
|
||||||
|
file into memory for the preview step - don't do this on multi-hundred-million
|
||||||
|
row files)."""
|
||||||
|
|
||||||
|
DEFAULT_CHUNK_ROWS = 100_000
|
||||||
|
"""Rows per chunk when streaming a SAS file into ``COPY``. Larger values mean
|
||||||
|
fewer COPY round-trips but more peak memory per chunk; smaller values are
|
||||||
|
gentler on memory."""
|
||||||
|
|
||||||
|
|
||||||
VALID_IF_EXISTS = ("fail", "replace", "append")
|
VALID_IF_EXISTS = ("fail", "replace", "append")
|
||||||
|
|
||||||
@ -72,6 +287,12 @@ class ColumnSpec:
|
|||||||
sas_format: Optional[str] = None
|
sas_format: Optional[str] = None
|
||||||
source_dtype: Optional[str] = None
|
source_dtype: Optional[str] = None
|
||||||
notes: List[str] = field(default_factory=list)
|
notes: List[str] = field(default_factory=list)
|
||||||
|
sampled: bool = False
|
||||||
|
"""True when the type was inferred from a bounded preview rather than the
|
||||||
|
full file. A sampled spec carries the usual sampling risks: a later chunk
|
||||||
|
could contain a value that exceeds the inferred integer range, doesn't
|
||||||
|
parse as the inferred type, or is null in a column the preview showed as
|
||||||
|
non-null - all of which surface as mid-``COPY`` failures."""
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -97,18 +318,25 @@ class ValidationError(RuntimeError):
|
|||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def connect() -> psycopg2.extensions.connection:
|
def connect(
|
||||||
|
*,
|
||||||
|
user: Optional[str] = None,
|
||||||
|
password: Optional[str] = None,
|
||||||
|
) -> psycopg2.extensions.connection:
|
||||||
"""Open a psycopg2 connection using standard libpq env vars.
|
"""Open a psycopg2 connection using standard libpq env vars.
|
||||||
|
|
||||||
Assumes `.env` has already been loaded (the CLI does this before calling).
|
Assumes `.env` has already been loaded (the CLI does this before calling).
|
||||||
Orchestrators that wrap this module should either call ``load_dotenv()``
|
Orchestrators that wrap this module should either call ``load_dotenv()``
|
||||||
themselves or ensure the env vars are set.
|
themselves or ensure the env vars are set.
|
||||||
|
|
||||||
|
``user`` and ``password`` override the corresponding env vars when supplied
|
||||||
|
(used by the ``--dbcreds`` CLI flag to accept interactive input).
|
||||||
"""
|
"""
|
||||||
conn = psycopg2.connect(
|
conn = psycopg2.connect(
|
||||||
host=os.environ.get("PGHOST"),
|
host=os.environ.get("PGHOST"),
|
||||||
port=os.environ.get("PGPORT"),
|
port=os.environ.get("PGPORT"),
|
||||||
user=os.environ.get("PGUSER"),
|
user=user or os.environ.get("PGUSER"),
|
||||||
password=os.environ.get("PGPASSWORD"),
|
password=password or os.environ.get("PGPASSWORD"),
|
||||||
dbname=os.environ.get("PGDATABASE"),
|
dbname=os.environ.get("PGDATABASE"),
|
||||||
)
|
)
|
||||||
return conn
|
return conn
|
||||||
@ -171,8 +399,8 @@ def load_config(path: Path) -> LoaderConfig:
|
|||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
|
def _sas_reader(path: Path) -> Tuple[Any, Dict[str, Any]]:
|
||||||
"""Dispatch to the right pyreadstat reader by extension.
|
"""Return ``(pyreadstat_reader, extra_kwargs)`` for ``path``.
|
||||||
|
|
||||||
Invariants (learned the hard way while building the sample generator):
|
Invariants (learned the hard way while building the sample generator):
|
||||||
|
|
||||||
@ -180,15 +408,58 @@ def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
|
|||||||
encoding on XPORT files it wrote itself.
|
encoding on XPORT files it wrote itself.
|
||||||
* ``.sas7bdat`` - explicit ``encoding="latin-1"`` per colleague guidance.
|
* ``.sas7bdat`` - explicit ``encoding="latin-1"`` per colleague guidance.
|
||||||
"""
|
"""
|
||||||
path = Path(path)
|
suffix = Path(path).suffix.lower()
|
||||||
suffix = path.suffix.lower()
|
|
||||||
if suffix in (".xpt", ".xport"):
|
if suffix in (".xpt", ".xport"):
|
||||||
return pyreadstat.read_xport(str(path))
|
return pyreadstat.read_xport, {}
|
||||||
if suffix == ".sas7bdat":
|
if suffix == ".sas7bdat":
|
||||||
return pyreadstat.read_sas7bdat(str(path), encoding="latin-1")
|
return pyreadstat.read_sas7bdat, {}
|
||||||
raise ValueError(f"Unsupported SAS file extension: {suffix}")
|
raise ValueError(f"Unsupported SAS file extension: {suffix}")
|
||||||
|
|
||||||
|
|
||||||
|
def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]:
|
||||||
|
"""Read an entire SAS file into memory. Only safe for small files.
|
||||||
|
|
||||||
|
Kept for backward compatibility and tests; the CLI now uses
|
||||||
|
:func:`read_sas_preview` + :func:`iter_sas_chunks` so it never materializes
|
||||||
|
the whole frame at once.
|
||||||
|
"""
|
||||||
|
reader, kwargs = _sas_reader(path)
|
||||||
|
return reader(str(Path(path)), **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def read_sas_preview(
|
||||||
|
path: Path,
|
||||||
|
*,
|
||||||
|
rows: Optional[int] = None,
|
||||||
|
) -> Tuple[pd.DataFrame, Any]:
|
||||||
|
"""Read the first ``rows`` records from ``path`` plus its metadata.
|
||||||
|
|
||||||
|
Defaults to ``TYPE_INFERENCE_SAMPLE_ROWS`` when ``rows`` is not given.
|
||||||
|
Passing ``rows=None`` with ``TYPE_INFERENCE_SAMPLE_ROWS=None`` reads the
|
||||||
|
whole file (pyreadstat treats ``row_limit=0`` as unlimited).
|
||||||
|
"""
|
||||||
|
reader, kwargs = _sas_reader(path)
|
||||||
|
effective = rows if rows is not None else TYPE_INFERENCE_SAMPLE_ROWS
|
||||||
|
row_limit = int(effective) if effective else 0
|
||||||
|
return reader(str(Path(path)), row_limit=row_limit, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def iter_sas_chunks(
|
||||||
|
path: Path,
|
||||||
|
*,
|
||||||
|
chunksize: int = DEFAULT_CHUNK_ROWS,
|
||||||
|
):
|
||||||
|
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
|
||||||
|
|
||||||
|
Thin wrapper over ``pyreadstat.read_file_in_chunks`` that picks the right
|
||||||
|
underlying reader by extension and threads through our encoding defaults.
|
||||||
|
"""
|
||||||
|
reader, kwargs = _sas_reader(path)
|
||||||
|
yield from pyreadstat.read_file_in_chunks(
|
||||||
|
reader, str(Path(path)), chunksize=chunksize, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Column filtering
|
# Column filtering
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -368,6 +639,7 @@ def infer_schema(
|
|||||||
meta: Any,
|
meta: Any,
|
||||||
*,
|
*,
|
||||||
coerce_chars: bool = COERCE_CHAR_COLUMNS,
|
coerce_chars: bool = COERCE_CHAR_COLUMNS,
|
||||||
|
total_rows: Optional[int] = None,
|
||||||
) -> Dict[str, ColumnSpec]:
|
) -> Dict[str, ColumnSpec]:
|
||||||
"""Infer a Postgres column spec for each column in ``df``.
|
"""Infer a Postgres column spec for each column in ``df``.
|
||||||
|
|
||||||
@ -379,9 +651,25 @@ def infer_schema(
|
|||||||
``COERCE_CHAR_COLUMNS`` without mutating global state. Internally the
|
``COERCE_CHAR_COLUMNS`` without mutating global state. Internally the
|
||||||
char-inference helpers still read the constant - a full override would
|
char-inference helpers still read the constant - a full override would
|
||||||
thread the flag through, but the one-knob story here is intentional.
|
thread the flag through, but the one-knob story here is intentional.
|
||||||
|
|
||||||
|
``total_rows`` lets callers who already sampled the frame (e.g. via
|
||||||
|
:func:`read_sas_preview`) report the real file size in the per-column
|
||||||
|
"inferred from first N of M rows" note. Falls back to ``len(df)``.
|
||||||
"""
|
"""
|
||||||
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
|
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
|
||||||
|
|
||||||
|
# Row-walking type probes run on a bounded head slice; nullability and the
|
||||||
|
# all-null check still see every row so NOT NULL declarations stay honest.
|
||||||
|
df_rows = len(df)
|
||||||
|
effective_total = total_rows if total_rows is not None else df_rows
|
||||||
|
if TYPE_INFERENCE_SAMPLE_ROWS is not None and df_rows > TYPE_INFERENCE_SAMPLE_ROWS:
|
||||||
|
sample_df = df.head(TYPE_INFERENCE_SAMPLE_ROWS)
|
||||||
|
sample_size = TYPE_INFERENCE_SAMPLE_ROWS
|
||||||
|
else:
|
||||||
|
sample_df = df
|
||||||
|
sample_size = df_rows
|
||||||
|
sampled = sample_size < effective_total
|
||||||
|
|
||||||
# Temporarily flip the module-level flag if the caller asked us to.
|
# Temporarily flip the module-level flag if the caller asked us to.
|
||||||
global COERCE_CHAR_COLUMNS
|
global COERCE_CHAR_COLUMNS
|
||||||
saved = COERCE_CHAR_COLUMNS
|
saved = COERCE_CHAR_COLUMNS
|
||||||
@ -390,6 +678,7 @@ def infer_schema(
|
|||||||
out: Dict[str, ColumnSpec] = {}
|
out: Dict[str, ColumnSpec] = {}
|
||||||
for col in df.columns:
|
for col in df.columns:
|
||||||
series = df[col]
|
series = df[col]
|
||||||
|
sample_series = sample_df[col]
|
||||||
sas_format = original_formats.get(col)
|
sas_format = original_formats.get(col)
|
||||||
notes: List[str] = []
|
notes: List[str] = []
|
||||||
|
|
||||||
@ -402,13 +691,13 @@ def infer_schema(
|
|||||||
elif pd.api.types.is_datetime64_any_dtype(series):
|
elif pd.api.types.is_datetime64_any_dtype(series):
|
||||||
pg_type = "TIMESTAMP"
|
pg_type = "TIMESTAMP"
|
||||||
elif pd.api.types.is_object_dtype(series):
|
elif pd.api.types.is_object_dtype(series):
|
||||||
is_dates, any_dt = _object_is_dates(series)
|
is_dates, any_dt = _object_is_dates(sample_series)
|
||||||
if is_dates:
|
if is_dates:
|
||||||
pg_type = "TIMESTAMP" if any_dt else "DATE"
|
pg_type = "TIMESTAMP" if any_dt else "DATE"
|
||||||
else:
|
else:
|
||||||
pg_type = _infer_char_type(series)
|
pg_type = _infer_char_type(sample_series)
|
||||||
elif pd.api.types.is_numeric_dtype(series):
|
elif pd.api.types.is_numeric_dtype(series):
|
||||||
int_target = _numeric_int_target(series)
|
int_target = _numeric_int_target(sample_series)
|
||||||
if int_target is not None:
|
if int_target is not None:
|
||||||
pg_type = int_target
|
pg_type = int_target
|
||||||
else:
|
else:
|
||||||
@ -417,6 +706,12 @@ def infer_schema(
|
|||||||
pg_type = "TEXT"
|
pg_type = "TEXT"
|
||||||
notes.append(f"unhandled dtype {series.dtype}; defaulting to TEXT")
|
notes.append(f"unhandled dtype {series.dtype}; defaulting to TEXT")
|
||||||
|
|
||||||
|
if sampled:
|
||||||
|
notes.append(
|
||||||
|
f"type inferred from first {sample_size:,} of "
|
||||||
|
f"{effective_total:,} rows"
|
||||||
|
)
|
||||||
|
|
||||||
nullable = _is_nullable(series)
|
nullable = _is_nullable(series)
|
||||||
|
|
||||||
out[col] = ColumnSpec(
|
out[col] = ColumnSpec(
|
||||||
@ -426,6 +721,7 @@ def infer_schema(
|
|||||||
sas_format=sas_format,
|
sas_format=sas_format,
|
||||||
source_dtype=str(series.dtype),
|
source_dtype=str(series.dtype),
|
||||||
notes=notes,
|
notes=notes,
|
||||||
|
sampled=sampled,
|
||||||
)
|
)
|
||||||
return out
|
return out
|
||||||
finally:
|
finally:
|
||||||
@ -575,6 +871,22 @@ def _assert_schema_compatible(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def assert_schema_compatible(
|
||||||
|
conn,
|
||||||
|
schema_name: str,
|
||||||
|
table_name: str,
|
||||||
|
columns: Dict[str, ColumnSpec],
|
||||||
|
) -> None:
|
||||||
|
"""Public wrapper around :func:`_assert_schema_compatible`.
|
||||||
|
|
||||||
|
Intended for orchestrators (e.g. the folder loader) that append multiple
|
||||||
|
files into one table and need to re-run the same compatibility check
|
||||||
|
that ``if_exists=append`` performs internally. Raises
|
||||||
|
:class:`SchemaCompatibilityError` on mismatch.
|
||||||
|
"""
|
||||||
|
_assert_schema_compatible(conn, schema_name, table_name, columns)
|
||||||
|
|
||||||
|
|
||||||
def create_table(
|
def create_table(
|
||||||
conn,
|
conn,
|
||||||
schema_name: str,
|
schema_name: str,
|
||||||
@ -713,19 +1025,31 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
def copy_dataframe(
|
def copy_dataframes(
|
||||||
conn,
|
conn,
|
||||||
schema_name: str,
|
schema_name: str,
|
||||||
table_name: str,
|
table_name: str,
|
||||||
df: pd.DataFrame,
|
dfs: Iterable[pd.DataFrame],
|
||||||
columns: Dict[str, ColumnSpec],
|
columns: Dict[str, ColumnSpec],
|
||||||
) -> int:
|
) -> int:
|
||||||
"""Stream ``df`` into Postgres via ``COPY ... FROM STDIN``.
|
"""Stream an iterable of DataFrames into one ``COPY`` session.
|
||||||
|
|
||||||
Returns the number of rows inserted.
|
All chunks share a cursor and transaction, so a failure mid-stream
|
||||||
|
rolls back the whole load when the caller hasn't committed yet.
|
||||||
|
Empty chunks are skipped. Returns the total rows inserted.
|
||||||
"""
|
"""
|
||||||
prepared = _prepare_for_copy(df, columns)
|
col_list = ", ".join(_quote_ident(name) for name in columns.keys())
|
||||||
|
sql = (
|
||||||
|
f"COPY {_qualified(schema_name, table_name)} ({col_list}) "
|
||||||
|
f"FROM STDIN WITH (FORMAT csv, NULL '')"
|
||||||
|
)
|
||||||
|
|
||||||
|
total = 0
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
for df in dfs:
|
||||||
|
if df.empty:
|
||||||
|
continue
|
||||||
|
prepared = _prepare_for_copy(df, columns)
|
||||||
buf = io.StringIO()
|
buf = io.StringIO()
|
||||||
prepared.to_csv(
|
prepared.to_csv(
|
||||||
buf,
|
buf,
|
||||||
@ -735,18 +1059,24 @@ def copy_dataframe(
|
|||||||
date_format="%Y-%m-%d %H:%M:%S",
|
date_format="%Y-%m-%d %H:%M:%S",
|
||||||
)
|
)
|
||||||
buf.seek(0)
|
buf.seek(0)
|
||||||
|
|
||||||
col_list = ", ".join(_quote_ident(name) for name in columns.keys())
|
|
||||||
sql = (
|
|
||||||
f"COPY {_qualified(schema_name, table_name)} ({col_list}) "
|
|
||||||
f"FROM STDIN WITH (FORMAT csv, NULL '')"
|
|
||||||
)
|
|
||||||
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.copy_expert(sql, buf)
|
cur.copy_expert(sql, buf)
|
||||||
rowcount = cur.rowcount
|
total += len(prepared)
|
||||||
|
return total
|
||||||
|
|
||||||
return int(rowcount) if rowcount is not None else len(prepared)
|
|
||||||
|
def copy_dataframe(
|
||||||
|
conn,
|
||||||
|
schema_name: str,
|
||||||
|
table_name: str,
|
||||||
|
df: pd.DataFrame,
|
||||||
|
columns: Dict[str, ColumnSpec],
|
||||||
|
) -> int:
|
||||||
|
"""Stream ``df`` into Postgres via ``COPY ... FROM STDIN``.
|
||||||
|
|
||||||
|
Convenience wrapper around :func:`copy_dataframes` for single-frame
|
||||||
|
callers. Returns the number of rows inserted.
|
||||||
|
"""
|
||||||
|
return copy_dataframes(conn, schema_name, table_name, [df], columns)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -837,6 +1167,14 @@ def _build_argparser() -> argparse.ArgumentParser:
|
|||||||
action="store_true",
|
action="store_true",
|
||||||
help="Print inferred CREATE TABLE and stop; don't touch Postgres.",
|
help="Print inferred CREATE TABLE and stop; don't touch Postgres.",
|
||||||
)
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--dbcreds",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"Prompt for database username and password instead of reading "
|
||||||
|
"PGUSER / PGPASSWORD from the environment or .env file."
|
||||||
|
),
|
||||||
|
)
|
||||||
return p
|
return p
|
||||||
|
|
||||||
|
|
||||||
@ -859,9 +1197,13 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
|
print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr)
|
||||||
return 2
|
return 2
|
||||||
|
|
||||||
df, meta = read_sas(cfg.filename)
|
# Schema inference uses a bounded preview read so we never load a
|
||||||
df = apply_column_filter(df, cfg.include, cfg.exclude)
|
# hundreds-of-millions-of-rows file into memory just to pick types.
|
||||||
columns = infer_schema(df, meta)
|
# NB: ``meta.number_rows`` on a ``row_limit``-ed read reflects rows
|
||||||
|
# returned, not the file's total, so we don't trust it here.
|
||||||
|
preview_df, meta = read_sas_preview(cfg.filename)
|
||||||
|
preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude)
|
||||||
|
columns = infer_schema(preview_df, meta)
|
||||||
|
|
||||||
if args.validate:
|
if args.validate:
|
||||||
manifest_path = cfg.filename.with_suffix("").with_suffix(".expected.json")
|
manifest_path = cfg.filename.with_suffix("").with_suffix(".expected.json")
|
||||||
@ -879,11 +1221,30 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
|
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
conn = connect()
|
# Release the preview frame before opening the stream - lets the GC reclaim
|
||||||
|
# it while we're holding a Postgres transaction open.
|
||||||
|
del preview_df
|
||||||
|
|
||||||
|
def _filtered_chunks():
|
||||||
|
seen = 0
|
||||||
|
for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename):
|
||||||
|
chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude)
|
||||||
|
seen += len(chunk_df)
|
||||||
|
print(f" streaming... {seen:,} rows", file=sys.stderr)
|
||||||
|
yield chunk_df
|
||||||
|
|
||||||
|
db_user = db_password = None
|
||||||
|
if args.dbcreds:
|
||||||
|
db_user = input("Database username: ")
|
||||||
|
db_password = getpass.getpass("Database password: ")
|
||||||
|
|
||||||
|
conn = connect(user=db_user, password=db_password)
|
||||||
conn.autocommit = False
|
conn.autocommit = False
|
||||||
try:
|
try:
|
||||||
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
|
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
|
||||||
inserted = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns)
|
inserted = copy_dataframes(
|
||||||
|
conn, cfg.schemaname, cfg.tablename, _filtered_chunks(), columns
|
||||||
|
)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
except Exception:
|
except Exception:
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
|
|||||||
54
generic_loader/sample_folder_config.yaml
Normal file
54
generic_loader/sample_folder_config.yaml
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
# Example folder-level loader config.
|
||||||
|
#
|
||||||
|
# Shape mirrors what `load_folder.py` expects:
|
||||||
|
#
|
||||||
|
# python load_folder.py --config sample_folder_config.yaml --dry-run
|
||||||
|
# python load_folder.py --config sample_folder_config.yaml
|
||||||
|
#
|
||||||
|
# Relative paths are resolved against this config file's directory first,
|
||||||
|
# falling back to the current working directory if that doesn't exist.
|
||||||
|
|
||||||
|
folder: samples/folder_test
|
||||||
|
schemaname: public
|
||||||
|
|
||||||
|
# Applied when creating the first file of each cluster.
|
||||||
|
# One of: fail | replace | append. Default: fail.
|
||||||
|
if_exists: replace
|
||||||
|
|
||||||
|
# When true (default), any file not matched by an explicit pattern below is
|
||||||
|
# auto-grouped with its peers by stripping trailing digits (and any trailing
|
||||||
|
# _ / -) from the file stem. Files with no trailing digits become their own
|
||||||
|
# singleton cluster.
|
||||||
|
auto_detect: true
|
||||||
|
|
||||||
|
# Folder-level column filter. Every file in every cluster passes through
|
||||||
|
# this filter. `include` and `exclude` are mutually exclusive. A cluster can
|
||||||
|
# override these via its own `include` / `exclude` keys.
|
||||||
|
#
|
||||||
|
# include:
|
||||||
|
# - ID
|
||||||
|
# - INTCOL
|
||||||
|
# exclude:
|
||||||
|
# - ALLNULL
|
||||||
|
|
||||||
|
# Explicit cluster patterns. Each pattern is matched against the file
|
||||||
|
# *basename*. Files matched by a pattern are pulled out of the auto-detect
|
||||||
|
# pool, so explicit and auto clusters compose cleanly.
|
||||||
|
#
|
||||||
|
# `tablename` is required. `if_exists`, `include`, and `exclude` are
|
||||||
|
# optional per-cluster overrides of the folder-level defaults above.
|
||||||
|
clusters:
|
||||||
|
- pattern: '^group_a\d+\.xpt$'
|
||||||
|
tablename: group_a
|
||||||
|
|
||||||
|
# Example of an explicit override. Uncomment to force the group_b cluster to
|
||||||
|
# append instead of replace even though the folder default is "replace":
|
||||||
|
#
|
||||||
|
# - pattern: '^group_b\d+\.xpt$'
|
||||||
|
# tablename: group_b
|
||||||
|
# if_exists: append
|
||||||
|
|
||||||
|
# With only the gq pattern explicit, auto_detect: true will still bucket
|
||||||
|
# group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
|
||||||
|
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
|
||||||
|
# for the fixture that exercises exactly this layout.
|
||||||
@ -1,6 +1,6 @@
|
|||||||
pandas>=2.0,<2.3
|
pandas>=2.0,<3.0
|
||||||
pyreadstat>=1.2,<1.3
|
pyreadstat>=1.2,<1.3
|
||||||
numpy>=1.24,<2.1
|
numpy>=2.1,<3.0
|
||||||
pyyaml>=6.0,<7.0
|
pyyaml>=6.0,<7.0
|
||||||
psycopg2-binary>=2.9,<3.0
|
psycopg2-binary>=2.9,<3.0
|
||||||
python-dotenv>=1.0,<2.0
|
python-dotenv>=1.0,<2.0
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user