Compare commits

..

No commits in common. "main" and "directory_explorer" have entirely different histories.

7 changed files with 79 additions and 566 deletions

3
.gitignore vendored
View File

@ -1,7 +1,6 @@
/.venv /.venv
/samples /samples
.env /.env
!.env.example
__pycache__/ __pycache__/
venv/ venv/
*/__pycache__/ */__pycache__/

View File

@ -3,6 +3,3 @@ PGPORT=5432
PGUSER= PGUSER=
PGPASSWORD= PGPASSWORD=
PGDATABASE= PGDATABASE=
S3_BUCKET=my-bucket
AWS_PROFILE=default

View File

@ -157,6 +157,7 @@ import threading
from concurrent.futures import ( from concurrent.futures import (
CancelledError, CancelledError,
ProcessPoolExecutor, ProcessPoolExecutor,
ThreadPoolExecutor,
as_completed, as_completed,
) )
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -884,94 +885,6 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
return clusters return clusters
# ---------------------------------------------------------------------------
# Top-level workers (must be importable for ProcessPoolExecutor pickling)
# ---------------------------------------------------------------------------
def _prescan_worker(
path_str: str,
delimiter: str,
text_encoding: str,
quotechar: str,
) -> Tuple[
str,
Optional[int],
Optional[Dict[str, Tuple[str, Optional[str]]]],
Optional[str],
]:
"""Top-level prescan worker for ProcessPoolExecutor.
Reads only metadata (pyreadstat ``metadataonly=True``) for one file and
returns ``(path_str, number_rows, per_column_meta, error_message)``.
Kept at module level so ``ProcessPoolExecutor`` can pickle it; the
closure version we used to call from a ``ThreadPoolExecutor`` shared
the parent's GIL and serialised the per-file Python work, which is
why pre-scan felt slow even though the actual disk reads were fast.
"""
try:
meta = read_sas_metadata(
Path(path_str),
delimiter=delimiter,
text_encoding=text_encoding,
quotechar=quotechar,
)
n = getattr(meta, "number_rows", None)
col_meta = extract_union_metadata(meta)
return (
path_str,
int(n) if n is not None else None,
col_meta,
None,
)
except Exception as e:
return (path_str, None, None, f"{type(e).__name__}: {e}")
def _partition_scan_worker(
path_str: str,
partition_by: List[str],
delimiter: str,
text_encoding: str,
quotechar: str,
) -> Tuple[str, Optional[dict], Optional[str]]:
"""Top-level partition-discovery worker for ProcessPoolExecutor.
Streams ``path_str`` with ``usecols=partition_by`` so pyreadstat only
decodes the partition columns themselves - on a wide sas7bdat that's
typically a 10x+ I/O reduction over reading every column. Returns
``(path_str, partial_partition_tree, error_message)``.
``columns`` is intentionally not passed: partition-value normalisation
needs the cluster-wide schema, which is out of process. The merger in
:func:`_discover_cluster_partitions` re-applies normalisation when it
folds the per-file trees together.
"""
try:
def _chunks() -> Any:
for chunk_df, _meta in iter_sas_chunks(
Path(path_str),
delimiter=delimiter,
text_encoding=text_encoding,
quotechar=quotechar,
usecols=list(partition_by),
):
yield chunk_df
tree = discover_partition_values_chunked(
_chunks(), list(partition_by),
)
return (path_str, tree, None)
except Exception as e:
import traceback as _traceback
return (
path_str,
None,
f"{type(e).__name__}: {e}\n{_traceback.format_exc()}",
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Per-cluster load # Per-cluster load
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -1016,43 +929,22 @@ def _infer_cluster_schema(
def _discover_cluster_partitions( def _discover_cluster_partitions(
cluster: ClusterSpec, cluster: ClusterSpec,
columns: Dict, columns: Dict,
*,
workers: int = 1,
) -> dict: ) -> dict:
"""Scan ALL files in ``cluster`` to discover partition values. """Scan ALL files in ``cluster`` to discover partition values.
Returns a nested partition-value tree suitable for passing to Returns a nested partition-value tree suitable for passing to
:func:`load_sas.render_partition_ddl` and :func:`load_sas.create_table`. :func:`load_sas.render_partition_ddl` and :func:`load_sas.create_table`.
Each file is scanned chunk-by-chunk so the full dataset is never
Each file is read with ``usecols=cluster.partition_by`` so pyreadstat materialized in memory.
only decodes the partition columns - on a wide sas7bdat that drops
the bytes-touched-per-file by an order of magnitude vs the old
full-row scan. With ``workers > 1`` the per-file scans run in a
``ProcessPoolExecutor`` and the partial trees are merged as they
complete (true parallelism, visible in ``htop`` as N python procs).
``include`` / ``exclude`` are intentionally not honoured here:
partition columns are validated against the inferred schema before
we ever get called, which already enforces that they survived any
explicit include/exclude filter. The old serial path applied the
filter for symmetry, but it was a no-op once usecols pinned us to
the partition set anyway.
""" """
tkw = _build_text_kw(cluster) tkw = _build_text_kw(cluster)
merged: dict = {} merged: dict = {}
if not cluster.files:
return merged
n_workers = max(1, min(int(workers), len(cluster.files)))
if n_workers <= 1 or len(cluster.files) == 1:
for path in cluster.files: for path in cluster.files:
def _filtered_chunks(p=path): def _filtered_chunks(p=path):
for chunk_df, _meta in iter_sas_chunks( for chunk_df, _chunk_meta in iter_sas_chunks(p, **tkw):
p, usecols=list(cluster.partition_by), **tkw, yield apply_column_filter(
): chunk_df, cluster.include, cluster.exclude
yield chunk_df )
file_tree = discover_partition_values_chunked( file_tree = discover_partition_values_chunked(
_filtered_chunks(), cluster.partition_by, columns, _filtered_chunks(), cluster.partition_by, columns,
@ -1060,41 +952,6 @@ def _discover_cluster_partitions(
_merge_partition_trees(merged, file_tree) _merge_partition_trees(merged, file_tree)
return merged return merged
with ProcessPoolExecutor(max_workers=n_workers) as ppool:
futures = {
ppool.submit(
_partition_scan_worker,
str(path),
list(cluster.partition_by),
tkw["delimiter"],
tkw["text_encoding"],
tkw["quotechar"],
): path
for path in cluster.files
}
bar = tqdm(
total=len(futures),
unit="file",
desc=" discovering partitions",
file=sys.stderr,
dynamic_ncols=True,
)
try:
for fut in as_completed(futures):
path = futures[fut]
_path_str, file_tree, err = fut.result()
bar.update(1)
if err is not None:
raise RuntimeError(
f"partition discovery failed for {path}: {err}"
)
if file_tree:
_merge_partition_trees(merged, file_tree)
finally:
bar.close()
return merged
def load_cluster( def load_cluster(
conn, conn,
@ -1184,7 +1041,7 @@ def load_cluster(
file=sys.stderr, file=sys.stderr,
) )
partition_values = _discover_cluster_partitions( partition_values = _discover_cluster_partitions(
cluster, first_columns, workers=workers, cluster, first_columns,
) )
total_parts = _count_partitions(partition_values) total_parts = _count_partitions(partition_values)
print( print(
@ -1819,7 +1676,7 @@ def main(argv: Optional[List[str]] = None) -> int:
file=sys.stderr, file=sys.stderr,
) )
partition_values = _discover_cluster_partitions( partition_values = _discover_cluster_partitions(
c, columns, workers=max(1, int(args.workers)), c, columns,
) )
total_parts = _count_partitions(partition_values) total_parts = _count_partitions(partition_values)
print( print(
@ -1930,35 +1787,42 @@ def main(argv: Optional[List[str]] = None) -> int:
file=sys.stderr, file=sys.stderr,
) )
else: else:
# Cap at min(workers, file_count, 32). Pyreadstat metadata reads are prescan_workers = min(16, max(1, len(all_files)))
# mostly C-side I/O + struct decoding, but the per-file Python work
# (extract_union_metadata, dict construction) was being serialised
# by the GIL in the old ThreadPool path - which is why the bar
# crawled even though disk was idle. ProcessPool gives us actual
# parallelism; you'll now see N python procs in htop.
prescan_workers = min(
32, max(1, max(workers, 16)), len(all_files),
)
print( print(
f"pre-scanning row counts + per-column metadata for " f"pre-scanning row counts + per-column metadata for "
f"{len(all_files)} file(s) across {prescan_workers} " f"{len(all_files)} file(s) across {prescan_workers} thread(s)...",
f"process(es)...",
file=sys.stderr, file=sys.stderr,
) )
def _scan_one(
p: Path,
) -> Tuple[
Path,
Optional[int],
Optional[Dict[str, Tuple[str, Optional[str]]]],
Optional[str],
]:
try:
_prescan_tkw = dict(
delimiter=cfg.delimiter,
text_encoding=cfg.text_encoding,
quotechar=cfg.quotechar,
)
meta = read_sas_metadata(p, **_prescan_tkw)
n = getattr(meta, "number_rows", None)
col_meta = extract_union_metadata(meta)
return (
p,
int(n) if n is not None else None,
col_meta,
None,
)
except Exception as e:
return (p, None, None, str(e))
unknown_total_files: List[str] = [] unknown_total_files: List[str] = []
running_total = 0 running_total = 0
with ProcessPoolExecutor(max_workers=prescan_workers) as ppool: with ThreadPoolExecutor(max_workers=prescan_workers) as tpool:
futures = {
ppool.submit(
_prescan_worker,
str(p),
cfg.delimiter,
cfg.text_encoding,
cfg.quotechar,
): p
for p in all_files
}
prescan_bar = tqdm( prescan_bar = tqdm(
total=len(all_files), total=len(all_files),
unit="file", unit="file",
@ -1967,20 +1831,16 @@ def main(argv: Optional[List[str]] = None) -> int:
dynamic_ncols=True, dynamic_ncols=True,
) )
try: try:
for fut in as_completed(futures): for p, n, col_meta, err in tpool.map(_scan_one, all_files):
path_obj = futures[fut]
path_str, n, col_meta, err = fut.result()
prescan_bar.update(1) prescan_bar.update(1)
if err is not None: if err is not None:
unknown_total_files.append( unknown_total_files.append(f"{p.name} ({err})")
f"{path_obj.name} ({err})"
)
elif n is None: elif n is None:
unknown_total_files.append(path_obj.name) unknown_total_files.append(p.name)
else: else:
running_total += n running_total += n
if col_meta is not None: if col_meta is not None:
file_meta_by_path[path_str] = col_meta file_meta_by_path[str(p)] = col_meta
finally: finally:
prescan_bar.close() prescan_bar.close()

View File

@ -315,83 +315,6 @@ The chunk size can be overridden at runtime via the
changes. Explicit ``chunksize=`` kwargs still win over both.""" changes. Explicit ``chunksize=`` kwargs still win over both."""
NULL_STRING_SENTINELS: frozenset = frozenset({
"null",
"na",
"n/a",
"#n/a",
".",
"none",
"nan",
})
"""Lowercased string literals treated as SQL ``NULL`` across inference,
nullability detection, and COPY preparation. Seen in the wild when a
source system exports missing values as the literal text ``"null"``
(yes, really; some SAS CHAR columns hold it verbatim) or uses the
SAS/Stata ``.`` missing sentinel or spreadsheet-style ``NA`` / ``N/A``.
Kept narrow on purpose:
* ``"null"``, ``"none"``, ``"nan"`` the common spelled-out missings.
* ``"na"``, ``"n/a"``, ``"#n/a"`` spreadsheet / R conventions.
* ``"."`` SAS / Stata missing sentinel as CHAR export.
Matching is case-insensitive and ignores leading / trailing whitespace.
Extend this set in a calling module (``import load_sas;
load_sas.NULL_STRING_SENTINELS = frozenset({...})``) if your source
ships additional sentinels. Don't add ambiguous tokens (``"0"``,
``"unknown"``) - those are legitimate data in plenty of schemas."""
def _is_null_string(value: Any) -> bool:
"""True if ``value`` is a string whose lowercased/stripped form is
in :data:`NULL_STRING_SENTINELS`. Safe to call on any Python value;
non-strings return False so the helper can be dropped into the same
row-walks that also see floats / dates / None."""
if not isinstance(value, str):
return False
s = value.strip()
if not s:
return False
return s.lower() in NULL_STRING_SENTINELS
def _is_char_missing(value: Any) -> bool:
"""True if ``value`` should be treated as missing for a CHAR/TEXT
column. Unifies the three-way check (None / NaN / empty-or-sentinel
string) that used to live inline in several helpers so extending
the sentinel set in one place propagates everywhere."""
if value is None:
return True
if isinstance(value, float) and pd.isna(value):
return True
if isinstance(value, str):
s = value.strip()
if not s:
return True
if s.lower() in NULL_STRING_SENTINELS:
return True
return False
def _null_sentinel_mask(series: pd.Series) -> pd.Series:
"""Return a copy of ``series`` with empty strings and any value in
:data:`NULL_STRING_SENTINELS` replaced by ``None``.
Previously the coercion paths (numeric / datetime / TEXT) only
rewrote the empty string. That meant the literal text ``"null"``
sailed through ``pd.to_numeric(errors="coerce")`` as ``NaN`` (fine
for numerics; by accident) but ``pd.to_datetime(errors="coerce")``
handed it to ``dateutil`` which happily parsed it as... today's
date (dateutil treats bare words as "use current date for missing
fields"). Routing through this helper fixes both problems in one
pass. Non-string values are left alone so already-parsed
Timestamps / dates / numbers pass through untouched.
"""
if not pd.api.types.is_object_dtype(series):
return series
return series.map(lambda v: None if _is_char_missing(v) else v)
VALID_IF_EXISTS = ("fail", "replace", "append") VALID_IF_EXISTS = ("fail", "replace", "append")
VALID_FILE_TYPES = ("sas", "text") VALID_FILE_TYPES = ("sas", "text")
@ -888,7 +811,6 @@ def iter_text_chunks(
encoding: str = "utf-8", encoding: str = "utf-8",
quotechar: str = '"', quotechar: str = '"',
chunksize: Optional[int] = None, chunksize: Optional[int] = None,
usecols: Optional[List[str]] = None,
): ):
"""Yield ``(df_chunk, meta)`` tuples for streaming text file loads. """Yield ``(df_chunk, meta)`` tuples for streaming text file loads.
@ -896,10 +818,6 @@ def iter_text_chunks(
iteration. The metadata object is rebuilt for each chunk with the iteration. The metadata object is rebuilt for each chunk with the
chunk's column names and ``number_rows`` set to the total file rows chunk's column names and ``number_rows`` set to the total file rows
(computed once up front). (computed once up front).
When ``usecols`` is provided, only those columns are parsed - useful
for cheap partition-value discovery scans where the rest of the row
would be wasted I/O.
""" """
path = Path(path) path = Path(path)
if chunksize is None: if chunksize is None:
@ -914,7 +832,8 @@ def iter_text_chunks(
total = _count_text_lines(path, encoding) total = _count_text_lines(path, encoding)
read_csv_kwargs: Dict[str, Any] = dict( reader = pd.read_csv(
path,
delimiter=delimiter, delimiter=delimiter,
encoding=encoding, encoding=encoding,
quotechar=quotechar, quotechar=quotechar,
@ -923,10 +842,6 @@ def iter_text_chunks(
keep_default_na=True, keep_default_na=True,
na_values=[""], na_values=[""],
) )
if usecols is not None:
read_csv_kwargs["usecols"] = list(usecols)
reader = pd.read_csv(path, **read_csv_kwargs)
for chunk_df in reader: for chunk_df in reader:
meta = _build_text_metadata(list(chunk_df.columns), number_rows=total) meta = _build_text_metadata(list(chunk_df.columns), number_rows=total)
yield chunk_df, meta yield chunk_df, meta
@ -1026,7 +941,6 @@ def iter_sas_chunks(
delimiter: str = ",", delimiter: str = ",",
text_encoding: str = "utf-8", text_encoding: str = "utf-8",
quotechar: str = '"', quotechar: str = '"',
usecols: Optional[List[str]] = None,
): ):
"""Yield ``(df_chunk, meta)`` tuples for streaming loads. """Yield ``(df_chunk, meta)`` tuples for streaming loads.
@ -1038,13 +952,6 @@ def iter_sas_chunks(
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
always wins. always wins.
When ``usecols`` is provided, pyreadstat only decodes the listed
columns. For wide sas7bdat files this is dramatically cheaper than
a full read - the C decoder skips unwanted columns instead of
materializing them. Used by partition-value discovery to avoid
re-reading every byte of every file just to extract a couple of
partition keys.
For text files, delegates to :func:`iter_text_chunks`. For text files, delegates to :func:`iter_text_chunks`.
""" """
if _is_text_file(path): if _is_text_file(path):
@ -1054,7 +961,6 @@ def iter_sas_chunks(
encoding=text_encoding, encoding=text_encoding,
quotechar=quotechar, quotechar=quotechar,
chunksize=chunksize, chunksize=chunksize,
usecols=usecols,
) )
return return
if chunksize is None: if chunksize is None:
@ -1067,9 +973,6 @@ def iter_sas_chunks(
else: else:
chunksize = DEFAULT_CHUNK_ROWS chunksize = DEFAULT_CHUNK_ROWS
reader, kwargs = _sas_reader(path) reader, kwargs = _sas_reader(path)
if usecols is not None:
kwargs = dict(kwargs)
kwargs["usecols"] = list(usecols)
yield from pyreadstat.read_file_in_chunks( yield from pyreadstat.read_file_in_chunks(
reader, str(Path(path)), chunksize=chunksize, **kwargs reader, str(Path(path)), chunksize=chunksize, **kwargs
) )
@ -1258,12 +1161,12 @@ def union_column_types(
def _all_null(series: pd.Series) -> bool: def _all_null(series: pd.Series) -> bool:
if pd.api.types.is_object_dtype(series): if pd.api.types.is_object_dtype(series):
return bool(series.map(_is_char_missing).all()) return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
return bool(series.isna().all()) return bool(series.isna().all())
def _char_missing_mask(series: pd.Series) -> pd.Series: def _char_missing_mask(series: pd.Series) -> pd.Series:
return series.map(_is_char_missing) return series.map(lambda v: v is None or (isinstance(v, float) and pd.isna(v)) or (isinstance(v, str) and v == ""))
def _is_nullable(series: pd.Series) -> bool: def _is_nullable(series: pd.Series) -> bool:
@ -1338,197 +1241,20 @@ def _try_float_coerce(values: List[str]) -> bool:
return True return True
# Locale-independent month lookup so ``DD-MON-YY`` / ``DDMONYYYY`` style
# strings (Oracle's default ``DD-MON-YY`` export, SAS ``DATE7.`` /
# ``DATE9.`` rendered to text, spreadsheets spitting out ``23-Mar-2020``)
# parse correctly regardless of the host's ``LC_TIME``. ``strptime("%b")``
# is locale-dependent and silently fails on non-English systems; this
# dict sidesteps that entirely.
_MONTH_LOOKUP: Dict[str, int] = {
"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
"JUL": 7, "AUG": 8, "SEP": 9, "SEPT": 9, "OCT": 10, "NOV": 11, "DEC": 12,
"JANUARY": 1, "FEBRUARY": 2, "MARCH": 3, "APRIL": 4, "JUNE": 6,
"JULY": 7, "AUGUST": 8, "SEPTEMBER": 9, "OCTOBER": 10,
"NOVEMBER": 11, "DECEMBER": 12,
}
# ``DD[sep]MON[sep]YY`` with an optional ``HH:MM[:SS[.ffff]] [AM|PM]``
# suffix. ``sep`` can be ``-``, ``/``, space, or empty so the same
# regex covers ``23-MAR-20``, ``23-MAR-2020``, ``23MAR2020`` (SAS
# ``DATE9.``), ``23 Mar 2020`` (Excel), and ``23-MAR-20 14:30:00``
# (Oracle ``TO_CHAR`` default with timestamp). Time portion is lenient
# on separator (``:`` or ``.``) since Oracle's default timestamp
# rendering uses dots (``02.30.45.123456``) while most others use
# colons.
_DDMONYY_RE = re.compile(
r"""
^\s*
(?P<day>\d{1,2})
[-/\s]?
(?P<month>[A-Za-z]{3,9})
[-/\s]?
(?P<year>\d{2}|\d{4})
(?:
[\sT:]+
(?P<hour>\d{1,2}) [:.] (?P<minute>\d{2})
(?:
[:.] (?P<second>\d{2})
(?: \. (?P<micro>\d+) )?
)?
\s*
(?P<ampm>[AaPp][Mm])?
)?
\s*$
""",
re.VERBOSE,
)
# Strptime fallbacks for all-numeric shapes the regex above can't
# disambiguate. Order matters: unambiguous 4-digit-year layouts first,
# then US-style ``mm/dd`` before EU-style ``dd/mm`` (the former is
# dominant in the kinds of exports this loader sees). Columns whose
# true format is ``DD/MM/YY`` should pin the Postgres type via
# ``column_types: {col: TEXT}`` and parse themselves downstream.
_EXTRA_DATE_FORMATS: Tuple[str, ...] = (
"%Y/%m/%d",
"%Y%m%d",
"%m/%d/%Y",
"%m/%d/%y",
"%m-%d-%Y",
"%m-%d-%y",
"%d/%m/%Y",
"%d/%m/%y",
"%d-%m-%Y",
"%d-%m-%y",
)
_EXTRA_DATETIME_FORMATS: Tuple[str, ...] = (
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%S.%f",
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %H:%M",
"%m/%d/%y %H:%M:%S",
"%m/%d/%y %H:%M",
"%d/%m/%Y %H:%M:%S",
"%d/%m/%y %H:%M:%S",
"%Y/%m/%d %H:%M:%S",
)
def _parse_flexible_date(value: Any) -> Optional[dt.date]:
"""Parse ``value`` to ``datetime.date`` using ISO first, then the
``DD-MON-YY`` family, then the numeric fallbacks in
:data:`_EXTRA_DATE_FORMATS`. Returns ``None`` if nothing matches.
Non-string / empty / non-finite inputs return ``None`` rather than
raising so callers can use this as a drop-in replacement for the old
``dt.date.fromisoformat`` + ``try``/``except`` pattern.
"""
if value is None:
return None
if not isinstance(value, str):
return None
s = value.strip()
if not s:
return None
try:
return dt.date.fromisoformat(s)
except (ValueError, TypeError):
pass
m = _DDMONYY_RE.match(s)
# Reject inputs that carry a time component so ``_try_date_coerce``
# doesn't silently swallow ``TIMESTAMP`` columns (``23-MAR-20 14:30:00``)
# and misclassify them as ``DATE``.
if m and m.group("hour") is None:
month = _MONTH_LOOKUP.get(m.group("month").upper())
if month is not None:
try:
day = int(m.group("day"))
year = int(m.group("year"))
if len(m.group("year")) == 2:
# Pivot year = 69 matches SAS / Oracle / Excel
# conventions: ``00..68`` -> 2000s, ``69..99`` -> 1900s.
year = 2000 + year if year < 69 else 1900 + year
return dt.date(year, month, day)
except ValueError:
return None
for fmt in _EXTRA_DATE_FORMATS:
try:
return dt.datetime.strptime(s, fmt).date()
except ValueError:
continue
return None
def _parse_flexible_datetime(value: Any) -> Optional[dt.datetime]:
"""Parse ``value`` to ``datetime.datetime``. Same format coverage as
:func:`_parse_flexible_date` plus explicit datetime shapes; a
date-only input is promoted to midnight so callers can treat a
column that mixes ``23-MAR-20`` and ``23-MAR-20 14:30:00`` as
``TIMESTAMP`` end-to-end.
"""
if value is None:
return None
if not isinstance(value, str):
return None
s = value.strip()
if not s:
return None
try:
return dt.datetime.fromisoformat(s)
except (ValueError, TypeError):
pass
m = _DDMONYY_RE.match(s)
if m:
month = _MONTH_LOOKUP.get(m.group("month").upper())
if month is not None:
try:
day = int(m.group("day"))
year = int(m.group("year"))
if len(m.group("year")) == 2:
year = 2000 + year if year < 69 else 1900 + year
hour = int(m.group("hour")) if m.group("hour") else 0
minute = int(m.group("minute")) if m.group("minute") else 0
second = int(m.group("second")) if m.group("second") else 0
micro = 0
if m.group("micro"):
# ``%f`` expects 1-6 digits; pad / truncate to match.
micro_s = m.group("micro")[:6].ljust(6, "0")
micro = int(micro_s)
ampm = m.group("ampm")
if ampm:
ap = ampm.upper()
if ap == "PM" and hour < 12:
hour += 12
elif ap == "AM" and hour == 12:
hour = 0
return dt.datetime(year, month, day, hour, minute, second, micro)
except ValueError:
return None
for fmt in _EXTRA_DATETIME_FORMATS:
try:
return dt.datetime.strptime(s, fmt)
except ValueError:
continue
# Final fallback: accept a date-only string and promote to midnight.
d = _parse_flexible_date(s)
if d is not None:
return dt.datetime(d.year, d.month, d.day)
return None
def _try_date_coerce(values: List[str]) -> bool: def _try_date_coerce(values: List[str]) -> bool:
for v in values: for v in values:
if _parse_flexible_date(v) is None: try:
dt.date.fromisoformat(v)
except (ValueError, TypeError):
return False return False
return True return True
def _try_datetime_coerce(values: List[str]) -> bool: def _try_datetime_coerce(values: List[str]) -> bool:
for v in values: for v in values:
if _parse_flexible_datetime(v) is None: try:
dt.datetime.fromisoformat(v)
except (ValueError, TypeError):
return False return False
return True return True
@ -2098,12 +1824,6 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
except (TypeError, ValueError): except (TypeError, ValueError):
pass pass
# Sentinel strings (``"null"``, ``"NA"``, ``"."``, ...) collapse to
# Python None up front so every type branch below can skip its own
# empty-string dance.
if _is_null_string(value):
return None
pg_upper = pg_type.upper() pg_upper = pg_type.upper()
if pg_upper in ("INTEGER", "BIGINT", "SMALLINT", "INT", "INT4", "INT8", "INT2"): if pg_upper in ("INTEGER", "BIGINT", "SMALLINT", "INT", "INT4", "INT8", "INT2"):
@ -2136,7 +1856,10 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
if isinstance(value, str): if isinstance(value, str):
if value.strip() == "": if value.strip() == "":
return None return None
return _parse_flexible_date(value) try:
return dt.date.fromisoformat(value.strip())
except (ValueError, TypeError):
return None
return None return None
if pg_upper in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE", if pg_upper in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE",
@ -2150,7 +1873,10 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
if isinstance(value, str): if isinstance(value, str):
if value.strip() == "": if value.strip() == "":
return None return None
return _parse_flexible_datetime(value) try:
return dt.datetime.fromisoformat(value.strip())
except (ValueError, TypeError):
return None
return None return None
if pg_upper in ("TIME", "TIME WITHOUT TIME ZONE", if pg_upper in ("TIME", "TIME WITHOUT TIME ZONE",
@ -2653,59 +2379,16 @@ def _safe_object_to_datetime(
"""Object-dtype to datetime. Shares the safety net (errstate + """Object-dtype to datetime. Shares the safety net (errstate +
try/except) with :func:`_safe_numeric_to_datetime`. If the column is try/except) with :func:`_safe_numeric_to_datetime`. If the column is
actually numeric-flavored (e.g. SAS wrote numbers into an object actually numeric-flavored (e.g. SAS wrote numbers into an object
column), route to the numeric path; otherwise try our explicit column), route to the numeric path; otherwise parse with ``to_datetime``
``DD-MON-YY`` / strptime format set before falling back to the on the object itself.
generic ``pd.to_datetime`` dateutil parser.
The explicit-format pre-pass exists because:
* ``pd.to_datetime`` on unformatted object columns emits a
``UserWarning`` per chunk and parses row-by-row via ``dateutil``
-- 10-100× slower than a single vectorized strptime.
* ``dateutil`` *will* parse ``23-MAR-20`` but its 2-digit-year pivot
differs from SAS/Oracle convention in corner cases; applying our
own parser keeps behavior predictable.
""" """
coerced = _null_sentinel_mask(series) coerced = series.replace({"": None})
numeric = pd.to_numeric(coerced, errors="coerce") numeric = pd.to_numeric(coerced, errors="coerce")
all_numeric = numeric.notna().sum() == coerced.notna().sum() all_numeric = numeric.notna().sum() == coerced.notna().sum()
if all_numeric and coerced.notna().any(): if all_numeric and coerced.notna().any():
return _safe_numeric_to_datetime( return _safe_numeric_to_datetime(
numeric, unit="s", column_name=column_name, target_type=target_type, numeric, unit="s", column_name=column_name, target_type=target_type,
) )
non_null_count = int(coerced.notna().sum())
if non_null_count:
# First pass: our regex-based ``DD-MON-YY`` parser. Cheap,
# locale-independent, covers the cases ``pd.to_datetime`` warns
# about. Always parse via the datetime-aware variant so a DATE
# target whose chunk happens to carry time components
# (``23-MAR-20 14:30:00``) still parses without warnings; the
# caller's ``.dt.date`` cast truncates the time, matching the
# existing datetime64-input branch.
parsed_py = coerced.map(
lambda v: _parse_flexible_datetime(v) if v is not None else None
)
parsed_ts = pd.to_datetime(parsed_py, errors="coerce")
if int(parsed_ts.notna().sum()) == non_null_count:
return parsed_ts
# Second pass: vectorized ``pd.to_datetime`` with each explicit
# format. One ``pd.to_datetime(format=fmt)`` call is O(n) in C;
# trying a handful of them still beats row-by-row dateutil on
# large chunks. Accept the first format that covers every
# non-null cell.
for fmt in _EXTRA_DATETIME_FORMATS + _EXTRA_DATE_FORMATS:
try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
candidate = pd.to_datetime(coerced, format=fmt, errors="coerce")
except (ValueError, TypeError):
continue
if int(candidate.notna().sum()) == non_null_count:
return candidate
# Final fallback: ``dateutil`` via ``pd.to_datetime``. Handles
# shapes our explicit list missed (rare edge cases, mixed formats
# within one column). Same safety net as the numeric path.
try: try:
with np.errstate(over="ignore", invalid="ignore", divide="ignore"): with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
return pd.to_datetime(coerced, errors="coerce") return pd.to_datetime(coerced, errors="coerce")
@ -2740,13 +2423,13 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
if pg in ("INTEGER", "BIGINT", "SMALLINT"): if pg in ("INTEGER", "BIGINT", "SMALLINT"):
if pd.api.types.is_object_dtype(series): if pd.api.types.is_object_dtype(series):
series = pd.to_numeric( series = pd.to_numeric(
_null_sentinel_mask(series), errors="coerce" series.replace({"": None}), errors="coerce"
) )
out[name] = series.astype("Int64") out[name] = series.astype("Int64")
elif pg in ("DOUBLE PRECISION", "REAL", "NUMERIC"): elif pg in ("DOUBLE PRECISION", "REAL", "NUMERIC"):
if pd.api.types.is_object_dtype(series): if pd.api.types.is_object_dtype(series):
series = pd.to_numeric( series = pd.to_numeric(
_null_sentinel_mask(series), errors="coerce" series.replace({"": None}), errors="coerce"
) )
out[name] = series.astype("float64") out[name] = series.astype("float64")
elif pg == "DATE": elif pg == "DATE":
@ -2805,12 +2488,6 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
# in the COPY statement turns the blanks back into SQL NULL. # in the COPY statement turns the blanks back into SQL NULL.
# astype(str) stringifies NaN/None to the literal "nan"/"None", # astype(str) stringifies NaN/None to the literal "nan"/"None",
# so we mask those after the fact rather than branching per cell. # so we mask those after the fact rather than branching per cell.
# Object columns also get the sentinel sweep
# (:data:`NULL_STRING_SENTINELS`) so a literal ``"null"`` /
# ``"NA"`` / ``"."`` value lands as SQL NULL on the way in,
# matching what the numeric / date branches above do.
if pd.api.types.is_object_dtype(series):
series = _null_sentinel_mask(series)
na_mask = series.isna() na_mask = series.isna()
if pd.api.types.is_numeric_dtype(series): if pd.api.types.is_numeric_dtype(series):
# Hit when a column was auto-unioned to TEXT because at # Hit when a column was auto-unioned to TEXT because at

View File

@ -35,10 +35,6 @@ import sys
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List, Set, Tuple from typing import List, Set, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Dependency check # Dependency check
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -79,10 +75,10 @@ FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
INPUT_FILE: str = "s3_directories.txt" INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line.""" """Path to the text file containing one S3 prefix per line."""
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket") S3_BUCKET: str = "my-bucket"
"""S3 bucket name (all prefixes are assumed to live in this bucket).""" """S3 bucket name (all prefixes are assumed to live in this bucket)."""
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default") AWS_PROFILE: str = "default"
"""AWS CLI profile name used for authentication.""" """AWS CLI profile name used for authentication."""
# Text-file reading defaults (used when downloading / previewing text files) # Text-file reading defaults (used when downloading / previewing text files)

View File

@ -21,10 +21,6 @@ import argparse
import os import os
import sys import sys
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3 import boto3
import pandas as pd import pandas as pd
import pyreadstat import pyreadstat
@ -48,7 +44,7 @@ SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
# Configuration — edit these before running (or use CLI arguments) # Configuration — edit these before running (or use CLI arguments)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket") S3_BUCKET: str = "my-bucket"
"""S3 bucket name.""" """S3 bucket name."""
S3_KEY: str = "path/to/file.sas7bdat" S3_KEY: str = "path/to/file.sas7bdat"
@ -57,7 +53,7 @@ S3_KEY: str = "path/to/file.sas7bdat"
LOCAL_FOLDER: str = "./downloads" LOCAL_FOLDER: str = "./downloads"
"""Local directory to download the file into.""" """Local directory to download the file into."""
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default") AWS_PROFILE: str = "default"
"""AWS CLI profile name used for authentication.""" """AWS CLI profile name used for authentication."""

View File

@ -92,7 +92,6 @@ Exit codes:
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import os
import re import re
import sys import sys
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
@ -100,10 +99,6 @@ from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3 import boto3
import yaml import yaml
@ -231,20 +226,15 @@ def load_download_config(path: Path) -> DownloadConfig:
f"Config at {path} must be a YAML mapping at the top level." f"Config at {path} must be a YAML mapping at the top level."
) )
# 'bucket' can fall back to the S3_BUCKET env var, so only flag it as missing = [
# missing when neither the YAML key nor the env var is present. k for k in ("bucket", "prefix", "local_folder") if k not in raw
required_always = ("prefix", "local_folder") ]
missing = [k for k in required_always if k not in raw]
if "bucket" not in raw and not os.environ.get("S3_BUCKET"):
missing.insert(0, "bucket")
if missing: if missing:
raise ValueError( raise ValueError(
f"Config {path} missing required keys: {', '.join(missing)}" f"Config {path} missing required keys: {', '.join(missing)}"
) )
bucket = str(raw["bucket"]).strip() if raw.get("bucket") else "" bucket = str(raw["bucket"]).strip()
if not bucket:
bucket = os.environ.get("S3_BUCKET", "")
if not bucket: if not bucket:
raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.") raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.")
@ -266,8 +256,6 @@ def load_download_config(path: Path) -> DownloadConfig:
aws_profile = raw.get("aws_profile") aws_profile = raw.get("aws_profile")
if aws_profile is not None: if aws_profile is not None:
aws_profile = str(aws_profile).strip() or None aws_profile = str(aws_profile).strip() or None
if aws_profile is None:
aws_profile = os.environ.get("AWS_PROFILE") or None
auto_detect = bool(raw.get("auto_detect", True)) auto_detect = bool(raw.get("auto_detect", True))
extensions = _parse_extensions(raw.get("extensions"), f"Config {path}") extensions = _parse_extensions(raw.get("extensions"), f"Config {path}")