diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index 40489cc..437d6e1 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -1083,6 +1083,20 @@ def _build_argparser() -> argparse.ArgumentParser: "PGUSER / PGPASSWORD from the environment or .env file." ), ) + p.add_argument( + "--chunk-rows", + type=int, + default=None, + metavar="N", + help=( + "Per-chunk row target for pyreadstat streaming and COPY. " + "Overrides both the GENERIC_LOADER_CHUNK_ROWS env var and the " + "auto-scaling applied when --workers > 1. Peak memory per " + "worker is roughly 4 × N × avg_row_bytes; with wide sas7bdat " + "files (~4 KB/row) and 32 workers, N=100000 is a safe starting " + "point on a 128 GB box." + ), + ) p.add_argument( "--no-prescan", action="store_true", @@ -1234,19 +1248,40 @@ def main(argv: Optional[List[str]] = None) -> int: workers = max(1, int(args.workers)) - # When running parallel workers, bound peak memory: each worker buffers a - # chunk (read + prepared + serialized) so total memory scales with - # workers × chunk_rows × avg_row_bytes. Drop the default chunk target to - # 500k unless the operator has explicitly pinned it. Setting the env var - # before workers spawn means they inherit it through forkserver / spawn. - if ( - workers > 1 - and "GENERIC_LOADER_CHUNK_ROWS" not in os.environ - ): - os.environ["GENERIC_LOADER_CHUNK_ROWS"] = "500000" + # Per-worker peak memory ~= chunk_rows × avg_row_bytes × ~4 (the original + # pyreadstat DataFrame, the type-coerced ``prepared`` copy, the pyarrow + # table, and the serialized CSV buffer can all be alive simultaneously). + # With 32 workers and 500k rows × wide sas7bdat that's easily >128 GB - + # the default the loader shipped with OOM'd on a c6i.32xlarge box. Scale + # the auto target inversely with worker count so total memory stays + # roughly flat regardless of how many workers you pick. Floor of 50k + # keeps per-chunk overhead amortized; ceiling of 500k is where pyarrow + # / pyreadstat buffer spikes start to dominate. + # + # Order of precedence (most wins): + # 1. ``--chunk-rows N`` CLI flag (if provided) + # 2. ``GENERIC_LOADER_CHUNK_ROWS`` env var (if already set) + # 3. Auto-pick based on ``workers`` + if args.chunk_rows is not None: + os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(int(args.chunk_rows)) print( - "[info] parallel mode: bounding per-chunk rows to 500,000. " - "Pin GENERIC_LOADER_CHUNK_ROWS to override.", + f"[info] --chunk-rows {args.chunk_rows:,}: pinning per-chunk " + f"row target (overrides auto-scaling).", + file=sys.stderr, + ) + elif "GENERIC_LOADER_CHUNK_ROWS" in os.environ: + print( + f"[info] honoring GENERIC_LOADER_CHUNK_ROWS=" + f"{os.environ['GENERIC_LOADER_CHUNK_ROWS']} from environment.", + file=sys.stderr, + ) + elif workers > 1: + auto_rows = max(50_000, min(500_000, 3_200_000 // workers)) + os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(auto_rows) + print( + f"[info] parallel mode (workers={workers}): auto-scaled " + f"per-chunk rows to {auto_rows:,}. " + f"Use --chunk-rows N to override if you have RAM headroom.", file=sys.stderr, ) diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 2818bbc..d075b0d 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -1881,15 +1881,34 @@ def copy_dataframes( ) total = 0 + # Pull chunks one at a time so each ``df`` is unreferenced before the + # generator reads the next one. Without this the loop-variable binding + # of a ``for df in dfs:`` keeps the previous chunk alive during the + # next pyreadstat read, pushing peak memory to 5-6× chunk size per + # worker (old df + incoming df + prepared + pyarrow table + CSV buf). + # With explicit drops we cap peak at ~2× chunk size: ``df`` goes away + # once ``prepared`` exists, ``prepared`` once ``buf`` exists, ``buf`` + # once COPY has consumed it. Matters most in parallel mode where + # 32 × per-worker peak can exhaust a 128 GB host. + dfs_iter = iter(dfs) with conn.cursor() as cur: - for df in dfs: + while True: + try: + df = next(dfs_iter) + except StopIteration: + break if df.empty: + del df continue prepared = _prepare_for_copy(df, columns) + del df + n = len(prepared) buf = _serialize_chunk_csv(prepared) + del prepared cur.copy_expert(sql, buf) + del buf conn.commit() - total += len(prepared) + total += n return total