diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index 437d6e1..76b38d9 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -944,6 +944,10 @@ def _worker_load_append_file( from dotenv import load_dotenv as _load_dotenv + import ctypes + import ctypes.util + import gc + from load_sas import ( apply_column_filter as _apply_column_filter, assert_schema_compatible as _assert_schema_compatible, @@ -962,6 +966,9 @@ def _worker_load_append_file( preview_df = _apply_column_filter(preview_df, include, exclude) total_rows = getattr(meta, "number_rows", None) columns = _infer_schema(preview_df, meta, total_rows=total_rows) + # Drop the preview ASAP - on a 2M-row wide file it's hundreds of MB + # and we never need it again after schema inference. + del preview_df, meta user = db_overrides.get("user") if db_overrides else None password = db_overrides.get("password") if db_overrides else None @@ -986,6 +993,32 @@ def _worker_load_append_file( conn.close() except Exception as e: return (path_str, 0, f"{type(e).__name__}: {e}") + finally: + # Hand memory back to the OS before the worker is recycled (or before + # ``max_tasks_per_child`` rotates this process). Three layers, each + # of which independently retains memory across calls: + # + # 1. pyarrow's memory pool aggressively reuses buffers - explicitly + # release_unused() returns them to the allocator. + # 2. Python's GC: cyclic refs from pandas/pyarrow chains aren't + # collected until a generation tick; force one now. + # 3. glibc's ptmalloc keeps freed heap in per-thread arenas instead + # of munmap'ing it back. ``malloc_trim(0)`` is the explicit ask. + # No-op (silently) on platforms without the symbol (macOS, etc). + try: + import pyarrow as _pa + _pa.default_memory_pool().release_unused() + except Exception: + pass + gc.collect() + try: + _libc_name = ctypes.util.find_library("c") + if _libc_name: + _libc = ctypes.CDLL(_libc_name) + if hasattr(_libc, "malloc_trim"): + _libc.malloc_trim(0) + except Exception: + pass def _load_remaining_files_parallel( @@ -1013,7 +1046,19 @@ def _load_remaining_files_parallel( total = 0 errors: List[Tuple[str, str]] = [] - with ProcessPoolExecutor(max_workers=workers) as pool: + # ``max_tasks_per_child=1`` recycles each worker process after every + # file. Without this, glibc/pyarrow/pyreadstat all retain peak-water + # memory inside long-lived workers; over a multi-hour run the sum + # across workers monotonically grows even though individual chunks + # have been freed at the Python level. Recycling per file gives the + # OS the memory back unconditionally - the only cost is one fork + + # python interpreter startup per file (~1-2 s), which is noise next + # to multi-GB sas7bdat reads. + pool_kwargs: Dict[str, Any] = {"max_workers": workers} + if sys.version_info >= (3, 11): + pool_kwargs["max_tasks_per_child"] = 1 + + with ProcessPoolExecutor(**pool_kwargs) as pool: futures = [ pool.submit( _worker_load_append_file, diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index d075b0d..13e59cb 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -1909,6 +1909,15 @@ def copy_dataframes( del buf conn.commit() total += n + # Hand pyarrow's pool memory back between chunks. Without this, + # arrow's internal buffer pool keeps the high-water bytes + # reserved across the worker's lifetime - inside long-running + # workers this presents as steadily climbing RSS even with the + # ``del``s above. Cheap (microseconds); call it every chunk. + try: + pa.default_memory_pool().release_unused() + except Exception: + pass return total