foxtrot/generic_loader/generate_sample_sas.py

381 lines
14 KiB
Python
Raw Normal View History

2026-04-18 14:34:48 +00:00
"""Generate a kitchen-sink SAS XPORT file plus an expected-types manifest.

Running this script produces two files under samples/:

- sample_kitchensink.xpt: the SAS XPORT test fixture
- sample_kitchensink.expected.json: ground-truth Postgres types for the loader

Tune behavior via the top-level constants below.
"""
from __future__ import annotations
import datetime as dt
import json
import string
from pathlib import Path
import numpy as np
import pandas as pd
import pyreadstat
# Seed handed to np.random.default_rng() in main(); fixes the generated data.
SEED = 42
# Number of rows build_dataframe() generates.
N_ROWS = 1000
# Fraction passed to _missing_mask() for every column that receives missing
# values; verify_roundtrip() checks the read-back columns against it.
NULL_FRACTION = 0.20
# Output directory (created by main()) and the two files written into it.
OUT_DIR = Path("samples")
OUT_PATH = OUT_DIR / "sample_kitchensink.xpt"
MANIFEST_PATH = OUT_DIR / "sample_kitchensink.expected.json"
# Columns verify_roundtrip() requires to read back with 0% missing values.
POSITIVE_CONTROLS = {"ID", "INTCOL", "STRCOL", "DATECOL", "CONST"}
# Columns verify_roundtrip() requires to read back with 100% missing values.
ALL_NULL_COLS = {"ALLNULL", "ALLNULLC"}
def _missing_mask(rng: np.random.Generator, n: int, frac: float) -> np.ndarray:
    """Build a length-``n`` boolean mask with ``round(frac * n)`` entries set.

    The number of True positions is fixed up front and their locations are
    sampled without replacement, so the realised missing fraction does not
    fluctuate the way independent per-row coin flips would. That lets the
    round-trip check compare against the target fraction directly.
    """
    flags = np.zeros(n, dtype=bool)
    n_missing = int(round(frac * n))
    if n_missing <= 0:
        # Nothing to mark; the generator is left untouched in this case.
        return flags
    chosen = rng.choice(n, size=n_missing, replace=False)
    flags[chosen] = True
    return flags
def _random_word(rng: np.random.Generator, min_len: int = 3, max_len: int = 10) -> str:
    """Return a random lowercase ASCII word of ``min_len``..``max_len`` letters.

    The length is drawn first (both bounds inclusive), then that many letters
    are sampled from ``string.ascii_lowercase``.
    """
    alphabet = np.array(list(string.ascii_lowercase))
    n_chars = int(rng.integers(min_len, max_len + 1))
    picked = rng.choice(alphabet, size=n_chars)
    return "".join(picked)
def _random_sentence(rng: np.random.Generator, min_words: int = 8, max_words: int = 20) -> str:
    """Return ``min_words``..``max_words`` random words joined by single spaces.

    The word count is drawn first (both bounds inclusive); each word then
    comes from ``_random_word`` with its default length limits.
    """
    word_count = int(rng.integers(min_words, max_words + 1))
    words = []
    for _ in range(word_count):
        words.append(_random_word(rng))
    return " ".join(words)
def build_dataframe(rng: np.random.Generator) -> pd.DataFrame:
    """Assemble the kitchen-sink DataFrame that is written to the XPORT file.

    Columns are generated strictly in the order they appear below. Every draw
    advances ``rng``, so reordering any section changes the generated data.
    Missing entries are NaN in numeric columns, NaT in DTCOL, None in TIMECOL
    and the empty string in character columns.
    """
    n = N_ROWS
    origin_date = dt.date(2020, 1, 1)
    origin_dt = dt.datetime(2020, 1, 1, 0, 0, 0)

    def next_mask() -> np.ndarray:
        # One fresh mask per nullable column, each consuming from rng.
        return _missing_mask(rng, n, NULL_FRACTION)

    def next_date() -> dt.date:
        # A date up to 1999 days after the origin.
        return origin_date + dt.timedelta(days=int(rng.integers(0, 2000)))

    # --- numeric columns (values stored as float64 except the row id) ------
    row_ids = np.arange(1, n + 1, dtype=np.int64)
    small_ints = rng.integers(0, 1000, size=n).astype(np.float64)

    big_ints = rng.integers(10_000_000_000, 20_000_000_000, size=n).astype(np.float64)
    big_ints[next_mask()] = np.nan

    floats = rng.normal(loc=100.0, scale=15.0, size=n)
    floats[next_mask()] = np.nan

    zero_ones = rng.integers(0, 2, size=n).astype(np.float64)
    zero_ones[next_mask()] = np.nan

    # --- plain character columns ------------------------------------------
    short_words = [_random_word(rng, 3, 8) for _ in range(n)]

    # The mask is drawn before any sentence so masked rows consume no words.
    sentence_hidden = next_mask()
    sentences = ["" if hidden else _random_sentence(rng) for hidden in sentence_hidden]

    # --- temporal columns --------------------------------------------------
    dates = [next_date() for _ in range(n)]

    stamp_hidden = next_mask()
    stamps: list = []
    for hidden in stamp_hidden:
        if hidden:
            stamps.append(pd.NaT)
            continue
        elapsed = int(rng.integers(0, 2000 * 24 * 3600))
        stamps.append(origin_dt + dt.timedelta(seconds=elapsed))
    stamp_index = pd.to_datetime(stamps)

    clock_hidden = next_mask()
    clock_times: list = []
    for hidden in clock_hidden:
        if hidden:
            clock_times.append(None)
            continue
        total_seconds = int(rng.integers(0, 24 * 3600))
        total_minutes, second = divmod(total_seconds, 60)
        hour, minute = divmod(total_minutes, 60)
        clock_times.append(dt.time(hour, minute, second))

    # --- character columns holding typed-looking content -------------------
    numstr_hidden = next_mask()
    numeric_strings: list[str] = []
    for hidden in numstr_hidden:
        if hidden:
            text = ""
        elif rng.random() < 0.5:
            text = str(int(rng.integers(-500, 500)))
        else:
            text = f"{rng.normal(0, 50):.2f}"
        numeric_strings.append(text)

    datestr_hidden = next_mask()
    date_strings = ["" if hidden else next_date().isoformat() for hidden in datestr_hidden]

    mixed_hidden = next_mask()
    # "text" appears twice, so it is picked twice as often as the others.
    kinds = ["number", "date", "text", "text"]
    mixed_strings: list[str] = []
    for hidden in mixed_hidden:
        if hidden:
            mixed_strings.append("")
            continue
        kind = kinds[int(rng.integers(0, len(kinds)))]
        if kind == "number":
            mixed_strings.append(str(int(rng.integers(0, 1000))))
        elif kind == "date":
            mixed_strings.append(next_date().isoformat())
        else:
            mixed_strings.append(_random_word(rng, 4, 12))

    # --- degenerate columns ------------------------------------------------
    constants = ["CONSTANT"] * n
    all_nan = np.full(n, np.nan, dtype=np.float64)
    all_blank = [""] * n

    columns = {
        "ID": row_ids,
        "INTCOL": small_ints,
        "BIGINT": big_ints,
        "FLOATCOL": floats,
        "BOOLCOL": zero_ones,
        "STRCOL": short_words,
        "LONGSTR": sentences,
        "DATECOL": dates,
        "DTCOL": stamp_index,
        "TIMECOL": clock_times,
        "NUMASSTR": numeric_strings,
        "DATEASTR": date_strings,
        "MIXED": mixed_strings,
        "CONST": constants,
        "ALLNULL": all_nan,
        "ALLNULLC": all_blank,
    }
    return pd.DataFrame(columns)
# Human-readable label for each generated column; main() passes this mapping
# to pyreadstat.write_xport as ``column_labels``.
COLUMN_LABELS: dict[str, str] = {
    "ID": "Row identifier",
    "INTCOL": "Integer positive control",
    "BIGINT": "Big integer beyond int32 range",
    "FLOATCOL": "Floating point with decimals",
    "BOOLCOL": "Nullable boolean 0/1/NaN",
    "STRCOL": "Short string positive control",
    "LONGSTR": "Longer free-text string",
    "DATECOL": "Date positive control",
    "DTCOL": "Datetime with missing values",
    "TIMECOL": "Time of day with missing values",
    "NUMASSTR": "Numeric-looking strings in a char column",
    "DATEASTR": "Date-looking strings in a char column",
    "MIXED": "Heterogeneous strings: fallback to text",
    "CONST": "Constant repeated value",
    "ALLNULL": "Entirely missing numeric column",
    "ALLNULLC": "Entirely missing character column",
}
# SAS format strings for the three temporal columns; main() passes this
# mapping to pyreadstat.write_xport as ``variable_format``.
VARIABLE_FORMATS: dict[str, str] = {
    "DATECOL": "DATE9.",
    "DTCOL": "DATETIME20.",
    "TIMECOL": "TIME8.",
}
# Ground-truth manifest that write_manifest() serialises to MANIFEST_PATH.
# Each column maps to either a single "postgres_type" or a list of
# "acceptable_types", plus a "nullable" flag and an optional free-text "note".
# write_manifest() requires the keys to match the DataFrame columns exactly.
EXPECTED_MANIFEST: dict[str, dict] = {
    "ID": {"postgres_type": "INTEGER", "nullable": False},
    "INTCOL": {"postgres_type": "INTEGER", "nullable": False, "note": "positive control"},
    "BIGINT": {"postgres_type": "BIGINT", "nullable": True, "note": "values beyond int32 range"},
    "FLOATCOL": {"acceptable_types": ["DOUBLE PRECISION", "NUMERIC"], "nullable": True},
    "BOOLCOL": {
        "acceptable_types": ["BOOLEAN", "SMALLINT", "INTEGER"],
        "nullable": True,
        "note": "{0,1,NaN} is genuinely ambiguous; loader's choice is a design decision",
    },
    "STRCOL": {"acceptable_types": ["TEXT", "VARCHAR"], "nullable": False, "note": "positive control"},
    "LONGSTR": {"acceptable_types": ["TEXT", "VARCHAR"], "nullable": True},
    "DATECOL": {"postgres_type": "DATE", "nullable": False, "note": "positive control"},
    "DTCOL": {"acceptable_types": ["TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE"], "nullable": True},
    "TIMECOL": {"postgres_type": "TIME", "nullable": True},
    "NUMASSTR": {
        "acceptable_types": ["NUMERIC", "DOUBLE PRECISION"],
        "nullable": True,
        "note": "stored as char in SAS; loader should coerce numeric-looking strings",
    },
    "DATEASTR": {
        "postgres_type": "DATE",
        "nullable": True,
        "note": "stored as char in SAS; loader should coerce ISO-date strings",
    },
    "MIXED": {
        "acceptable_types": ["TEXT", "VARCHAR"],
        "nullable": True,
        "note": "heterogeneous content; loader should fall back to text",
    },
    "CONST": {"acceptable_types": ["TEXT", "VARCHAR"], "nullable": False},
    "ALLNULL": {
        "acceptable_types": ["TEXT", "VARCHAR"],
        "nullable": True,
        "note": "entirely null numeric; loader must pick a default type, typically TEXT",
    },
    "ALLNULLC": {
        "acceptable_types": ["TEXT", "VARCHAR"],
        "nullable": True,
        "note": "entirely null character",
    },
}
def write_manifest(df: pd.DataFrame) -> None:
    """Write EXPECTED_MANIFEST to MANIFEST_PATH as indented, key-sorted JSON.

    The manifest keys must be exactly the columns of ``df``; the file is only
    written once that holds.

    Raises:
        AssertionError: if a DataFrame column has no manifest entry, or the
            manifest names a column the DataFrame does not have.
    """
    expected_names = set(EXPECTED_MANIFEST)
    actual_names = set(df.columns)
    missing = actual_names - expected_names
    extra = expected_names - actual_names
    if missing or extra:
        message = (
            f"Manifest/DataFrame column mismatch. Missing from manifest: {missing}. "
            f"Extra in manifest: {extra}."
        )
        raise AssertionError(message)

    payload = json.dumps(EXPECTED_MANIFEST, indent=2, sort_keys=True)
    with MANIFEST_PATH.open("w", encoding="utf-8") as handle:
        handle.write(payload)
        # Terminate the file with a newline.
        handle.write("\n")
def _char_missing_fraction(series: pd.Series) -> float:
    """Return the share of entries that are null or the empty string.

    Nulls are replaced by "" before the string conversion so they are counted
    together with genuinely empty strings.
    """
    as_text = series.fillna("").astype(str)
    is_blank = as_text.eq("")
    return float(is_blank.mean())
def _numeric_missing_fraction(series: pd.Series) -> float:
    """Return the share of entries that pandas reports as missing."""
    null_flags = pd.isna(series)
    return float(null_flags.mean())
def _require(condition: bool, message: str) -> None:
    """Raise ``AssertionError(message)`` when ``condition`` is falsy.

    A plain ``assert`` statement is compiled away under ``python -O``, which
    would silently turn the round-trip verification into a no-op. Raising
    explicitly keeps the checks active in every interpreter mode.
    """
    if not condition:
        raise AssertionError(message)


def _observed_missing_fraction(series: pd.Series) -> float:
    """Return the missing fraction using the measure that fits the dtype.

    Numeric and datetime columns count nulls; any other dtype counts nulls
    and empty strings together.
    """
    if pd.api.types.is_numeric_dtype(series) or pd.api.types.is_datetime64_any_dtype(series):
        return _numeric_missing_fraction(series)
    return _char_missing_fraction(series)


def verify_roundtrip(source_df: pd.DataFrame) -> pd.DataFrame:
    """Read OUT_PATH back with pyreadstat and check it against ``source_df``.

    Checks, in order:
      1. column count, column names and row count match ``source_df``;
      2. DATECOL and DTCOL come back datetime-like, and TIMECOL comes back as
         a datetime, numeric or time-object column. This is judged from the
         returned dtypes and values; the metadata object returned by
         ``read_xport`` is not inspected;
      3. per-column missing fraction: 0% for POSITIVE_CONTROLS, 100% for
         ALL_NULL_COLS, and within ±10 percentage points of NULL_FRACTION
         for every other column;
      4. the manifest file exists and its keys equal the read-back columns.

    Args:
        source_df: the DataFrame that was written to OUT_PATH.

    Returns:
        The DataFrame read back from OUT_PATH.

    Raises:
        AssertionError: on the first check that fails. The checks are raised
            explicitly, so they are also enforced under ``python -O``.
    """
    readback, _meta = pyreadstat.read_xport(str(OUT_PATH))

    source_cols = set(source_df.columns)
    readback_cols = set(readback.columns)

    _require(
        len(readback.columns) == len(source_df.columns),
        f"Column count mismatch: wrote {len(source_df.columns)}, read back {len(readback.columns)}",
    )
    _require(
        readback_cols == source_cols,
        f"Column name mismatch. Only in source: {source_cols - readback_cols}. "
        f"Only in readback: {readback_cols - source_cols}.",
    )
    _require(
        len(readback) == len(source_df),
        f"Row count mismatch: wrote {len(source_df)}, read back {len(readback)}",
    )

    # Date and datetime columns: accept a datetime64 dtype, or an object
    # column whose non-null values are all date-like Python objects.
    for col in ("DATECOL", "DTCOL"):
        dtype = readback[col].dtype
        is_datetime = pd.api.types.is_datetime64_any_dtype(dtype)
        is_object_of_dates = pd.api.types.is_object_dtype(dtype) and readback[col].dropna().map(
            lambda v: isinstance(v, (dt.date, dt.datetime, pd.Timestamp))
        ).all()
        _require(
            is_datetime or is_object_of_dates,
            f"{col} came back as {dtype}; expected datetime-like. "
            f"variable_format mapping may not have taken effect.",
        )

    # The time column is allowed a wider set of representations.
    time_dtype = readback["TIMECOL"].dtype
    time_ok = (
        pd.api.types.is_datetime64_any_dtype(time_dtype)
        or pd.api.types.is_numeric_dtype(time_dtype)
        or (
            pd.api.types.is_object_dtype(time_dtype)
            and readback["TIMECOL"].dropna().map(
                lambda v: isinstance(v, (dt.time, dt.datetime, pd.Timestamp, int, float))
            ).all()
        )
    )
    _require(time_ok, f"TIMECOL came back as {time_dtype}; expected datetime/numeric/time-object")

    tol = 0.10
    for col in source_df.columns:
        observed = _observed_missing_fraction(readback[col])
        if col in POSITIVE_CONTROLS:
            _require(
                observed == 0.0,
                f"Positive control {col!r} has {observed:.2%} missing; expected 0%.",
            )
        elif col in ALL_NULL_COLS:
            _require(
                observed == 1.0,
                f"All-null column {col!r} has {observed:.2%} missing; expected 100%.",
            )
        else:
            _require(
                abs(observed - NULL_FRACTION) <= tol,
                f"Column {col!r}: observed missing fraction {observed:.2%} not within "
                f"±{tol:.0%} of NULL_FRACTION={NULL_FRACTION:.2%}.",
            )

    _require(MANIFEST_PATH.exists(), f"Manifest file {MANIFEST_PATH} missing.")
    with MANIFEST_PATH.open("r", encoding="utf-8") as f:
        manifest = json.load(f)
    manifest_cols = set(manifest)
    _require(
        manifest_cols == readback_cols,
        f"Manifest/readback column set mismatch. "
        f"Only in manifest: {manifest_cols - readback_cols}. "
        f"Only in readback: {readback_cols - manifest_cols}.",
    )
    return readback
def main() -> None:
    """Generate the fixture, write the manifest, verify the round trip, report.

    Creates OUT_DIR if needed, builds the DataFrame from a generator seeded
    with SEED, writes it as a version-5 XPORT file, writes the manifest, and
    prints the read-back dtypes and first rows once verification passes.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    frame = build_dataframe(np.random.default_rng(SEED))

    xport_options = {
        "file_format_version": 5,
        "table_name": "SAMPLE",
        "file_label": "Kitchen sink sample for loader testing",
        "column_labels": COLUMN_LABELS,
        "variable_format": VARIABLE_FORMATS,
    }
    pyreadstat.write_xport(frame, str(OUT_PATH), **xport_options)

    write_manifest(frame)
    readback = verify_roundtrip(frame)

    print(f"Wrote {OUT_PATH} ({N_ROWS} rows x {len(frame.columns)} cols)")
    print(f"Wrote {MANIFEST_PATH}")

    # Each report section is a blank line, a heading, then the rendered table.
    sections = (
        (
            "Readback via pyreadstat.read_xport (same reader the loader will use):",
            readback.dtypes.to_string(),
        ),
        ("Readback head:", readback.head().to_string()),
    )
    for heading, rendered in sections:
        print()
        print(heading)
        print(rendered)


if __name__ == "__main__":
    main()