diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index e9e769e..d503276 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -1148,7 +1148,15 @@ def _worker_load_append_file( finally: conn.close() except Exception as e: - return (path_str, 0, f"{type(e).__name__}: {e}") + import traceback as _traceback + tb = _traceback.format_exc() + # Keep the one-line summary (what the tqdm [FAIL] print uses) but + # tack on the full traceback so the final cluster-failure block + # shows the file/line that crashed. Without this, ``ProcessPool`` + # workers lose every frame of context - you get "FloatingPointError: + # overflow encountered in multiply" with no hint of where inside + # the pandas/numpy/pyarrow stack it happened. + return (path_str, 0, f"{type(e).__name__}: {e}\n{tb}") finally: # Hand memory back to the OS before the worker is recycled (or before # ``max_tasks_per_child`` rotates this process). Three layers, each diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 68db570..8db1d5c 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -254,6 +254,22 @@ from tqdm import tqdm # at import time - narrow category match so nothing else is suppressed. warnings.filterwarnings("ignore", category=PerformanceWarning) +# Turn numpy's "raise on float overflow" (and friends) into silent inf/nan +# production, module-wide. Pandas ships with ``np.errstate(over="raise")`` +# wrapped around several internal ops (most painfully, the multiply inside +# ``pd.to_datetime(unit="s")`` that converts SAS epoch -> nanoseconds). +# Our data routinely carries ``inf`` / huge sentinels, which trip that +# ``raise`` and blow up an entire worker before ``errors="coerce"`` gets +# a chance to turn them into NaT. Even with ``_safe_numeric_to_datetime`` +# pre-masking the obvious cases, other code paths (pandas object-dtype +# datetime parsing, pyarrow type promotion, pyreadstat) can also trigger. +# Setting a process-wide ``seterr`` is a heavier hammer than an +# ``errstate`` block but survives library internals that don't explicitly +# rewrap it. Downside: a real overflow bug in new code would now silently +# produce inf/nan instead of raising - acceptable for a bulk loader where +# "don't crash on bad rows, null them and move on" is the whole point. +np.seterr(over="ignore", invalid="ignore", divide="ignore") + logger = logging.getLogger(__name__) @@ -1976,15 +1992,16 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]: return dt.time(h, m, s) -# Safe outer bound (in seconds) for the numeric->datetime conversion below. -# Picked so that ``value * 1e9`` (the multiply pandas does internally to get -# nanoseconds) stays well inside float64 range *and* the resulting timestamp -# stays inside the datetime64[ns] window (~1677-2262). 1e13 seconds is roughly -# year 318888 -- absurdly far past anything a real SAS file would carry, but -# small enough that ``1e13 * 1e9 = 1e22`` is a comfortable ~286 orders of -# magnitude under the float64 ceiling, so the multiply can't overflow. -_SAS_DATETIME_SAFE_S = 1e13 -_SAS_DATETIME_SAFE_D = _SAS_DATETIME_SAFE_S / 86400.0 +# Safe outer bound for the numeric->datetime conversion below. The true +# ceiling is ``pd.Timestamp.max`` (2262-04-11), which in seconds since 1960 +# is ~9.52e9. We pick a much tighter bound - year ~2200, ~7.6e9 seconds, +# ~87600 days - because (a) any real SAS data past ~2100 is garbage anyway, +# and (b) staying well inside the float64 + datetime64[ns] windows gives +# pandas' internals zero room to trip the ``over="raise"`` they wrap +# around the ns-multiply. ``7.5e9 * 1e9 = 7.5e18``, comfortably under both +# ``int64.max`` (~9.22e18) and float64 overflow (~1.8e308). +_SAS_DATETIME_SAFE_S = 7_500_000_000 +_SAS_DATETIME_SAFE_D = 87_000 def _safe_numeric_to_datetime( @@ -1997,35 +2014,57 @@ def _safe_numeric_to_datetime( """Convert a numeric SAS-epoch series to ``datetime64[ns]`` without letting one stray cell take down the worker. - Two failure modes we've hit in production: + Failure modes seen in production: * ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value sentinels, divide-by-zero in the source, uninitialized cells). * Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9`` overflows float64. + * Values between ``pd.Timestamp.max`` and float64 safety (~9.5e9 to 1e308 + seconds) where the nanosecond multiply silently produces garbage or + overflows int64. - Both cases trigger ``FloatingPointError: overflow encountered in multiply`` + All of these trigger ``FloatingPointError: overflow encountered in multiply`` inside ``pd.to_datetime`` because pandas wraps the multiply in - ``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never gets - a chance to turn the bad value into ``NaT``. + ``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never + gets a chance to turn the bad value into ``NaT``. - Strategy: mask non-finite and out-of-range values to NaN *before* calling - ``to_datetime``, then run the conversion under a permissive ``errstate`` - as a belt-and-suspenders. Emit one stderr line per chunk per affected - column so silent data loss doesn't sneak by. + Strategy, belt + suspenders + airbag: + + 1. Coerce to float64 up front. Object-dtype branches hand us mixed + int/float/str; ``pd.to_numeric(errors="coerce")`` parses what it can + and NaNs the rest, so we hit the rest of this function with a + pristine float series. + 2. Mask non-finite values and anything outside the safe epoch window to + NaN *before* ``pd.to_datetime`` sees them. + 3. Run the conversion under a permissive ``errstate``. + 4. If that still raises (some pandas version internally re-enables + ``over="raise"`` in a way ``errstate`` can't override), catch it + and return all-NaT for the column with a loud warning. Better a + NULL column in one chunk than a dead worker + no diagnostics. + + Emits one stderr line per chunk per affected column so silent data + loss doesn't sneak by. """ - finite_mask = np.isfinite(series.to_numpy(dtype="float64", na_value=np.nan)) + if not pd.api.types.is_float_dtype(series): + series = pd.to_numeric(series, errors="coerce").astype("float64") + + arr = series.to_numpy(dtype="float64", copy=False, na_value=np.nan) if unit == "s": bound = _SAS_DATETIME_SAFE_S elif unit == "D": bound = _SAS_DATETIME_SAFE_D else: bound = _SAS_DATETIME_SAFE_S - in_range_mask = series.abs() < bound - keep_mask = finite_mask & in_range_mask.fillna(False).to_numpy() - # Count cells we *would* drop that weren't already NaN, so we don't double- - # report rows that were missing in the source file. - was_present = ~series.isna().to_numpy() + with np.errstate(over="ignore", invalid="ignore", divide="ignore"): + finite_mask = np.isfinite(arr) + # ``np.abs(inf) -> inf``, ``np.abs(nan) -> nan``; both compare False + # to ``bound``, so ``in_range_mask`` already excludes non-finite + # values. The explicit ``finite_mask &`` below is belt-and-suspenders + # in case a future numpy changes that semantic. + in_range_mask = np.abs(arr) < bound + keep_mask = finite_mask & in_range_mask + was_present = ~np.isnan(arr) coerced = int(((~keep_mask) & was_present).sum()) if coerced: tqdm.write( @@ -2034,11 +2073,56 @@ def _safe_numeric_to_datetime( f"coerced to NULL", file=sys.stderr, ) - cleaned = series.where(keep_mask, other=np.nan) - with np.errstate(over="ignore", invalid="ignore", divide="ignore"): - return pd.to_datetime( - cleaned, unit=unit, origin="1960-01-01", errors="coerce", + cleaned_arr = np.where(keep_mask, arr, np.nan) + cleaned = pd.Series(cleaned_arr, index=series.index) + try: + with np.errstate(over="ignore", invalid="ignore", divide="ignore"): + return pd.to_datetime( + cleaned, unit=unit, origin="1960-01-01", errors="coerce", + ) + except (FloatingPointError, OverflowError, ValueError) as exc: + tqdm.write( + f"[error] {target_type} column {column_name!r}: " + f"pd.to_datetime raised {type(exc).__name__}: {exc}; " + f"returning NaT for the entire chunk. This usually means one " + f"or more values slipped past the pre-mask (bound={bound}). " + f"Consider setting the column to TEXT via column_types if this " + f"recurs.", + file=sys.stderr, ) + return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]") + + +def _safe_object_to_datetime( + series: pd.Series, + *, + column_name: str, + target_type: str, +) -> pd.Series: + """Object-dtype to datetime. Shares the safety net (errstate + + try/except) with :func:`_safe_numeric_to_datetime`. If the column is + actually numeric-flavored (e.g. SAS wrote numbers into an object + column), route to the numeric path; otherwise parse with ``to_datetime`` + on the object itself. + """ + coerced = series.replace({"": None}) + numeric = pd.to_numeric(coerced, errors="coerce") + all_numeric = numeric.notna().sum() == coerced.notna().sum() + if all_numeric and coerced.notna().any(): + return _safe_numeric_to_datetime( + numeric, unit="s", column_name=column_name, target_type=target_type, + ) + try: + with np.errstate(over="ignore", invalid="ignore", divide="ignore"): + return pd.to_datetime(coerced, errors="coerce") + except (FloatingPointError, OverflowError, ValueError) as exc: + tqdm.write( + f"[error] {target_type} column {column_name!r}: " + f"pd.to_datetime raised {type(exc).__name__}: {exc}; " + f"returning NaT for the entire chunk.", + file=sys.stderr, + ) + return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]") def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame: @@ -2077,9 +2161,14 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da elif pd.api.types.is_object_dtype(series): # Vectorized parse: empty strings / None / unparseable -> NaT, # then .dt.date yields date objects or NaT. NaT serializes as - # an empty CSV field (matching ``NULL ''`` in COPY). - parsed = pd.to_datetime( - series.replace({"": None}), errors="coerce" + # an empty CSV field (matching ``NULL ''`` in COPY). Routed + # through ``_safe_object_to_datetime`` so an object column + # that actually contains SAS-epoch numerics (seen when one + # file of a cluster stores the column as NUM and another as + # CHAR + the union flipped it to TEXT-then-DATE) can't trip + # the overflow-in-multiply bug. + parsed = _safe_object_to_datetime( + series, column_name=name, target_type="DATE", ) out[name] = parsed.dt.date elif pd.api.types.is_numeric_dtype(series): @@ -2099,8 +2188,11 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da if pd.api.types.is_datetime64_any_dtype(series): out[name] = series elif pd.api.types.is_object_dtype(series): - out[name] = pd.to_datetime( - series.replace({"": None}), errors="coerce" + # Same rationale as the DATE object branch above: route + # through the safety net so numeric-flavored object columns + # can't blow us up during the ns multiply. + out[name] = _safe_object_to_datetime( + series, column_name=name, target_type="TIMESTAMP", ) elif pd.api.types.is_numeric_dtype(series): # Same story as the DATE branch above, but SAS datetimes are