From e39eb47a909967e7ba5757346876e27db0201b41 Mon Sep 17 00:00:00 2001 From: michael-corey Date: Mon, 20 Apr 2026 08:38:38 -0500 Subject: [PATCH] altering such that commit is by batch --- generic_loader/load_folder.py | 11 ++++++----- generic_loader/load_sas.py | 13 ++++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index 9301786..5b8d232 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -385,9 +385,10 @@ def _infer_cluster_schema(path: Path, include, exclude): def load_cluster(conn, cluster: ClusterSpec, schemaname: str) -> int: """Load every file in ``cluster`` into one table. Returns total rows loaded. - The caller owns transaction boundaries. This function does NOT commit or - roll back - :func:`main` does that per cluster so one bad cluster - doesn't poison the rest of the run. + Commits happen per chunk inside :func:`load_sas.copy_dataframes`. If a + file mid-cluster fails, earlier chunks - including chunks from earlier + files in the cluster - stay committed; only the in-flight chunk is + rolled back by :func:`main`. """ if not cluster.files: return 0 @@ -407,8 +408,8 @@ def load_cluster(conn, cluster: ClusterSpec, schemaname: str) -> int: for path in rest: columns = _infer_cluster_schema(path, cluster.include, cluster.exclude) # Uses the same check that if_exists=append runs. A type mismatch or - # missing column aborts the cluster; the transaction rollback in - # main() keeps the table from ending up half-loaded. + # missing column aborts the cluster; because chunks commit as they + # load, earlier chunks in the cluster remain in the table. assert_schema_compatible(conn, schemaname, cluster.tablename, columns) total += _stream_file( conn, schemaname, cluster.tablename, path, columns, diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 83b4f28..851a1e7 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -194,8 +194,8 @@ will fail mid-stream and the whole transaction rolls back. Set matters more than speed. Streaming loads use :func:`iter_sas_chunks` + :func:`copy_dataframes`, which -share one cursor and transaction so a failure mid-file rolls back the whole -load. +commit each chunk as it is copied so an interrupted load retains the rows +that were already written. 7. Tunables ----------- @@ -1032,10 +1032,12 @@ def copy_dataframes( dfs: Iterable[pd.DataFrame], columns: Dict[str, ColumnSpec], ) -> int: - """Stream an iterable of DataFrames into one ``COPY`` session. + """Stream an iterable of DataFrames into Postgres, committing each chunk. - All chunks share a cursor and transaction, so a failure mid-stream - rolls back the whole load when the caller hasn't committed yet. + Each non-empty chunk is copied via ``COPY ... FROM STDIN`` and committed + before the next chunk is processed, so an interrupted or failed load + retains the rows from previously committed chunks. The first chunk's + commit also flushes any pending DDL (e.g. a preceding ``CREATE TABLE``). Empty chunks are skipped. Returns the total rows inserted. """ col_list = ", ".join(_quote_ident(name) for name in columns.keys()) @@ -1060,6 +1062,7 @@ def copy_dataframes( ) buf.seek(0) cur.copy_expert(sql, buf) + conn.commit() total += len(prepared) return total