Compare commits

..

6 Commits

Author SHA1 Message Date
David Peterson
c3d1f72556 Add null string sentinel handling in load_sas.py for improved missing value detection
Introduced a frozenset of string literals that represent SQL NULL values, enhancing the inference and nullability detection processes. Implemented helper functions to identify null strings and unify missing value checks for CHAR/TEXT columns. Updated the _null_sentinel_mask function to replace these sentinel values with None, ensuring consistent handling across various data types during data loading. This change improves robustness in managing missing data scenarios.
2026-04-22 19:20:07 -05:00
David Peterson
998a3e282f Revert "Optimize datetime parsing in load_sas.py by implementing a sample-based format detection approach"
This reverts commit 857f696305.
2026-04-22 13:05:11 -05:00
David Peterson
857f696305 Optimize datetime parsing in load_sas.py by implementing a sample-based format detection approach
Introduced a new mechanism to sample non-null values for determining the appropriate datetime parsing strategy, significantly reducing processing time for large datasets. This change replaces the previous full row-walk method with a more efficient sampling technique, enhancing performance while maintaining robust handling of various date formats. Updated comments for clarity on the new approach.
2026-04-22 12:54:19 -05:00
David Peterson
c3fa943e77 Enhance date and datetime parsing in load_sas.py with flexible regex and fallback formats
Introduced a locale-independent month lookup and improved date parsing functions to handle various date formats, including SAS and Oracle styles. The new _parse_flexible_date and _parse_flexible_datetime functions provide robust parsing capabilities, accommodating both date-only and datetime inputs. Updated _try_date_coerce and _try_datetime_coerce to utilize these new functions, ensuring better handling of diverse date formats during data loading.
2026-04-22 12:28:19 -05:00
michael-corey
f63d684d51 moving to env file 2026-04-22 15:37:35 +00:00
David Peterson
0632e110e5 Implement parallel processing for partition discovery in load_folder.py and enhance column filtering in load_sas.py
Added support for parallel processing using ProcessPoolExecutor in the _discover_cluster_partitions function, allowing for efficient partition value discovery across multiple files. This change significantly reduces I/O overhead by reading only necessary columns during scans. Additionally, updated iter_sas_chunks and iter_text_chunks functions to accept a usecols parameter, enabling selective column parsing for improved performance during data loading. These enhancements aim to optimize resource usage and speed up the data processing pipeline.
2026-04-22 15:35:19 +00:00
7 changed files with 566 additions and 79 deletions

3
.gitignore vendored
View File

@ -1,6 +1,7 @@
/.venv
/samples
/.env
.env
!.env.example
__pycache__/
venv/
*/__pycache__/

View File

@ -3,3 +3,6 @@ PGPORT=5432
PGUSER=
PGPASSWORD=
PGDATABASE=
S3_BUCKET=my-bucket
AWS_PROFILE=default

View File

@ -157,7 +157,6 @@ import threading
from concurrent.futures import (
CancelledError,
ProcessPoolExecutor,
ThreadPoolExecutor,
as_completed,
)
from dataclasses import dataclass, field
@ -885,6 +884,94 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
return clusters
# ---------------------------------------------------------------------------
# Top-level workers (must be importable for ProcessPoolExecutor pickling)
# ---------------------------------------------------------------------------
def _prescan_worker(
path_str: str,
delimiter: str,
text_encoding: str,
quotechar: str,
) -> Tuple[
str,
Optional[int],
Optional[Dict[str, Tuple[str, Optional[str]]]],
Optional[str],
]:
"""Top-level prescan worker for ProcessPoolExecutor.
Reads only metadata (pyreadstat ``metadataonly=True``) for one file and
returns ``(path_str, number_rows, per_column_meta, error_message)``.
Kept at module level so ``ProcessPoolExecutor`` can pickle it; the
closure version we used to call from a ``ThreadPoolExecutor`` shared
the parent's GIL and serialised the per-file Python work, which is
why pre-scan felt slow even though the actual disk reads were fast.
"""
try:
meta = read_sas_metadata(
Path(path_str),
delimiter=delimiter,
text_encoding=text_encoding,
quotechar=quotechar,
)
n = getattr(meta, "number_rows", None)
col_meta = extract_union_metadata(meta)
return (
path_str,
int(n) if n is not None else None,
col_meta,
None,
)
except Exception as e:
return (path_str, None, None, f"{type(e).__name__}: {e}")
def _partition_scan_worker(
path_str: str,
partition_by: List[str],
delimiter: str,
text_encoding: str,
quotechar: str,
) -> Tuple[str, Optional[dict], Optional[str]]:
"""Top-level partition-discovery worker for ProcessPoolExecutor.
Streams ``path_str`` with ``usecols=partition_by`` so pyreadstat only
decodes the partition columns themselves - on a wide sas7bdat that's
typically a 10x+ I/O reduction over reading every column. Returns
``(path_str, partial_partition_tree, error_message)``.
``columns`` is intentionally not passed: partition-value normalisation
needs the cluster-wide schema, which is out of process. The merger in
:func:`_discover_cluster_partitions` re-applies normalisation when it
folds the per-file trees together.
"""
try:
def _chunks() -> Any:
for chunk_df, _meta in iter_sas_chunks(
Path(path_str),
delimiter=delimiter,
text_encoding=text_encoding,
quotechar=quotechar,
usecols=list(partition_by),
):
yield chunk_df
tree = discover_partition_values_chunked(
_chunks(), list(partition_by),
)
return (path_str, tree, None)
except Exception as e:
import traceback as _traceback
return (
path_str,
None,
f"{type(e).__name__}: {e}\n{_traceback.format_exc()}",
)
# ---------------------------------------------------------------------------
# Per-cluster load
# ---------------------------------------------------------------------------
@ -929,27 +1016,83 @@ def _infer_cluster_schema(
def _discover_cluster_partitions(
cluster: ClusterSpec,
columns: Dict,
*,
workers: int = 1,
) -> dict:
"""Scan ALL files in ``cluster`` to discover partition values.
Returns a nested partition-value tree suitable for passing to
:func:`load_sas.render_partition_ddl` and :func:`load_sas.create_table`.
Each file is scanned chunk-by-chunk so the full dataset is never
materialized in memory.
Each file is read with ``usecols=cluster.partition_by`` so pyreadstat
only decodes the partition columns - on a wide sas7bdat that drops
the bytes-touched-per-file by an order of magnitude vs the old
full-row scan. With ``workers > 1`` the per-file scans run in a
``ProcessPoolExecutor`` and the partial trees are merged as they
complete (true parallelism, visible in ``htop`` as N python procs).
``include`` / ``exclude`` are intentionally not honoured here:
partition columns are validated against the inferred schema before
we ever get called, which already enforces that they survived any
explicit include/exclude filter. The old serial path applied the
filter for symmetry, but it was a no-op once usecols pinned us to
the partition set anyway.
"""
tkw = _build_text_kw(cluster)
merged: dict = {}
for path in cluster.files:
def _filtered_chunks(p=path):
for chunk_df, _chunk_meta in iter_sas_chunks(p, **tkw):
yield apply_column_filter(
chunk_df, cluster.include, cluster.exclude
)
file_tree = discover_partition_values_chunked(
_filtered_chunks(), cluster.partition_by, columns,
if not cluster.files:
return merged
n_workers = max(1, min(int(workers), len(cluster.files)))
if n_workers <= 1 or len(cluster.files) == 1:
for path in cluster.files:
def _filtered_chunks(p=path):
for chunk_df, _meta in iter_sas_chunks(
p, usecols=list(cluster.partition_by), **tkw,
):
yield chunk_df
file_tree = discover_partition_values_chunked(
_filtered_chunks(), cluster.partition_by, columns,
)
_merge_partition_trees(merged, file_tree)
return merged
with ProcessPoolExecutor(max_workers=n_workers) as ppool:
futures = {
ppool.submit(
_partition_scan_worker,
str(path),
list(cluster.partition_by),
tkw["delimiter"],
tkw["text_encoding"],
tkw["quotechar"],
): path
for path in cluster.files
}
bar = tqdm(
total=len(futures),
unit="file",
desc=" discovering partitions",
file=sys.stderr,
dynamic_ncols=True,
)
_merge_partition_trees(merged, file_tree)
try:
for fut in as_completed(futures):
path = futures[fut]
_path_str, file_tree, err = fut.result()
bar.update(1)
if err is not None:
raise RuntimeError(
f"partition discovery failed for {path}: {err}"
)
if file_tree:
_merge_partition_trees(merged, file_tree)
finally:
bar.close()
return merged
@ -1041,7 +1184,7 @@ def load_cluster(
file=sys.stderr,
)
partition_values = _discover_cluster_partitions(
cluster, first_columns,
cluster, first_columns, workers=workers,
)
total_parts = _count_partitions(partition_values)
print(
@ -1676,7 +1819,7 @@ def main(argv: Optional[List[str]] = None) -> int:
file=sys.stderr,
)
partition_values = _discover_cluster_partitions(
c, columns,
c, columns, workers=max(1, int(args.workers)),
)
total_parts = _count_partitions(partition_values)
print(
@ -1787,42 +1930,35 @@ def main(argv: Optional[List[str]] = None) -> int:
file=sys.stderr,
)
else:
prescan_workers = min(16, max(1, len(all_files)))
# Cap at min(workers, file_count, 32). Pyreadstat metadata reads are
# mostly C-side I/O + struct decoding, but the per-file Python work
# (extract_union_metadata, dict construction) was being serialised
# by the GIL in the old ThreadPool path - which is why the bar
# crawled even though disk was idle. ProcessPool gives us actual
# parallelism; you'll now see N python procs in htop.
prescan_workers = min(
32, max(1, max(workers, 16)), len(all_files),
)
print(
f"pre-scanning row counts + per-column metadata for "
f"{len(all_files)} file(s) across {prescan_workers} thread(s)...",
f"{len(all_files)} file(s) across {prescan_workers} "
f"process(es)...",
file=sys.stderr,
)
def _scan_one(
p: Path,
) -> Tuple[
Path,
Optional[int],
Optional[Dict[str, Tuple[str, Optional[str]]]],
Optional[str],
]:
try:
_prescan_tkw = dict(
delimiter=cfg.delimiter,
text_encoding=cfg.text_encoding,
quotechar=cfg.quotechar,
)
meta = read_sas_metadata(p, **_prescan_tkw)
n = getattr(meta, "number_rows", None)
col_meta = extract_union_metadata(meta)
return (
p,
int(n) if n is not None else None,
col_meta,
None,
)
except Exception as e:
return (p, None, None, str(e))
unknown_total_files: List[str] = []
running_total = 0
with ThreadPoolExecutor(max_workers=prescan_workers) as tpool:
with ProcessPoolExecutor(max_workers=prescan_workers) as ppool:
futures = {
ppool.submit(
_prescan_worker,
str(p),
cfg.delimiter,
cfg.text_encoding,
cfg.quotechar,
): p
for p in all_files
}
prescan_bar = tqdm(
total=len(all_files),
unit="file",
@ -1831,16 +1967,20 @@ def main(argv: Optional[List[str]] = None) -> int:
dynamic_ncols=True,
)
try:
for p, n, col_meta, err in tpool.map(_scan_one, all_files):
for fut in as_completed(futures):
path_obj = futures[fut]
path_str, n, col_meta, err = fut.result()
prescan_bar.update(1)
if err is not None:
unknown_total_files.append(f"{p.name} ({err})")
unknown_total_files.append(
f"{path_obj.name} ({err})"
)
elif n is None:
unknown_total_files.append(p.name)
unknown_total_files.append(path_obj.name)
else:
running_total += n
if col_meta is not None:
file_meta_by_path[str(p)] = col_meta
file_meta_by_path[path_str] = col_meta
finally:
prescan_bar.close()

View File

@ -315,6 +315,83 @@ The chunk size can be overridden at runtime via the
changes. Explicit ``chunksize=`` kwargs still win over both."""
NULL_STRING_SENTINELS: frozenset = frozenset({
"null",
"na",
"n/a",
"#n/a",
".",
"none",
"nan",
})
"""Lowercased string literals treated as SQL ``NULL`` across inference,
nullability detection, and COPY preparation. Seen in the wild when a
source system exports missing values as the literal text ``"null"``
(yes, really; some SAS CHAR columns hold it verbatim) or uses the
SAS/Stata ``.`` missing sentinel or spreadsheet-style ``NA`` / ``N/A``.
Kept narrow on purpose:
* ``"null"``, ``"none"``, ``"nan"`` the common spelled-out missings.
* ``"na"``, ``"n/a"``, ``"#n/a"`` spreadsheet / R conventions.
* ``"."`` SAS / Stata missing sentinel as CHAR export.
Matching is case-insensitive and ignores leading / trailing whitespace.
Extend this set in a calling module (``import load_sas;
load_sas.NULL_STRING_SENTINELS = frozenset({...})``) if your source
ships additional sentinels. Don't add ambiguous tokens (``"0"``,
``"unknown"``) - those are legitimate data in plenty of schemas."""
def _is_null_string(value: Any) -> bool:
"""True if ``value`` is a string whose lowercased/stripped form is
in :data:`NULL_STRING_SENTINELS`. Safe to call on any Python value;
non-strings return False so the helper can be dropped into the same
row-walks that also see floats / dates / None."""
if not isinstance(value, str):
return False
s = value.strip()
if not s:
return False
return s.lower() in NULL_STRING_SENTINELS
def _is_char_missing(value: Any) -> bool:
"""True if ``value`` should be treated as missing for a CHAR/TEXT
column. Unifies the three-way check (None / NaN / empty-or-sentinel
string) that used to live inline in several helpers so extending
the sentinel set in one place propagates everywhere."""
if value is None:
return True
if isinstance(value, float) and pd.isna(value):
return True
if isinstance(value, str):
s = value.strip()
if not s:
return True
if s.lower() in NULL_STRING_SENTINELS:
return True
return False
def _null_sentinel_mask(series: pd.Series) -> pd.Series:
"""Return a copy of ``series`` with empty strings and any value in
:data:`NULL_STRING_SENTINELS` replaced by ``None``.
Previously the coercion paths (numeric / datetime / TEXT) only
rewrote the empty string. That meant the literal text ``"null"``
sailed through ``pd.to_numeric(errors="coerce")`` as ``NaN`` (fine
for numerics; by accident) but ``pd.to_datetime(errors="coerce")``
handed it to ``dateutil`` which happily parsed it as... today's
date (dateutil treats bare words as "use current date for missing
fields"). Routing through this helper fixes both problems in one
pass. Non-string values are left alone so already-parsed
Timestamps / dates / numbers pass through untouched.
"""
if not pd.api.types.is_object_dtype(series):
return series
return series.map(lambda v: None if _is_char_missing(v) else v)
VALID_IF_EXISTS = ("fail", "replace", "append")
VALID_FILE_TYPES = ("sas", "text")
@ -811,6 +888,7 @@ def iter_text_chunks(
encoding: str = "utf-8",
quotechar: str = '"',
chunksize: Optional[int] = None,
usecols: Optional[List[str]] = None,
):
"""Yield ``(df_chunk, meta)`` tuples for streaming text file loads.
@ -818,6 +896,10 @@ def iter_text_chunks(
iteration. The metadata object is rebuilt for each chunk with the
chunk's column names and ``number_rows`` set to the total file rows
(computed once up front).
When ``usecols`` is provided, only those columns are parsed - useful
for cheap partition-value discovery scans where the rest of the row
would be wasted I/O.
"""
path = Path(path)
if chunksize is None:
@ -832,8 +914,7 @@ def iter_text_chunks(
total = _count_text_lines(path, encoding)
reader = pd.read_csv(
path,
read_csv_kwargs: Dict[str, Any] = dict(
delimiter=delimiter,
encoding=encoding,
quotechar=quotechar,
@ -842,6 +923,10 @@ def iter_text_chunks(
keep_default_na=True,
na_values=[""],
)
if usecols is not None:
read_csv_kwargs["usecols"] = list(usecols)
reader = pd.read_csv(path, **read_csv_kwargs)
for chunk_df in reader:
meta = _build_text_metadata(list(chunk_df.columns), number_rows=total)
yield chunk_df, meta
@ -941,6 +1026,7 @@ def iter_sas_chunks(
delimiter: str = ",",
text_encoding: str = "utf-8",
quotechar: str = '"',
usecols: Optional[List[str]] = None,
):
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
@ -952,6 +1038,13 @@ def iter_sas_chunks(
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
always wins.
When ``usecols`` is provided, pyreadstat only decodes the listed
columns. For wide sas7bdat files this is dramatically cheaper than
a full read - the C decoder skips unwanted columns instead of
materializing them. Used by partition-value discovery to avoid
re-reading every byte of every file just to extract a couple of
partition keys.
For text files, delegates to :func:`iter_text_chunks`.
"""
if _is_text_file(path):
@ -961,6 +1054,7 @@ def iter_sas_chunks(
encoding=text_encoding,
quotechar=quotechar,
chunksize=chunksize,
usecols=usecols,
)
return
if chunksize is None:
@ -973,6 +1067,9 @@ def iter_sas_chunks(
else:
chunksize = DEFAULT_CHUNK_ROWS
reader, kwargs = _sas_reader(path)
if usecols is not None:
kwargs = dict(kwargs)
kwargs["usecols"] = list(usecols)
yield from pyreadstat.read_file_in_chunks(
reader, str(Path(path)), chunksize=chunksize, **kwargs
)
@ -1161,12 +1258,12 @@ def union_column_types(
def _all_null(series: pd.Series) -> bool:
if pd.api.types.is_object_dtype(series):
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
return bool(series.map(_is_char_missing).all())
return bool(series.isna().all())
def _char_missing_mask(series: pd.Series) -> pd.Series:
return series.map(lambda v: v is None or (isinstance(v, float) and pd.isna(v)) or (isinstance(v, str) and v == ""))
return series.map(_is_char_missing)
def _is_nullable(series: pd.Series) -> bool:
@ -1241,20 +1338,197 @@ def _try_float_coerce(values: List[str]) -> bool:
return True
# Locale-independent month lookup so ``DD-MON-YY`` / ``DDMONYYYY`` style
# strings (Oracle's default ``DD-MON-YY`` export, SAS ``DATE7.`` /
# ``DATE9.`` rendered to text, spreadsheets spitting out ``23-Mar-2020``)
# parse correctly regardless of the host's ``LC_TIME``. ``strptime("%b")``
# is locale-dependent and silently fails on non-English systems; this
# dict sidesteps that entirely.
_MONTH_LOOKUP: Dict[str, int] = {
"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
"JUL": 7, "AUG": 8, "SEP": 9, "SEPT": 9, "OCT": 10, "NOV": 11, "DEC": 12,
"JANUARY": 1, "FEBRUARY": 2, "MARCH": 3, "APRIL": 4, "JUNE": 6,
"JULY": 7, "AUGUST": 8, "SEPTEMBER": 9, "OCTOBER": 10,
"NOVEMBER": 11, "DECEMBER": 12,
}
# ``DD[sep]MON[sep]YY`` with an optional ``HH:MM[:SS[.ffff]] [AM|PM]``
# suffix. ``sep`` can be ``-``, ``/``, space, or empty so the same
# regex covers ``23-MAR-20``, ``23-MAR-2020``, ``23MAR2020`` (SAS
# ``DATE9.``), ``23 Mar 2020`` (Excel), and ``23-MAR-20 14:30:00``
# (Oracle ``TO_CHAR`` default with timestamp). Time portion is lenient
# on separator (``:`` or ``.``) since Oracle's default timestamp
# rendering uses dots (``02.30.45.123456``) while most others use
# colons.
_DDMONYY_RE = re.compile(
r"""
^\s*
(?P<day>\d{1,2})
[-/\s]?
(?P<month>[A-Za-z]{3,9})
[-/\s]?
(?P<year>\d{2}|\d{4})
(?:
[\sT:]+
(?P<hour>\d{1,2}) [:.] (?P<minute>\d{2})
(?:
[:.] (?P<second>\d{2})
(?: \. (?P<micro>\d+) )?
)?
\s*
(?P<ampm>[AaPp][Mm])?
)?
\s*$
""",
re.VERBOSE,
)
# Strptime fallbacks for all-numeric shapes the regex above can't
# disambiguate. Order matters: unambiguous 4-digit-year layouts first,
# then US-style ``mm/dd`` before EU-style ``dd/mm`` (the former is
# dominant in the kinds of exports this loader sees). Columns whose
# true format is ``DD/MM/YY`` should pin the Postgres type via
# ``column_types: {col: TEXT}`` and parse themselves downstream.
_EXTRA_DATE_FORMATS: Tuple[str, ...] = (
"%Y/%m/%d",
"%Y%m%d",
"%m/%d/%Y",
"%m/%d/%y",
"%m-%d-%Y",
"%m-%d-%y",
"%d/%m/%Y",
"%d/%m/%y",
"%d-%m-%Y",
"%d-%m-%y",
)
_EXTRA_DATETIME_FORMATS: Tuple[str, ...] = (
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%S.%f",
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %H:%M",
"%m/%d/%y %H:%M:%S",
"%m/%d/%y %H:%M",
"%d/%m/%Y %H:%M:%S",
"%d/%m/%y %H:%M:%S",
"%Y/%m/%d %H:%M:%S",
)
def _parse_flexible_date(value: Any) -> Optional[dt.date]:
"""Parse ``value`` to ``datetime.date`` using ISO first, then the
``DD-MON-YY`` family, then the numeric fallbacks in
:data:`_EXTRA_DATE_FORMATS`. Returns ``None`` if nothing matches.
Non-string / empty / non-finite inputs return ``None`` rather than
raising so callers can use this as a drop-in replacement for the old
``dt.date.fromisoformat`` + ``try``/``except`` pattern.
"""
if value is None:
return None
if not isinstance(value, str):
return None
s = value.strip()
if not s:
return None
try:
return dt.date.fromisoformat(s)
except (ValueError, TypeError):
pass
m = _DDMONYY_RE.match(s)
# Reject inputs that carry a time component so ``_try_date_coerce``
# doesn't silently swallow ``TIMESTAMP`` columns (``23-MAR-20 14:30:00``)
# and misclassify them as ``DATE``.
if m and m.group("hour") is None:
month = _MONTH_LOOKUP.get(m.group("month").upper())
if month is not None:
try:
day = int(m.group("day"))
year = int(m.group("year"))
if len(m.group("year")) == 2:
# Pivot year = 69 matches SAS / Oracle / Excel
# conventions: ``00..68`` -> 2000s, ``69..99`` -> 1900s.
year = 2000 + year if year < 69 else 1900 + year
return dt.date(year, month, day)
except ValueError:
return None
for fmt in _EXTRA_DATE_FORMATS:
try:
return dt.datetime.strptime(s, fmt).date()
except ValueError:
continue
return None
def _parse_flexible_datetime(value: Any) -> Optional[dt.datetime]:
"""Parse ``value`` to ``datetime.datetime``. Same format coverage as
:func:`_parse_flexible_date` plus explicit datetime shapes; a
date-only input is promoted to midnight so callers can treat a
column that mixes ``23-MAR-20`` and ``23-MAR-20 14:30:00`` as
``TIMESTAMP`` end-to-end.
"""
if value is None:
return None
if not isinstance(value, str):
return None
s = value.strip()
if not s:
return None
try:
return dt.datetime.fromisoformat(s)
except (ValueError, TypeError):
pass
m = _DDMONYY_RE.match(s)
if m:
month = _MONTH_LOOKUP.get(m.group("month").upper())
if month is not None:
try:
day = int(m.group("day"))
year = int(m.group("year"))
if len(m.group("year")) == 2:
year = 2000 + year if year < 69 else 1900 + year
hour = int(m.group("hour")) if m.group("hour") else 0
minute = int(m.group("minute")) if m.group("minute") else 0
second = int(m.group("second")) if m.group("second") else 0
micro = 0
if m.group("micro"):
# ``%f`` expects 1-6 digits; pad / truncate to match.
micro_s = m.group("micro")[:6].ljust(6, "0")
micro = int(micro_s)
ampm = m.group("ampm")
if ampm:
ap = ampm.upper()
if ap == "PM" and hour < 12:
hour += 12
elif ap == "AM" and hour == 12:
hour = 0
return dt.datetime(year, month, day, hour, minute, second, micro)
except ValueError:
return None
for fmt in _EXTRA_DATETIME_FORMATS:
try:
return dt.datetime.strptime(s, fmt)
except ValueError:
continue
# Final fallback: accept a date-only string and promote to midnight.
d = _parse_flexible_date(s)
if d is not None:
return dt.datetime(d.year, d.month, d.day)
return None
def _try_date_coerce(values: List[str]) -> bool:
for v in values:
try:
dt.date.fromisoformat(v)
except (ValueError, TypeError):
if _parse_flexible_date(v) is None:
return False
return True
def _try_datetime_coerce(values: List[str]) -> bool:
for v in values:
try:
dt.datetime.fromisoformat(v)
except (ValueError, TypeError):
if _parse_flexible_datetime(v) is None:
return False
return True
@ -1824,6 +2098,12 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
except (TypeError, ValueError):
pass
# Sentinel strings (``"null"``, ``"NA"``, ``"."``, ...) collapse to
# Python None up front so every type branch below can skip its own
# empty-string dance.
if _is_null_string(value):
return None
pg_upper = pg_type.upper()
if pg_upper in ("INTEGER", "BIGINT", "SMALLINT", "INT", "INT4", "INT8", "INT2"):
@ -1856,10 +2136,7 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
if isinstance(value, str):
if value.strip() == "":
return None
try:
return dt.date.fromisoformat(value.strip())
except (ValueError, TypeError):
return None
return _parse_flexible_date(value)
return None
if pg_upper in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE",
@ -1873,10 +2150,7 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
if isinstance(value, str):
if value.strip() == "":
return None
try:
return dt.datetime.fromisoformat(value.strip())
except (ValueError, TypeError):
return None
return _parse_flexible_datetime(value)
return None
if pg_upper in ("TIME", "TIME WITHOUT TIME ZONE",
@ -2379,16 +2653,59 @@ def _safe_object_to_datetime(
"""Object-dtype to datetime. Shares the safety net (errstate +
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
actually numeric-flavored (e.g. SAS wrote numbers into an object
column), route to the numeric path; otherwise parse with ``to_datetime``
on the object itself.
column), route to the numeric path; otherwise try our explicit
``DD-MON-YY`` / strptime format set before falling back to the
generic ``pd.to_datetime`` dateutil parser.
The explicit-format pre-pass exists because:
* ``pd.to_datetime`` on unformatted object columns emits a
``UserWarning`` per chunk and parses row-by-row via ``dateutil``
-- 10-100× slower than a single vectorized strptime.
* ``dateutil`` *will* parse ``23-MAR-20`` but its 2-digit-year pivot
differs from SAS/Oracle convention in corner cases; applying our
own parser keeps behavior predictable.
"""
coerced = series.replace({"": None})
coerced = _null_sentinel_mask(series)
numeric = pd.to_numeric(coerced, errors="coerce")
all_numeric = numeric.notna().sum() == coerced.notna().sum()
if all_numeric and coerced.notna().any():
return _safe_numeric_to_datetime(
numeric, unit="s", column_name=column_name, target_type=target_type,
)
non_null_count = int(coerced.notna().sum())
if non_null_count:
# First pass: our regex-based ``DD-MON-YY`` parser. Cheap,
# locale-independent, covers the cases ``pd.to_datetime`` warns
# about. Always parse via the datetime-aware variant so a DATE
# target whose chunk happens to carry time components
# (``23-MAR-20 14:30:00``) still parses without warnings; the
# caller's ``.dt.date`` cast truncates the time, matching the
# existing datetime64-input branch.
parsed_py = coerced.map(
lambda v: _parse_flexible_datetime(v) if v is not None else None
)
parsed_ts = pd.to_datetime(parsed_py, errors="coerce")
if int(parsed_ts.notna().sum()) == non_null_count:
return parsed_ts
# Second pass: vectorized ``pd.to_datetime`` with each explicit
# format. One ``pd.to_datetime(format=fmt)`` call is O(n) in C;
# trying a handful of them still beats row-by-row dateutil on
# large chunks. Accept the first format that covers every
# non-null cell.
for fmt in _EXTRA_DATETIME_FORMATS + _EXTRA_DATE_FORMATS:
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
candidate = pd.to_datetime(coerced, format=fmt, errors="coerce")
except (ValueError, TypeError):
continue
if int(candidate.notna().sum()) == non_null_count:
return candidate
# Final fallback: ``dateutil`` via ``pd.to_datetime``. Handles
# shapes our explicit list missed (rare edge cases, mixed formats
# within one column). Same safety net as the numeric path.
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(coerced, errors="coerce")
@ -2423,13 +2740,13 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
if pg in ("INTEGER", "BIGINT", "SMALLINT"):
if pd.api.types.is_object_dtype(series):
series = pd.to_numeric(
series.replace({"": None}), errors="coerce"
_null_sentinel_mask(series), errors="coerce"
)
out[name] = series.astype("Int64")
elif pg in ("DOUBLE PRECISION", "REAL", "NUMERIC"):
if pd.api.types.is_object_dtype(series):
series = pd.to_numeric(
series.replace({"": None}), errors="coerce"
_null_sentinel_mask(series), errors="coerce"
)
out[name] = series.astype("float64")
elif pg == "DATE":
@ -2488,6 +2805,12 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
# in the COPY statement turns the blanks back into SQL NULL.
# astype(str) stringifies NaN/None to the literal "nan"/"None",
# so we mask those after the fact rather than branching per cell.
# Object columns also get the sentinel sweep
# (:data:`NULL_STRING_SENTINELS`) so a literal ``"null"`` /
# ``"NA"`` / ``"."`` value lands as SQL NULL on the way in,
# matching what the numeric / date branches above do.
if pd.api.types.is_object_dtype(series):
series = _null_sentinel_mask(series)
na_mask = series.isna()
if pd.api.types.is_numeric_dtype(series):
# Hit when a column was auto-unioned to TEXT because at

View File

@ -35,6 +35,10 @@ import sys
from dataclasses import dataclass, field
from typing import List, Set, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
# ---------------------------------------------------------------------------
# Dependency check
# ---------------------------------------------------------------------------
@ -75,10 +79,10 @@ FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line."""
S3_BUCKET: str = "my-bucket"
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""
AWS_PROFILE: str = "default"
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
"""AWS CLI profile name used for authentication."""
# Text-file reading defaults (used when downloading / previewing text files)

View File

@ -21,6 +21,10 @@ import argparse
import os
import sys
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3
import pandas as pd
import pyreadstat
@ -44,7 +48,7 @@ SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
# Configuration — edit these before running (or use CLI arguments)
# ---------------------------------------------------------------------------
S3_BUCKET: str = "my-bucket"
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
"""S3 bucket name."""
S3_KEY: str = "path/to/file.sas7bdat"
@ -53,7 +57,7 @@ S3_KEY: str = "path/to/file.sas7bdat"
LOCAL_FOLDER: str = "./downloads"
"""Local directory to download the file into."""
AWS_PROFILE: str = "default"
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
"""AWS CLI profile name used for authentication."""

View File

@ -92,6 +92,7 @@ Exit codes:
from __future__ import annotations
import argparse
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
@ -99,6 +100,10 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3
import yaml
@ -226,15 +231,20 @@ def load_download_config(path: Path) -> DownloadConfig:
f"Config at {path} must be a YAML mapping at the top level."
)
missing = [
k for k in ("bucket", "prefix", "local_folder") if k not in raw
]
# 'bucket' can fall back to the S3_BUCKET env var, so only flag it as
# missing when neither the YAML key nor the env var is present.
required_always = ("prefix", "local_folder")
missing = [k for k in required_always if k not in raw]
if "bucket" not in raw and not os.environ.get("S3_BUCKET"):
missing.insert(0, "bucket")
if missing:
raise ValueError(
f"Config {path} missing required keys: {', '.join(missing)}"
)
bucket = str(raw["bucket"]).strip()
bucket = str(raw["bucket"]).strip() if raw.get("bucket") else ""
if not bucket:
bucket = os.environ.get("S3_BUCKET", "")
if not bucket:
raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.")
@ -256,6 +266,8 @@ def load_download_config(path: Path) -> DownloadConfig:
aws_profile = raw.get("aws_profile")
if aws_profile is not None:
aws_profile = str(aws_profile).strip() or None
if aws_profile is None:
aws_profile = os.environ.get("AWS_PROFILE") or None
auto_detect = bool(raw.get("auto_detect", True))
extensions = _parse_extensions(raw.get("extensions"), f"Config {path}")