advanced_analyzer #8

Merged
dp merged 23 commits from advanced_analyzer into main 2026-04-21 22:32:18 +00:00
Showing only changes of commit c283b42876 - Show all commits

View File

@ -232,6 +232,7 @@ from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
import pandas as pd import pandas as pd
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
@ -1944,6 +1945,71 @@ def _seconds_to_time(v: Any) -> Optional[dt.time]:
return dt.time(h, m, s) return dt.time(h, m, s)
# Safe outer bounds for the numeric->datetime conversion below, expressed in
# seconds (``_S``) and in days (``_D``, the same span divided by 86400).
#
# The bound exists to keep ``value * 1e9`` (the multiply pandas does
# internally to get nanoseconds) well inside float64 range:
# ``1e13 * 1e9 = 1e22`` is a comfortable ~286 orders of magnitude under the
# float64 ceiling, so the multiply can't overflow.
#
# It is deliberately NOT the datetime64[ns] window (~1677-2262): 1e13 seconds
# is on the order of 317,000 years -- absurdly far past anything a real SAS
# file would carry, and far outside that window. A value that passes this
# bound can therefore still be unrepresentable as datetime64[ns]; the
# conversion that consumes these constants passes ``errors="coerce"`` to
# deal with those.
_SAS_DATETIME_SAFE_S = 1e13
_SAS_DATETIME_SAFE_D = _SAS_DATETIME_SAFE_S / 86400.0
def _safe_numeric_to_datetime(
    series: pd.Series,
    *,
    unit: str,
    column_name: str,
    target_type: str,
) -> pd.Series:
    """Convert a numeric SAS-epoch series to datetimes without letting one
    stray cell take down the worker.

    Two failure modes we've hit in production:

    * ``np.inf`` / ``-np.inf`` slipping through pyreadstat (SAS missing-value
      sentinels, divide-by-zero in the source, uninitialized cells).
    * Absurdly large finite floats (e.g. ``1.7e308``) where ``value * 1e9``
      overflows float64.

    Both cases trigger ``FloatingPointError: overflow encountered in multiply``
    inside ``pd.to_datetime`` because pandas wraps the multiply in
    ``np.errstate(over="raise")`` -- our outer ``errors="coerce"`` never gets
    a chance to turn the bad value into ``NaT``.

    Strategy: mask non-finite and out-of-range values to NaN *before* calling
    ``to_datetime``, then run the conversion under a permissive ``errstate``
    as a belt-and-suspenders. Emit one stderr line per call that had to drop
    something, so silent data loss doesn't sneak by.

    Args:
        series: Numeric values counted from the SAS epoch (1960-01-01).
        unit: Unit passed through to ``pd.to_datetime``. ``"D"`` selects the
            day-scaled bound; every other unit uses the seconds bound.
        column_name: Column label, used only in the warning text.
        target_type: Target SQL type label (e.g. ``"DATE"``), used only in the
            warning text.

    Returns:
        The result of ``pd.to_datetime(..., origin="1960-01-01",
        errors="coerce")`` on the masked series; masked cells are passed in
        as NaN.
    """
    # A single float64 view drives both the finiteness and the range check.
    # Going through ``Series.abs()`` instead depends on the series dtype:
    # ``abs`` of the minimum int64 wraps and stays negative (so it would pass
    # a ``< bound`` test), and nullable extension dtypes can hand back a
    # non-bool mask array on some pandas versions.
    values = series.to_numpy(dtype="float64", na_value=np.nan)

    # Only the day unit needs a rescaled bound; the seconds bound is the
    # fallback for everything else.
    bound = _SAS_DATETIME_SAFE_D if unit == "D" else _SAS_DATETIME_SAFE_S

    # NaN and +/-Inf both fail ``abs(x) < bound``; ``isfinite`` is kept so the
    # intent stays explicit. ``invalid`` is silenced for NumPy builds that
    # warn when NaN takes part in a comparison.
    with np.errstate(invalid="ignore"):
        keep_mask = np.isfinite(values) & (np.abs(values) < bound)

    # Count cells we *would* drop that weren't already NaN, so we don't
    # double-report rows that were missing in the source file.
    was_present = ~series.isna().to_numpy()
    coerced = int((~keep_mask & was_present).sum())
    if coerced:
        tqdm.write(
            f"[warn] {target_type} column {column_name!r}: {coerced:,} "
            f"row(s) had non-representable values (Inf/NaN/out-of-range), "
            f"coerced to NULL",
            file=sys.stderr,
        )

    cleaned = series.where(keep_mask, other=np.nan)

    # NOTE(review): an ``errstate`` installed inside pandas takes precedence
    # over this one while it is active, so the masking above is the real
    # protection; this block is only a second line of defence.
    with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
        return pd.to_datetime(
            cleaned, unit=unit, origin="1960-01-01", errors="coerce",
        )
def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame: def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.DataFrame:
"""Materialize a copy of ``df`` with each column in the right shape for """Materialize a copy of ``df`` with each column in the right shape for
``to_csv`` so the CSV lands as valid input for the target Postgres type. ``to_csv`` so the CSV lands as valid input for the target Postgres type.
@ -1992,8 +2058,8 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
# 1960-01-01, the SAS epoch. Without this branch the raw # 1960-01-01, the SAS epoch. Without this branch the raw
# number would hit COPY and Postgres rejects it with # number would hit COPY and Postgres rejects it with
# ``invalid input syntax for type date``. # ``invalid input syntax for type date``.
parsed = pd.to_datetime( parsed = _safe_numeric_to_datetime(
series, unit="D", origin="1960-01-01", errors="coerce", series, unit="D", column_name=name, target_type="DATE",
) )
out[name] = parsed.dt.date out[name] = parsed.dt.date
else: else:
@ -2010,8 +2076,8 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
# *seconds* since 1960-01-01 (fractional seconds for # *seconds* since 1960-01-01 (fractional seconds for
# ``DATETIMEw.d``). Example caught in the wild: # ``DATETIMEw.d``). Example caught in the wild:
# ``1915465463.615`` -> 2020-09-13 05:44:23.615. # ``1915465463.615`` -> 2020-09-13 05:44:23.615.
out[name] = pd.to_datetime( out[name] = _safe_numeric_to_datetime(
series, unit="s", origin="1960-01-01", errors="coerce", series, unit="s", column_name=name, target_type="TIMESTAMP",
) )
else: else:
out[name] = series out[name] = series