advanced_analyzer #8
@ -1083,6 +1083,20 @@ def _build_argparser() -> argparse.ArgumentParser:
|
|||||||
"PGUSER / PGPASSWORD from the environment or .env file."
|
"PGUSER / PGPASSWORD from the environment or .env file."
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--chunk-rows",
|
||||||
|
type=int,
|
||||||
|
default=None,
|
||||||
|
metavar="N",
|
||||||
|
help=(
|
||||||
|
"Per-chunk row target for pyreadstat streaming and COPY. "
|
||||||
|
"Overrides both the GENERIC_LOADER_CHUNK_ROWS env var and the "
|
||||||
|
"auto-scaling applied when --workers > 1. Peak memory per "
|
||||||
|
"worker is roughly 4 × N × avg_row_bytes; with wide sas7bdat "
|
||||||
|
"files (~4 KB/row) and 32 workers, N=100000 is a safe starting "
|
||||||
|
"point on a 128 GB box."
|
||||||
|
),
|
||||||
|
)
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--no-prescan",
|
"--no-prescan",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
@ -1234,19 +1248,40 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
|
|
||||||
workers = max(1, int(args.workers))
|
workers = max(1, int(args.workers))
|
||||||
|
|
||||||
# When running parallel workers, bound peak memory: each worker buffers a
|
# Per-worker peak memory ~= chunk_rows × avg_row_bytes × ~4 (the original
|
||||||
# chunk (read + prepared + serialized) so total memory scales with
|
# pyreadstat DataFrame, the type-coerced ``prepared`` copy, the pyarrow
|
||||||
# workers × chunk_rows × avg_row_bytes. Drop the default chunk target to
|
# table, and the serialized CSV buffer can all be alive simultaneously).
|
||||||
# 500k unless the operator has explicitly pinned it. Setting the env var
|
# With 32 workers and 500k rows × wide sas7bdat that's easily >128 GB -
|
||||||
# before workers spawn means they inherit it through forkserver / spawn.
|
# the default the loader shipped with OOM'd on a c6i.32xlarge box. Scale
|
||||||
if (
|
# the auto target inversely with worker count so total memory stays
|
||||||
workers > 1
|
# roughly flat regardless of how many workers you pick. Floor of 50k
|
||||||
and "GENERIC_LOADER_CHUNK_ROWS" not in os.environ
|
# keeps per-chunk overhead amortized; ceiling of 500k is where pyarrow
|
||||||
):
|
# / pyreadstat buffer spikes start to dominate.
|
||||||
os.environ["GENERIC_LOADER_CHUNK_ROWS"] = "500000"
|
#
|
||||||
|
# Order of precedence (highest priority first; the first match wins):
|
||||||
|
# 1. ``--chunk-rows N`` CLI flag (if provided)
|
||||||
|
# 2. ``GENERIC_LOADER_CHUNK_ROWS`` env var (if already set)
|
||||||
|
# 3. Auto-pick based on ``workers``
|
||||||
|
if args.chunk_rows is not None:
|
||||||
|
os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(int(args.chunk_rows))
|
||||||
print(
|
print(
|
||||||
"[info] parallel mode: bounding per-chunk rows to 500,000. "
|
f"[info] --chunk-rows {args.chunk_rows:,}: pinning per-chunk "
|
||||||
"Pin GENERIC_LOADER_CHUNK_ROWS to override.",
|
f"row target (overrides auto-scaling).",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
elif "GENERIC_LOADER_CHUNK_ROWS" in os.environ:
|
||||||
|
print(
|
||||||
|
f"[info] honoring GENERIC_LOADER_CHUNK_ROWS="
|
||||||
|
f"{os.environ['GENERIC_LOADER_CHUNK_ROWS']} from environment.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
elif workers > 1:
|
||||||
|
auto_rows = max(50_000, min(500_000, 3_200_000 // workers))
|
||||||
|
os.environ["GENERIC_LOADER_CHUNK_ROWS"] = str(auto_rows)
|
||||||
|
print(
|
||||||
|
f"[info] parallel mode (workers={workers}): auto-scaled "
|
||||||
|
f"per-chunk rows to {auto_rows:,}. "
|
||||||
|
f"Use --chunk-rows N to override if you have RAM headroom.",
|
||||||
file=sys.stderr,
|
file=sys.stderr,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -1881,15 +1881,34 @@ def copy_dataframes(
|
|||||||
)
|
)
|
||||||
|
|
||||||
total = 0
|
total = 0
|
||||||
|
# Pull chunks one at a time so each ``df`` is unreferenced before the
|
||||||
|
# generator reads the next one. Without this the loop-variable binding
|
||||||
|
# of a ``for df in dfs:`` keeps the previous chunk alive during the
|
||||||
|
# next pyreadstat read, pushing peak memory to 5-6× chunk size per
|
||||||
|
# worker (old df + incoming df + prepared + pyarrow table + CSV buf).
|
||||||
|
# With explicit drops we cap peak at ~2× chunk size: ``df`` goes away
|
||||||
|
# once ``prepared`` exists, ``prepared`` once ``buf`` exists, ``buf``
|
||||||
|
# once COPY has consumed it. Matters most in parallel mode where
|
||||||
|
# 32 × per-worker peak can exhaust a 128 GB host.
|
||||||
|
dfs_iter = iter(dfs)
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
for df in dfs:
|
while True:
|
||||||
|
try:
|
||||||
|
df = next(dfs_iter)
|
||||||
|
except StopIteration:
|
||||||
|
break
|
||||||
if df.empty:
|
if df.empty:
|
||||||
|
del df
|
||||||
continue
|
continue
|
||||||
prepared = _prepare_for_copy(df, columns)
|
prepared = _prepare_for_copy(df, columns)
|
||||||
|
del df
|
||||||
|
n = len(prepared)
|
||||||
buf = _serialize_chunk_csv(prepared)
|
buf = _serialize_chunk_csv(prepared)
|
||||||
|
del prepared
|
||||||
cur.copy_expert(sql, buf)
|
cur.copy_expert(sql, buf)
|
||||||
|
del buf
|
||||||
conn.commit()
|
conn.commit()
|
||||||
total += len(prepared)
|
total += n
|
||||||
return total
|
return total
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user