advanced_analyzer #8
@ -1148,7 +1148,15 @@ def _worker_load_append_file(
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
return (path_str, 0, f"{type(e).__name__}: {e}")
|
||||
import traceback as _traceback
|
||||
tb = _traceback.format_exc()
|
||||
# Keep the one-line summary (what the tqdm [FAIL] print uses) but
|
||||
# tack on the full traceback so the final cluster-failure block
|
||||
# shows the file/line that crashed. Without this, ``ProcessPool``
|
||||
# workers lose every frame of context - you get "FloatingPointError:
|
||||
# overflow encountered in multiply" with no hint of where inside
|
||||
# the pandas/numpy/pyarrow stack it happened.
|
||||
return (path_str, 0, f"{type(e).__name__}: {e}\n{tb}")
|
||||
finally:
|
||||
# Hand memory back to the OS before the worker is recycled (or before
|
||||
# ``max_tasks_per_child`` rotates this process). Three layers, each
|
||||
|
||||
@ -254,6 +254,22 @@ from tqdm import tqdm
|
||||
# at import time - narrow category match so nothing else is suppressed.
|
||||
warnings.filterwarnings("ignore", category=PerformanceWarning)
|
||||
|
||||
# Turn numpy's "raise on float overflow" (and friends) into silent inf/nan
|
||||
# production, module-wide. Pandas ships with ``np.errstate(over="raise")``
|
||||
# wrapped around several internal ops (most painfully, the multiply inside
|
||||
# ``pd.to_datetime(unit="s")`` that converts SAS epoch -> nanoseconds).
|
||||
# Our data routinely carries ``inf`` / huge sentinels, which trip that
|
||||
# ``raise`` and blow up an entire worker before ``errors="coerce"`` gets
|
||||
# a chance to turn them into NaT. Even with ``_safe_numeric_to_datetime``
|
||||
# pre-masking the obvious cases, other code paths (pandas object-dtype
|
||||
# datetime parsing, pyarrow type promotion, pyreadstat) can also trigger.
|
||||
# Setting a process-wide ``seterr`` is a heavier hammer than an
|
||||
# ``errstate`` block but survives library internals that don't explicitly
|
||||
# rewrap it. Downside: a real overflow bug in new code would now silently
|
||||
# produce inf/nan instead of raising - acceptable for a bulk loader where
|
||||
# "don't crash on bad rows, null them and move on" is the whole point.
|
||||
np.seterr(over="ignore", invalid="ignore", divide="ignore")
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -1976,15 +1992,16 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]:
|
||||
return dt.time(h, m, s)
|
||||
|
||||
|
||||
# Safe outer bound (in seconds) for the numeric->datetime conversion below.
|
||||
# Picked so that ``value * 1e9`` (the multiply pandas does internally to get
|
||||
# nanoseconds) stays well inside float64 range *and* the resulting timestamp
|
||||
# stays inside the datetime64[ns] window (~1677-2262). 1e13 seconds is roughly
|
||||
# year 318888 -- absurdly far past anything a real SAS file would carry, but
|
||||
# small enough that ``1e13 * 1e9 = 1e22`` is a comfortable ~286 orders of
|
||||
# magnitude under the float64 ceiling, so the multiply can't overflow.
|
||||
_SAS_DATETIME_SAFE_S = 1e13
|
||||
_SAS_DATETIME_SAFE_D = _SAS_DATETIME_SAFE_S / 86400.0
|
||||
# Safe outer bound for the numeric->datetime conversion below. The true
|
||||
# ceiling is ``pd.Timestamp.max`` (2262-04-11), which in seconds since 1960
|
||||
# is ~9.52e9. We pick a much tighter bound - year ~2200, ~7.6e9 seconds,
|
||||
# ~87600 days - because (a) any real SAS data past ~2100 is garbage anyway,
|
||||
# and (b) staying well inside the float64 + datetime64[ns] windows gives
|
||||
# pandas' internals zero room to trip the ``over="raise"`` they wrap
|
||||
# around the ns-multiply. ``7.5e9 * 1e9 = 7.5e18``, comfortably under both
|
||||
# ``int64.max`` (~9.22e18) and float64 overflow (~1.8e308).
|
||||
_SAS_DATETIME_SAFE_S = 7_500_000_000
|
||||
_SAS_DATETIME_SAFE_D = 87_000
|
||||
|
||||
|
||||
def _safe_numeric_to_datetime(
|
||||
@ -1997,35 +2014,57 @@ def _safe_numeric_to_datetime(
|
||||
"""Convert a numeric SAS-epoch series to ``datetime64[ns]`` without letting
|
||||
one stray cell take down the worker.
|
||||
|
||||
Two failure modes we've hit in production:
|
||||
Failure modes seen in production:
|
||||
|
||||
* ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value
|
||||
sentinels, divide-by-zero in the source, uninitialized cells).
|
||||
* Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9``
|
||||
overflows float64.
|
||||
* Values between ``pd.Timestamp.max`` and float64 safety (~9.5e9 to 1e308
|
||||
seconds) where the nanosecond multiply silently produces garbage or
|
||||
overflows int64.
|
||||
|
||||
Both cases trigger ``FloatingPointError: overflow encountered in multiply``
|
||||
All of these trigger ``FloatingPointError: overflow encountered in multiply``
|
||||
inside ``pd.to_datetime`` because pandas wraps the multiply in
|
||||
``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never gets
|
||||
a chance to turn the bad value into ``NaT``.
|
||||
``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never
|
||||
gets a chance to turn the bad value into ``NaT``.
|
||||
|
||||
Strategy: mask non-finite and out-of-range values to NaN *before* calling
|
||||
``to_datetime``, then run the conversion under a permissive ``errstate``
|
||||
as a belt-and-suspenders. Emit one stderr line per chunk per affected
|
||||
column so silent data loss doesn't sneak by.
|
||||
Strategy, belt + suspenders + airbag:
|
||||
|
||||
1. Coerce to float64 up front. Object-dtype branches hand us mixed
|
||||
int/float/str; ``pd.to_numeric(errors="coerce")`` parses what it can
|
||||
and NaNs the rest, so we hit the rest of this function with a
|
||||
pristine float series.
|
||||
2. Mask non-finite values and anything outside the safe epoch window to
|
||||
NaN *before* ``pd.to_datetime`` sees them.
|
||||
3. Run the conversion under a permissive ``errstate``.
|
||||
4. If that still raises (some pandas version internally re-enables
|
||||
``over="raise"`` in a way ``errstate`` can't override), catch it
|
||||
and return all-NaT for the column with a loud warning. Better a
|
||||
NULL column in one chunk than a dead worker + no diagnostics.
|
||||
|
||||
Emits one stderr line per chunk per affected column so silent data
|
||||
loss doesn't sneak by.
|
||||
"""
|
||||
finite_mask = np.isfinite(series.to_numpy(dtype="float64", na_value=np.nan))
|
||||
if not pd.api.types.is_float_dtype(series):
|
||||
series = pd.to_numeric(series, errors="coerce").astype("float64")
|
||||
|
||||
arr = series.to_numpy(dtype="float64", copy=False, na_value=np.nan)
|
||||
if unit == "s":
|
||||
bound = _SAS_DATETIME_SAFE_S
|
||||
elif unit == "D":
|
||||
bound = _SAS_DATETIME_SAFE_D
|
||||
else:
|
||||
bound = _SAS_DATETIME_SAFE_S
|
||||
in_range_mask = series.abs() < bound
|
||||
keep_mask = finite_mask & in_range_mask.fillna(False).to_numpy()
|
||||
# Count cells we *would* drop that weren't already NaN, so we don't double-
|
||||
# report rows that were missing in the source file.
|
||||
was_present = ~series.isna().to_numpy()
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
finite_mask = np.isfinite(arr)
|
||||
# ``np.abs(inf) -> inf``, ``np.abs(nan) -> nan``; both compare False
|
||||
# to ``bound``, so ``in_range_mask`` already excludes non-finite
|
||||
# values. The explicit ``finite_mask &`` below is belt-and-suspenders
|
||||
# in case a future numpy changes that semantic.
|
||||
in_range_mask = np.abs(arr) < bound
|
||||
keep_mask = finite_mask & in_range_mask
|
||||
was_present = ~np.isnan(arr)
|
||||
coerced = int(((~keep_mask) & was_present).sum())
|
||||
if coerced:
|
||||
tqdm.write(
|
||||
@ -2034,11 +2073,56 @@ def _safe_numeric_to_datetime(
|
||||
f"coerced to NULL",
|
||||
file=sys.stderr,
|
||||
)
|
||||
cleaned = series.where(keep_mask, other=np.nan)
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
return pd.to_datetime(
|
||||
cleaned, unit=unit, origin="1960-01-01", errors="coerce",
|
||||
cleaned_arr = np.where(keep_mask, arr, np.nan)
|
||||
cleaned = pd.Series(cleaned_arr, index=series.index)
|
||||
try:
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
return pd.to_datetime(
|
||||
cleaned, unit=unit, origin="1960-01-01", errors="coerce",
|
||||
)
|
||||
except (FloatingPointError, OverflowError, ValueError) as exc:
|
||||
tqdm.write(
|
||||
f"[error] {target_type} column {column_name!r}: "
|
||||
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
|
||||
f"returning NaT for the entire chunk. This usually means one "
|
||||
f"or more values slipped past the pre-mask (bound={bound}). "
|
||||
f"Consider setting the column to TEXT via column_types if this "
|
||||
f"recurs.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
|
||||
|
||||
|
||||
def _safe_object_to_datetime(
|
||||
series: pd.Series,
|
||||
*,
|
||||
column_name: str,
|
||||
target_type: str,
|
||||
) -> pd.Series:
|
||||
"""Object-dtype to datetime. Shares the safety net (errstate +
|
||||
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
|
||||
actually numeric-flavored (e.g. SAS wrote numbers into an object
|
||||
column), route to the numeric path; otherwise parse with ``to_datetime``
|
||||
on the object itself.
|
||||
"""
|
||||
coerced = series.replace({"": None})
|
||||
numeric = pd.to_numeric(coerced, errors="coerce")
|
||||
all_numeric = numeric.notna().sum() == coerced.notna().sum()
|
||||
if all_numeric and coerced.notna().any():
|
||||
return _safe_numeric_to_datetime(
|
||||
numeric, unit="s", column_name=column_name, target_type=target_type,
|
||||
)
|
||||
try:
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
return pd.to_datetime(coerced, errors="coerce")
|
||||
except (FloatingPointError, OverflowError, ValueError) as exc:
|
||||
tqdm.write(
|
||||
f"[error] {target_type} column {column_name!r}: "
|
||||
f"pd.to_datetime raised {type(exc).__name__}: {exc}; "
|
||||
f"returning NaT for the entire chunk.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
|
||||
|
||||
|
||||
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
|
||||
@ -2077,9 +2161,14 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
||||
elif pd.api.types.is_object_dtype(series):
|
||||
# Vectorized parse: empty strings / None / unparseable -> NaT,
|
||||
# then .dt.date yields date objects or NaT. NaT serializes as
|
||||
# an empty CSV field (matching ``NULL ''`` in COPY).
|
||||
parsed = pd.to_datetime(
|
||||
series.replace({"": None}), errors="coerce"
|
||||
# an empty CSV field (matching ``NULL ''`` in COPY). Routed
|
||||
# through ``_safe_object_to_datetime`` so an object column
|
||||
# that actually contains SAS-epoch numerics (seen when one
|
||||
# file of a cluster stores the column as NUM and another as
|
||||
# CHAR + the union flipped it to TEXT-then-DATE) can't trip
|
||||
# the overflow-in-multiply bug.
|
||||
parsed = _safe_object_to_datetime(
|
||||
series, column_name=name, target_type="DATE",
|
||||
)
|
||||
out[name] = parsed.dt.date
|
||||
elif pd.api.types.is_numeric_dtype(series):
|
||||
@ -2099,8 +2188,11 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
||||
if pd.api.types.is_datetime64_any_dtype(series):
|
||||
out[name] = series
|
||||
elif pd.api.types.is_object_dtype(series):
|
||||
out[name] = pd.to_datetime(
|
||||
series.replace({"": None}), errors="coerce"
|
||||
# Same rationale as the DATE object branch above: route
|
||||
# through the safety net so numeric-flavored object columns
|
||||
# can't blow us up during the ns multiply.
|
||||
out[name] = _safe_object_to_datetime(
|
||||
series, column_name=name, target_type="TIMESTAMP",
|
||||
)
|
||||
elif pd.api.types.is_numeric_dtype(series):
|
||||
# Same story as the DATE branch above, but SAS datetimes are
|
||||
|
||||
Loading…
Reference in New Issue
Block a user