diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index 5e39943..40489cc 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -728,15 +728,16 @@ def load_cluster( files in the cluster - stay committed; only the in-flight chunk is rolled back by :func:`main`. - ``workers`` controls parallelism for the *append* phase. The first file - always runs serially on ``conn`` (to create the table and, when - partitioned, pre-create partitions). When ``workers > 1`` the remaining - files dispatch to a ``ProcessPoolExecutor``; each worker opens its own - psycopg2 connection, re-infers the per-file schema, runs the same - :func:`load_sas.assert_schema_compatible` check the serial path uses, - and streams chunks via COPY. Workers report per-chunk row counts to - ``progress_queue`` so the caller can drive a single aggregated tqdm - bar regardless of how many workers are in flight. + ``workers`` controls parallelism for streaming. With ``workers == 1`` + every file streams on ``conn`` in sequence. With ``workers > 1`` the + main connection only does ``CREATE TABLE`` (and, for partitioned + clusters, partition discovery + pre-creation), commits, then dispatches + *every* file - including the first - to a ``ProcessPoolExecutor``. Each + worker opens its own psycopg2 connection, re-infers the per-file schema, + runs the same :func:`load_sas.assert_schema_compatible` check the serial + path uses, and streams chunks via COPY. Workers report per-chunk row + counts to ``progress_queue`` so the caller can drive a single aggregated + tqdm bar regardless of how many workers are in flight. ``db_overrides`` carries ``{"user", "password"}`` into workers when the caller prompted for credentials interactively; leave ``None`` to let @@ -806,48 +807,56 @@ def load_cluster( ) total = 0 - total += _stream_file( - conn, schemaname, cluster.tablename, first, first_columns, - cluster.include, cluster.exclude, - total_rows=first_total_rows, - progress_queue=progress_queue, - ) - # Commit the first file (and the CREATE TABLE) before spawning workers - # so their ``assert_schema_compatible`` probes actually see the new - # table. Without this, worker connections started mid-transaction on - # the main connection would see nothing in information_schema. - conn.commit() - if rest: - if workers > 1: - total += _load_remaining_files_parallel( - rest, - schemaname, - cluster.tablename, - cluster.include, - cluster.exclude, - workers=workers, - progress_queue=progress_queue, - db_overrides=db_overrides, + if workers > 1: + # Parallel path: commit the (empty) table now so worker subprocesses' + # ``assert_schema_compatible`` probes can actually see it via + # ``information_schema``, then dispatch *every* file (first + + # rest) to the pool. The previous design streamed the first file + # on the main connection before spawning workers, which made the + # serial first-file phase the long pole on big-file clusters + # (e.g. 52 × 5-50 GB). Now ``CREATE TABLE`` is the only serial + # work and it takes milliseconds. + conn.commit() + total += _load_remaining_files_parallel( + cluster.files, + schemaname, + cluster.tablename, + cluster.include, + cluster.exclude, + workers=workers, + progress_queue=progress_queue, + db_overrides=db_overrides, + ) + else: + # Serial path: stream the first file on the main connection, then + # iterate the rest. Worth keeping separate from the parallel path + # because spawning a single-worker pool just to load files in + # series would be pure overhead. + total += _stream_file( + conn, schemaname, cluster.tablename, first, first_columns, + cluster.include, cluster.exclude, + total_rows=first_total_rows, + progress_queue=progress_queue, + ) + conn.commit() + for path in rest: + columns, path_total_rows = _infer_cluster_schema( + path, cluster.include, cluster.exclude + ) + # Uses the same check that if_exists=append runs. A type + # mismatch or missing column aborts the cluster; because + # chunks commit as they load, earlier chunks in the + # cluster remain in the table. + assert_schema_compatible( + conn, schemaname, cluster.tablename, columns + ) + total += _stream_file( + conn, schemaname, cluster.tablename, path, columns, + cluster.include, cluster.exclude, + total_rows=path_total_rows, + progress_queue=progress_queue, ) - else: - for path in rest: - columns, path_total_rows = _infer_cluster_schema( - path, cluster.include, cluster.exclude - ) - # Uses the same check that if_exists=append runs. A type - # mismatch or missing column aborts the cluster; because - # chunks commit as they load, earlier chunks in the - # cluster remain in the table. - assert_schema_compatible( - conn, schemaname, cluster.tablename, columns - ) - total += _stream_file( - conn, schemaname, cluster.tablename, path, columns, - cluster.include, cluster.exclude, - total_rows=path_total_rows, - progress_queue=progress_queue, - ) # -- Index support ------------------------------------------------------ if cluster.indexes: @@ -994,9 +1003,12 @@ def _load_remaining_files_parallel( Each file is an independent unit of work submitted to ``ProcessPoolExecutor``. Workers infer schema, validate compatibility, - and stream via COPY just like the serial path. Failures are collected - and re-raised as a single ``RuntimeError`` at the end so that all - other workers' rows still count toward the committed total. + and stream via COPY just like the serial path. The table itself must + already exist (and be committed) before this is called - the worker + schema-compat probes read ``information_schema``, which won't see an + uncommitted ``CREATE TABLE``. Failures are collected and re-raised as + a single ``RuntimeError`` at the end so that all other workers' rows + still count toward the committed total. """ total = 0 errors: List[Tuple[str, str]] = []