186 lines
6.4 KiB
Python
186 lines
6.4 KiB
Python
|
|
"""Generate a folder of clustered SAS XPORT files for testing ``load_folder``.
|
||
|
|
|
||
|
|
Produces ``samples/folder_test/`` containing three clusters:
|
||
|
|
|
||
|
|
* ``group_a{1,2,3}.xpt`` - kitchen-sink schema (every column).
|
||
|
|
* ``group_b{1,2}.xpt`` - a *different* schema (drops ``BIGINT`` and
|
||
|
|
``TIMECOL``) so a schema-compat check would catch cross-cluster
|
||
|
|
contamination if the regex were wrong.
|
||
|
|
* ``standalone.xpt`` - singleton to exercise the no-cluster / singleton
|
||
|
|
auto-detect path.
|
||
|
|
|
||
|
|
Alongside the files, writes ``folder_config.yaml`` that exercises
|
||
|
|
both code paths: ``group_a*`` via an explicit regex pattern, ``group_b*``
|
||
|
|
and ``standalone`` via auto-detect.
|
||
|
|
|
||
|
|
Finally, runs :func:`load_folder.discover_clusters` against the generated
|
||
|
|
folder and asserts the grouping is what we expect. This is a pure in-process
|
||
|
|
smoke test of the clustering logic; no Postgres connection is required.
|
||
|
|
|
||
|
|
Reuses ``generate_sample_sas.build_dataframe`` so data shape / dtypes match
|
||
|
|
the single-file loader tests. ``N_ROWS`` is temporarily shrunk on the
|
||
|
|
imported module for this run's duration so repeated invocations stay fast.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import numpy as np
|
||
|
|
import pandas as pd
|
||
|
|
import pyreadstat
|
||
|
|
import yaml
|
||
|
|
|
||
|
|
import generate_sample_sas as gss
|
||
|
|
from load_folder import discover_clusters, load_folder_config
|
||
|
|
|
||
|
|
|
||
|
|
# Row count substituted for ``generate_sample_sas.N_ROWS`` while a fixture
# frame is being built (see ``_build_df``).
FIXTURE_ROWS = 2_000

# Destination folder for the generated XPORT files; the folder config is
# written inside the same directory.
OUT_DIR = Path("samples/folder_test")
CONFIG_PATH = OUT_DIR.joinpath("folder_config.yaml")

# File names of the three fixture groups.
GROUP_A_FILES = [f"group_a{n}.xpt" for n in (1, 2, 3)]
GROUP_B_FILES = [f"group_b{n}.xpt" for n in (1, 2)]
STANDALONE_FILE = "standalone.xpt"

# The group_b files omit these columns, giving that cluster a schema that
# really differs from group_a's. Should the regex ever sweep a group_b file
# into the group_a cluster (or the reverse), load_cluster's schema-compat
# check would trip on exactly these differences.
GROUP_B_DROPPED_COLUMNS = ("BIGINT", "TIMECOL")
|
||
|
|
|
||
|
|
|
||
|
|
def _build_df(seed: int) -> pd.DataFrame:
    """Return a kitchen-sink DataFrame produced by the shared generator.

    ``generate_sample_sas.N_ROWS`` is swapped for ``FIXTURE_ROWS`` only for
    the duration of the ``build_dataframe`` call and is put back afterwards,
    even when the call raises, so the imported generator module is left as
    it was found.

    Args:
        seed: Seed for the NumPy random generator handed to
            ``gss.build_dataframe``.

    Returns:
        The DataFrame returned by ``gss.build_dataframe``.
    """
    original_rows = gss.N_ROWS
    generator = np.random.default_rng(seed)
    gss.N_ROWS = FIXTURE_ROWS
    try:
        frame = gss.build_dataframe(generator)
    finally:
        # Always undo the override, whether or not the build succeeded.
        gss.N_ROWS = original_rows
    return frame
|
||
|
|
|
||
|
|
|
||
|
|
def _write_xport(df: pd.DataFrame, path: Path, table_name: str) -> None:
    """Write ``df`` to ``path`` as an XPORT file through ``pyreadstat``.

    Variable formats and column labels are taken from
    ``generate_sample_sas`` and narrowed to the columns present in ``df``.

    Args:
        df: Frame to serialise.
        path: Destination file; its name is also embedded in the file label.
        table_name: Table name passed to ``pyreadstat.write_xport``.
    """
    present = df.columns

    def _only_present(mapping):
        # Keep just the entries whose key is a column of this frame.
        return {col: value for col, value in mapping.items() if col in present}

    # write_xport errors on formats referencing missing cols, so metadata
    # for columns absent from this frame must not be passed along.
    formats = _only_present(gss.VARIABLE_FORMATS)
    labels = _only_present(gss.COLUMN_LABELS)

    pyreadstat.write_xport(
        df,
        str(path),
        file_format_version=5,
        table_name=table_name,
        file_label=f"Folder-loader fixture ({path.name})",
        column_labels=labels,
        variable_format=formats,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def generate_fixtures() -> None:
    """Create ``OUT_DIR`` and write every fixture XPORT file into it.

    Files are produced in the order group_a, group_b, standalone. Each one
    gets its own seed and table name; the group_b frames additionally lose
    ``GROUP_B_DROPPED_COLUMNS`` before being written. A progress line is
    printed per file.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    # One entry per output file:
    # (file name, RNG seed, XPORT table name, columns to drop).
    plan = [
        (fname, 100 + idx, f"GRPA{idx + 1}", ())
        for idx, fname in enumerate(GROUP_A_FILES)
    ]
    plan.extend(
        (fname, 200 + idx, f"GRPB{idx + 1}", GROUP_B_DROPPED_COLUMNS)
        for idx, fname in enumerate(GROUP_B_FILES)
    )
    plan.append((STANDALONE_FILE, 300, "STDALONE", ()))

    for fname, seed, table_name, dropped in plan:
        target = OUT_DIR / fname
        frame = _build_df(seed=seed)
        if dropped:
            frame = frame.drop(columns=list(dropped))
        _write_xport(frame, target, table_name=table_name)
        print(f" wrote {target} ({len(frame):,} rows, {len(frame.columns)} cols)")
|
||
|
|
|
||
|
|
|
||
|
|
def write_config() -> None:
    """Write the folder-loader YAML config to ``CONFIG_PATH``.

    The file starts with a three-line comment header and is followed by the
    settings dumped with ``yaml.safe_dump`` in insertion order. Only the
    group_a cluster is declared explicitly; ``auto_detect`` is switched on
    for everything else.
    """
    explicit_cluster = {
        "pattern": r"^group_a\d+\.xpt$",
        "tablename": "group_a",
    }
    settings = {
        "folder": ".",  # config lives inside the target folder
        "schemaname": "public",
        "if_exists": "replace",
        "auto_detect": True,
        "clusters": [explicit_cluster],
    }

    # Top-of-file comment documents the intent of this generated config.
    banner = (
        "# Generated by generate_sample_folder.py. Demonstrates both\n"
        "# explicit regex clustering (group_a*) and auto-detect\n"
        "# (group_b* and standalone) working together.\n"
    )

    with CONFIG_PATH.open("w", encoding="utf-8") as handle:
        handle.write(banner)
        yaml.safe_dump(settings, handle, sort_keys=False)

    print(f" wrote {CONFIG_PATH}")
|
||
|
|
|
||
|
|
|
||
|
|
def verify() -> None:
    """Smoke-test the clustering logic against the generated folder.

    Loads the config at ``CONFIG_PATH``, runs ``discover_clusters`` on it and
    checks that exactly three clusters come back: ``group_a`` (source
    ``"explicit"``), ``group_b`` and ``standalone`` (both source ``"auto"``),
    each holding the expected file names. On success a summary of every
    cluster is printed.

    The checks raise explicitly instead of using ``assert`` statements so
    they still run when Python is started with ``-O``, which strips asserts.

    Raises:
        AssertionError: If two clusters share a table name, if the set of
            cluster names differs from the expected one, or if a cluster has
            an unexpected source or file list.
    """
    cfg = load_folder_config(CONFIG_PATH)
    clusters = list(discover_clusters(cfg))

    by_name = {c.tablename: c for c in clusters}
    # A dict keyed on tablename silently keeps only the last of any
    # duplicates, which would hide a cluster from the checks below.
    if len(by_name) != len(clusters):
        raise AssertionError(
            f"duplicate cluster tablenames: {[c.tablename for c in clusters]}"
        )

    expected_names = {"group_a", "group_b", "standalone"}
    actual_names = set(by_name)
    if expected_names != actual_names:
        raise AssertionError(
            f"cluster set mismatch: expected {expected_names}, got {actual_names}"
        )

    # (cluster name, expected source, expected file names in order)
    expectations = (
        ("group_a", "explicit", sorted(GROUP_A_FILES)),
        ("group_b", "auto", sorted(GROUP_B_FILES)),
        ("standalone", "auto", [STANDALONE_FILE]),
    )
    for name, expected_source, expected_files in expectations:
        cluster = by_name[name]
        if cluster.source != expected_source:
            raise AssertionError(f"{name} source = {cluster.source!r}")
        found_files = [f.name for f in cluster.files]
        if found_files != expected_files:
            raise AssertionError(f"{name} files = {found_files}")

    print(" clustering verified:")
    for c in clusters:
        files = ", ".join(f.name for f in c.files)
        print(f" {c.tablename} [{c.source}]: {files}")
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
    """Run the three stages in order: fixtures, config, verification.

    A banner is printed before each stage, and a hint for a follow-up
    dry-run command is printed once all of them have finished.

    Returns:
        ``0``, for use as the process exit code.
    """
    stages = (
        (f"Writing fixture SAS files to {OUT_DIR}/", generate_fixtures),
        (f"\nWriting folder config to {CONFIG_PATH}", write_config),
        ("\nVerifying discover_clusters() grouping...", verify),
    )
    for banner, run_stage in stages:
        print(banner)
        run_stage()

    print("\nOK. Try:")
    print(f" python load_folder.py --config {CONFIG_PATH} --dry-run")
    return 0
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|