From 052fb0e0870abc11e73f54bd92af73acb73e325d Mon Sep 17 00:00:00 2001 From: David Peterson Date: Mon, 20 Apr 2026 22:43:02 -0500 Subject: [PATCH] Refactor pre-scan process in load_folder.py to utilize ThreadPoolExecutor for improved performance Updated the main function to replace sequential file processing with a threaded approach using ThreadPoolExecutor. This change enhances the efficiency of reading row counts from SAS files, particularly for large datasets, by allowing concurrent I/O operations. Added progress tracking with tqdm for better user feedback during the pre-scan phase. --- generic_loader/load_folder.py | 49 +++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index 0593c46..5cdcd3b 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -138,7 +138,7 @@ import queue as _queue_mod import re import sys import threading -from concurrent.futures import ProcessPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Tuple @@ -1230,25 +1230,48 @@ def main(argv: Optional[List[str]] = None) -> int: # -- Metadata pre-scan ----------------------------------------------------- # Sum ``number_rows`` across every file so the tqdm bar has a real # denominator. ``read_sas_metadata`` uses pyreadstat's ``metadataonly=True`` - # fast path; a few ms per sas7bdat even on large files. + # fast path, but on multi-GB sas7bdat files that still reads tens of MB + # of scattered subheader pages per file - sequentially that's minutes for + # a 52-file folder. pyreadstat releases the GIL during I/O and C decoding, + # so a ThreadPool gives near-linear scaling until the disk saturates. + all_files: List[Path] = [p for c in loadable for p in c.files] + prescan_workers = min(16, max(1, len(all_files))) print( - f"pre-scanning row counts for {sum(len(c.files) for c in loadable)} " - f"file(s)...", + f"pre-scanning row counts for {len(all_files)} file(s) " + f"across {prescan_workers} thread(s)...", file=sys.stderr, ) + + def _scan_one(p: Path) -> Tuple[Path, Optional[int], Optional[str]]: + try: + meta = read_sas_metadata(p) + n = getattr(meta, "number_rows", None) + return (p, int(n) if n is not None else None, None) + except Exception as e: + return (p, None, str(e)) + grand_total = 0 unknown_total_files: List[str] = [] - for c in loadable: - for p in c.files: - try: - meta = read_sas_metadata(p) - n = getattr(meta, "number_rows", None) - if n is None: + with ThreadPoolExecutor(max_workers=prescan_workers) as tpool: + prescan_bar = tqdm( + total=len(all_files), + unit="file", + desc=" prescanning", + file=sys.stderr, + dynamic_ncols=True, + ) + try: + for p, n, err in tpool.map(_scan_one, all_files): + prescan_bar.update(1) + if err is not None: + unknown_total_files.append(f"{p.name} ({err})") + elif n is None: unknown_total_files.append(p.name) else: - grand_total += int(n) - except Exception as e: - unknown_total_files.append(f"{p.name} ({e})") + grand_total += n + finally: + prescan_bar.close() + if unknown_total_files: print( f"[warn] could not read row count from "