"""Generate a folder of clustered SAS XPORT files for testing ``load_folder``. Produces ``samples/folder_test/`` containing three clusters: * ``group_a{1,2,3}.xpt`` - kitchen-sink schema (every column). * ``group_b{1,2}.xpt`` - a *different* schema (drops ``BIGINT`` and ``TIMECOL``) so a schema-compat check would catch cross-cluster contamination if the regex were wrong. * ``standalone.xpt`` - singleton to exercise the no-cluster / singleton auto-detect path. Alongside the files, writes ``sample_folder_config.yaml`` that exercises both code paths: ``group_a*`` via an explicit regex pattern, ``group_b*`` and ``standalone`` via auto-detect. Finally, runs :func:`load_folder.discover_clusters` against the generated folder and asserts the grouping is what we expect. This is a pure in-process smoke test of the clustering logic; no Postgres connection is required. Reuses ``generate_sample_sas.build_dataframe`` so data shape / dtypes match the single-file loader tests. ``N_ROWS`` is temporarily shrunk on the imported module for this run's duration so repeated invocations stay fast. """ from __future__ import annotations import sys from pathlib import Path import numpy as np import pandas as pd import pyreadstat import yaml import generate_sample_sas as gss from load_folder import discover_clusters, load_folder_config FIXTURE_ROWS = 2_000 OUT_DIR = Path("samples/folder_test") CONFIG_PATH = OUT_DIR / "folder_config.yaml" GROUP_A_FILES = ["group_a1.xpt", "group_a2.xpt", "group_a3.xpt"] GROUP_B_FILES = ["group_b1.xpt", "group_b2.xpt"] STANDALONE_FILE = "standalone.xpt" # Columns dropped from the group_b cluster so it has a genuinely different # schema from the group_a cluster. If the regex accidentally pulled a group_b file # into the group_a cluster (or vice versa), load_cluster's schema-compat check # would fire on these differences. 
GROUP_B_DROPPED_COLUMNS = ("BIGINT", "TIMECOL")


def _build_df(seed: int) -> pd.DataFrame:
    """Build a kitchen-sink DataFrame via the existing generator.

    ``generate_sample_sas.N_ROWS`` is set to ``FIXTURE_ROWS`` for the duration
    of the call so each fixture file is small enough to regenerate quickly.
    The previous value is restored afterward (even if the generator raises) so
    importing this module alongside the main generator stays side-effect free.

    Args:
        seed: Seed for the NumPy random generator handed to
            ``generate_sample_sas.build_dataframe``.

    Returns:
        The DataFrame produced by ``generate_sample_sas.build_dataframe``.
    """
    saved = gss.N_ROWS
    gss.N_ROWS = FIXTURE_ROWS
    try:
        rng = np.random.default_rng(seed)
        return gss.build_dataframe(rng)
    finally:
        gss.N_ROWS = saved


def _write_xport(df: pd.DataFrame, path: Path, table_name: str) -> None:
    """Write ``df`` to ``path`` as a version-5 SAS XPORT file.

    Args:
        df: Frame to serialize.
        path: Destination file; its name is embedded in the file label.
        table_name: Dataset name stored in the XPORT file.
    """
    # Only pass variable_format entries for columns that actually exist in
    # this frame - write_xport errors on formats referencing missing cols.
    variable_format = {
        k: v for k, v in gss.VARIABLE_FORMATS.items() if k in df.columns
    }
    column_labels = {k: v for k, v in gss.COLUMN_LABELS.items() if k in df.columns}
    pyreadstat.write_xport(
        df,
        str(path),
        file_format_version=5,
        table_name=table_name,
        file_label=f"Folder-loader fixture ({path.name})",
        column_labels=column_labels,
        variable_format=variable_format,
    )


def _write_fixture(df: pd.DataFrame, name: str, table_name: str) -> None:
    """Write one fixture file into ``OUT_DIR`` and print a progress line."""
    target = OUT_DIR / name
    _write_xport(df, target, table_name=table_name)
    print(f" wrote {target} ({len(df):,} rows, {len(df.columns)} cols)")


def generate_fixtures() -> None:
    """Create ``OUT_DIR`` and write the group_a, group_b and standalone files.

    Seeds are fixed per file (100+, 200+, 300) so regenerated fixtures are
    reproducible. The group_b frames have ``GROUP_B_DROPPED_COLUMNS`` removed
    so their schema differs from the group_a files.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    for i, name in enumerate(GROUP_A_FILES):
        _write_fixture(_build_df(seed=100 + i), name, table_name=f"GRPA{i + 1}")

    for i, name in enumerate(GROUP_B_FILES):
        df = _build_df(seed=200 + i).drop(columns=list(GROUP_B_DROPPED_COLUMNS))
        _write_fixture(df, name, table_name=f"GRPB{i + 1}")

    _write_fixture(_build_df(seed=300), STANDALONE_FILE, table_name="STDALONE")


def write_config() -> None:
    """Write the folder-loader YAML config to ``CONFIG_PATH``.

    The config declares one explicit regex cluster (``group_a``) and enables
    auto-detect, leaving the remaining files to the auto-detect path.
    """
    cfg = {
        "folder": ".",  # config lives inside the target folder
        "schemaname": "public",
        "if_exists": "replace",
        "auto_detect": True,
        "clusters": [
            {
                "pattern": r"^group_a\d+\.xpt$",
                "tablename": "group_a",
            },
        ],
    }
    with CONFIG_PATH.open("w", encoding="utf-8") as f:
        # Top-of-file comment documents the intent of this generated config.
        f.write(
            "# Generated by generate_sample_folder.py. Demonstrates both\n"
            "# explicit regex clustering (group_a*) and auto-detect\n"
            "# (group_b* and standalone) working together.\n"
        )
        # sort_keys=False keeps the keys in the order declared above.
        yaml.safe_dump(cfg, f, sort_keys=False)
    print(f" wrote {CONFIG_PATH}")


def _require(condition: bool, message: str) -> None:
    """Raise ``AssertionError(message)`` unless ``condition`` holds.

    Used instead of the ``assert`` statement so the checks still run when the
    interpreter is started with ``-O`` (which strips ``assert``).
    """
    if not condition:
        raise AssertionError(message)


def _check_cluster(
    cluster, label: str, expected_source: str, expected_files: list[str]
) -> None:
    """Check one discovered cluster's ``source`` and ordered file names."""
    _require(
        cluster.source == expected_source,
        f"{label} source = {cluster.source!r}",
    )
    actual_files = [f.name for f in cluster.files]
    _require(actual_files == expected_files, f"{label} files = {actual_files}")


def verify() -> None:
    """Smoke-test the clustering logic against the generated folder.

    Loads the generated config, runs ``discover_clusters`` and checks that
    exactly the ``group_a`` (explicit), ``group_b`` (auto) and ``standalone``
    (auto) clusters come back with the expected files.

    Raises:
        AssertionError: If the discovered clusters differ from the expected
            grouping.
    """
    cfg = load_folder_config(CONFIG_PATH)
    clusters = discover_clusters(cfg)
    by_name = {c.tablename: c for c in clusters}

    expected_names = {"group_a", "group_b", "standalone"}
    actual_names = set(by_name)
    _require(
        expected_names == actual_names,
        f"cluster set mismatch: expected {expected_names}, got {actual_names}",
    )

    # Multi-file clusters are compared against the sorted fixture names.
    _check_cluster(by_name["group_a"], "group_a", "explicit", sorted(GROUP_A_FILES))
    _check_cluster(by_name["group_b"], "group_b", "auto", sorted(GROUP_B_FILES))
    _check_cluster(by_name["standalone"], "standalone", "auto", [STANDALONE_FILE])

    print(" clustering verified:")
    for c in clusters:
        files = ", ".join(f.name for f in c.files)
        print(f" {c.tablename} [{c.source}]: {files}")


def main() -> int:
    """Generate fixtures, write the config, verify clustering; return 0.

    A verification failure propagates as ``AssertionError`` rather than being
    converted into a return code.
    """
    print(f"Writing fixture SAS files to {OUT_DIR}/")
    generate_fixtures()
    print(f"\nWriting folder config to {CONFIG_PATH}")
    write_config()
    print("\nVerifying discover_clusters() grouping...")
    verify()
    print("\nOK. Try:")
    print(f" python load_folder.py --config {CONFIG_PATH} --dry-run")
    return 0


if __name__ == "__main__":
    sys.exit(main())