diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index 5cdcd3b..5e39943 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -1071,6 +1071,17 @@ def _build_argparser() -> argparse.ArgumentParser: "PGUSER / PGPASSWORD from the environment or .env file." ), ) + p.add_argument( + "--no-prescan", + action="store_true", + help=( + "Skip the per-file metadata scan that populates the folder-wide " + "tqdm ETA. Useful when the folder is large (half-hour+ pre-scan) " + "or when you're iterating quickly on a failure. Without the " + "pre-scan the progress bar still shows rows loaded, rate, and " + "elapsed time - it just can't estimate remaining time." + ), + ) p.add_argument( "--workers", type=int, @@ -1234,55 +1245,70 @@ def main(argv: Optional[List[str]] = None) -> int: # of scattered subheader pages per file - sequentially that's minutes for # a 52-file folder. pyreadstat releases the GIL during I/O and C decoding, # so a ThreadPool gives near-linear scaling until the disk saturates. + # ``--no-prescan`` bypasses the scan entirely; the progress bar then runs + # without an ETA - useful when pre-scan itself is expensive (half hour+ + # on very large files) or when debugging iteratively. all_files: List[Path] = [p for c in loadable for p in c.files] - prescan_workers = min(16, max(1, len(all_files))) - print( - f"pre-scanning row counts for {len(all_files)} file(s) " - f"across {prescan_workers} thread(s)...", - file=sys.stderr, - ) + grand_total: Optional[int] = 0 - def _scan_one(p: Path) -> Tuple[Path, Optional[int], Optional[str]]: - try: - meta = read_sas_metadata(p) - n = getattr(meta, "number_rows", None) - return (p, int(n) if n is not None else None, None) - except Exception as e: - return (p, None, str(e)) - - grand_total = 0 - unknown_total_files: List[str] = [] - with ThreadPoolExecutor(max_workers=prescan_workers) as tpool: - prescan_bar = tqdm( - total=len(all_files), - unit="file", - desc=" prescanning", - file=sys.stderr, - dynamic_ncols=True, - ) - try: - for p, n, err in tpool.map(_scan_one, all_files): - prescan_bar.update(1) - if err is not None: - unknown_total_files.append(f"{p.name} ({err})") - elif n is None: - unknown_total_files.append(p.name) - else: - grand_total += n - finally: - prescan_bar.close() - - if unknown_total_files: + if args.no_prescan: + grand_total = None print( - f"[warn] could not read row count from " - f"{len(unknown_total_files)} file(s); progress bar ETA will " - f"be approximate.", + f"[info] --no-prescan set: skipping row-count pre-scan for " + f"{len(all_files)} file(s); progress bar will show rate + " + f"elapsed but no ETA.", file=sys.stderr, ) - print( - f" total rows across folder: {grand_total:,}", - file=sys.stderr, - ) + else: + prescan_workers = min(16, max(1, len(all_files))) + print( + f"pre-scanning row counts for {len(all_files)} file(s) " + f"across {prescan_workers} thread(s)...", + file=sys.stderr, + ) + + def _scan_one(p: Path) -> Tuple[Path, Optional[int], Optional[str]]: + try: + meta = read_sas_metadata(p) + n = getattr(meta, "number_rows", None) + return (p, int(n) if n is not None else None, None) + except Exception as e: + return (p, None, str(e)) + + unknown_total_files: List[str] = [] + running_total = 0 + with ThreadPoolExecutor(max_workers=prescan_workers) as tpool: + prescan_bar = tqdm( + total=len(all_files), + unit="file", + desc=" prescanning", + file=sys.stderr, + dynamic_ncols=True, + ) + try: + for p, n, err in tpool.map(_scan_one, all_files): + prescan_bar.update(1) + if err is not None: + unknown_total_files.append(f"{p.name} ({err})") + elif n is None: + unknown_total_files.append(p.name) + else: + running_total += n + finally: + prescan_bar.close() + + if unknown_total_files: + print( + f"[warn] could not read row count from " + f"{len(unknown_total_files)} file(s); progress bar ETA will " + f"be approximate.", + file=sys.stderr, + ) + print( + f" total rows across folder: {running_total:,}", + file=sys.stderr, + ) + grand_total = running_total # -- Shared progress plumbing --------------------------------------------- # The queue crosses process boundaries when workers > 1 (managed proxy)