advanced_analyzer #8

Merged
dp merged 23 commits from advanced_analyzer into main 2026-04-21 22:32:18 +00:00
Showing only changes of commit eac75cbb26 - Show all commits

View File

@ -728,15 +728,16 @@ def load_cluster(
files in the cluster - stay committed; only the in-flight chunk is
rolled back by :func:`main`.
``workers`` controls parallelism for the *append* phase. The first file
always runs serially on ``conn`` (to create the table and, when
partitioned, pre-create partitions). When ``workers > 1`` the remaining
files dispatch to a ``ProcessPoolExecutor``; each worker opens its own
psycopg2 connection, re-infers the per-file schema, runs the same
:func:`load_sas.assert_schema_compatible` check the serial path uses,
and streams chunks via COPY. Workers report per-chunk row counts to
``progress_queue`` so the caller can drive a single aggregated tqdm
bar regardless of how many workers are in flight.
``workers`` controls parallelism for streaming. With ``workers == 1``
every file streams on ``conn`` in sequence. With ``workers > 1`` the
main connection only does ``CREATE TABLE`` (and, for partitioned
clusters, partition discovery + pre-creation), commits, then dispatches
*every* file - including the first - to a ``ProcessPoolExecutor``. Each
worker opens its own psycopg2 connection, re-infers the per-file schema,
runs the same :func:`load_sas.assert_schema_compatible` check the serial
path uses, and streams chunks via COPY. Workers report per-chunk row
counts to ``progress_queue`` so the caller can drive a single aggregated
tqdm bar regardless of how many workers are in flight.
``db_overrides`` carries ``{"user", "password"}`` into workers when the
caller prompted for credentials interactively; leave ``None`` to let
@ -806,48 +807,56 @@ def load_cluster(
)
total = 0
total += _stream_file(
conn, schemaname, cluster.tablename, first, first_columns,
cluster.include, cluster.exclude,
total_rows=first_total_rows,
progress_queue=progress_queue,
)
# Commit the first file (and the CREATE TABLE) before spawning workers
# so their ``assert_schema_compatible`` probes actually see the new
# table. Without this, workers connecting while the main connection's
# transaction is still open would not see the table in information_schema.
conn.commit()
if rest:
if workers > 1:
total += _load_remaining_files_parallel(
rest,
schemaname,
cluster.tablename,
cluster.include,
cluster.exclude,
workers=workers,
progress_queue=progress_queue,
db_overrides=db_overrides,
if workers > 1:
# Parallel path: commit the (empty) table now so worker subprocesses'
# ``assert_schema_compatible`` probes can actually see it via
# ``information_schema``, then dispatch *every* file (first +
# rest) to the pool. The previous design streamed the first file
# on the main connection before spawning workers, which made the
# serial first-file phase the long pole on big-file clusters
# (e.g. 52 × 5-50 GB). Now ``CREATE TABLE`` is the only serial
# work and it takes milliseconds.
conn.commit()
total += _load_remaining_files_parallel(
cluster.files,
schemaname,
cluster.tablename,
cluster.include,
cluster.exclude,
workers=workers,
progress_queue=progress_queue,
db_overrides=db_overrides,
)
else:
# Serial path: stream the first file on the main connection, then
# iterate the rest. Worth keeping separate from the parallel path
# because spawning a single-worker pool just to load files in
# series would be pure overhead.
total += _stream_file(
conn, schemaname, cluster.tablename, first, first_columns,
cluster.include, cluster.exclude,
total_rows=first_total_rows,
progress_queue=progress_queue,
)
conn.commit()
for path in rest:
columns, path_total_rows = _infer_cluster_schema(
path, cluster.include, cluster.exclude
)
# Uses the same check that if_exists=append runs. A type
# mismatch or missing column aborts the cluster; because
# chunks commit as they load, earlier chunks in the
# cluster remain in the table.
assert_schema_compatible(
conn, schemaname, cluster.tablename, columns
)
total += _stream_file(
conn, schemaname, cluster.tablename, path, columns,
cluster.include, cluster.exclude,
total_rows=path_total_rows,
progress_queue=progress_queue,
)
else:
for path in rest:
columns, path_total_rows = _infer_cluster_schema(
path, cluster.include, cluster.exclude
)
# Uses the same check that if_exists=append runs. A type
# mismatch or missing column aborts the cluster; because
# chunks commit as they load, earlier chunks in the
# cluster remain in the table.
assert_schema_compatible(
conn, schemaname, cluster.tablename, columns
)
total += _stream_file(
conn, schemaname, cluster.tablename, path, columns,
cluster.include, cluster.exclude,
total_rows=path_total_rows,
progress_queue=progress_queue,
)
# -- Index support ------------------------------------------------------
if cluster.indexes:
@ -994,9 +1003,12 @@ def _load_remaining_files_parallel(
Each file is an independent unit of work submitted to
``ProcessPoolExecutor``. Workers infer schema, validate compatibility,
and stream via COPY just like the serial path. Failures are collected
and re-raised as a single ``RuntimeError`` at the end so that all
other workers' rows still count toward the committed total.
and stream via COPY just like the serial path. The table itself must
already exist (and be committed) before this is called - the worker
schema-compat probes read ``information_schema``, which won't see an
uncommitted ``CREATE TABLE``. Failures are collected and re-raised as
a single ``RuntimeError`` at the end so that all other workers' rows
still count toward the committed total.
"""
total = 0
errors: List[Tuple[str, str]] = []