Compare commits
No commits in common. "main" and "directory_explorer" have entirely different histories.
main
...
directory_
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,7 +1,6 @@
|
||||
/.venv
|
||||
/samples
|
||||
.env
|
||||
!.env.example
|
||||
/.env
|
||||
__pycache__/
|
||||
venv/
|
||||
*/__pycache__/
|
||||
@ -3,6 +3,3 @@ PGPORT=5432
|
||||
PGUSER=
|
||||
PGPASSWORD=
|
||||
PGDATABASE=
|
||||
|
||||
S3_BUCKET=my-bucket
|
||||
AWS_PROFILE=default
|
||||
|
||||
@ -157,6 +157,7 @@ import threading
|
||||
from concurrent.futures import (
|
||||
CancelledError,
|
||||
ProcessPoolExecutor,
|
||||
ThreadPoolExecutor,
|
||||
as_completed,
|
||||
)
|
||||
from dataclasses import dataclass, field
|
||||
@ -884,94 +885,6 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]:
|
||||
return clusters
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Top-level workers (must be importable for ProcessPoolExecutor pickling)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _prescan_worker(
|
||||
path_str: str,
|
||||
delimiter: str,
|
||||
text_encoding: str,
|
||||
quotechar: str,
|
||||
) -> Tuple[
|
||||
str,
|
||||
Optional[int],
|
||||
Optional[Dict[str, Tuple[str, Optional[str]]]],
|
||||
Optional[str],
|
||||
]:
|
||||
"""Top-level prescan worker for ProcessPoolExecutor.
|
||||
|
||||
Reads only metadata (pyreadstat ``metadataonly=True``) for one file and
|
||||
returns ``(path_str, number_rows, per_column_meta, error_message)``.
|
||||
Kept at module level so ``ProcessPoolExecutor`` can pickle it; the
|
||||
closure version we used to call from a ``ThreadPoolExecutor`` shared
|
||||
the parent's GIL and serialised the per-file Python work, which is
|
||||
why pre-scan felt slow even though the actual disk reads were fast.
|
||||
"""
|
||||
try:
|
||||
meta = read_sas_metadata(
|
||||
Path(path_str),
|
||||
delimiter=delimiter,
|
||||
text_encoding=text_encoding,
|
||||
quotechar=quotechar,
|
||||
)
|
||||
n = getattr(meta, "number_rows", None)
|
||||
col_meta = extract_union_metadata(meta)
|
||||
return (
|
||||
path_str,
|
||||
int(n) if n is not None else None,
|
||||
col_meta,
|
||||
None,
|
||||
)
|
||||
except Exception as e:
|
||||
return (path_str, None, None, f"{type(e).__name__}: {e}")
|
||||
|
||||
|
||||
def _partition_scan_worker(
|
||||
path_str: str,
|
||||
partition_by: List[str],
|
||||
delimiter: str,
|
||||
text_encoding: str,
|
||||
quotechar: str,
|
||||
) -> Tuple[str, Optional[dict], Optional[str]]:
|
||||
"""Top-level partition-discovery worker for ProcessPoolExecutor.
|
||||
|
||||
Streams ``path_str`` with ``usecols=partition_by`` so pyreadstat only
|
||||
decodes the partition columns themselves - on a wide sas7bdat that's
|
||||
typically a 10x+ I/O reduction over reading every column. Returns
|
||||
``(path_str, partial_partition_tree, error_message)``.
|
||||
|
||||
``columns`` is intentionally not passed: partition-value normalisation
|
||||
needs the cluster-wide schema, which is out of process. The merger in
|
||||
:func:`_discover_cluster_partitions` re-applies normalisation when it
|
||||
folds the per-file trees together.
|
||||
"""
|
||||
try:
|
||||
def _chunks() -> Any:
|
||||
for chunk_df, _meta in iter_sas_chunks(
|
||||
Path(path_str),
|
||||
delimiter=delimiter,
|
||||
text_encoding=text_encoding,
|
||||
quotechar=quotechar,
|
||||
usecols=list(partition_by),
|
||||
):
|
||||
yield chunk_df
|
||||
|
||||
tree = discover_partition_values_chunked(
|
||||
_chunks(), list(partition_by),
|
||||
)
|
||||
return (path_str, tree, None)
|
||||
except Exception as e:
|
||||
import traceback as _traceback
|
||||
|
||||
return (
|
||||
path_str,
|
||||
None,
|
||||
f"{type(e).__name__}: {e}\n{_traceback.format_exc()}",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-cluster load
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -1016,83 +929,27 @@ def _infer_cluster_schema(
|
||||
def _discover_cluster_partitions(
|
||||
cluster: ClusterSpec,
|
||||
columns: Dict,
|
||||
*,
|
||||
workers: int = 1,
|
||||
) -> dict:
|
||||
"""Scan ALL files in ``cluster`` to discover partition values.
|
||||
|
||||
Returns a nested partition-value tree suitable for passing to
|
||||
:func:`load_sas.render_partition_ddl` and :func:`load_sas.create_table`.
|
||||
|
||||
Each file is read with ``usecols=cluster.partition_by`` so pyreadstat
|
||||
only decodes the partition columns - on a wide sas7bdat that drops
|
||||
the bytes-touched-per-file by an order of magnitude vs the old
|
||||
full-row scan. With ``workers > 1`` the per-file scans run in a
|
||||
``ProcessPoolExecutor`` and the partial trees are merged as they
|
||||
complete (true parallelism, visible in ``htop`` as N python procs).
|
||||
|
||||
``include`` / ``exclude`` are intentionally not honoured here:
|
||||
partition columns are validated against the inferred schema before
|
||||
we ever get called, which already enforces that they survived any
|
||||
explicit include/exclude filter. The old serial path applied the
|
||||
filter for symmetry, but it was a no-op once usecols pinned us to
|
||||
the partition set anyway.
|
||||
Each file is scanned chunk-by-chunk so the full dataset is never
|
||||
materialized in memory.
|
||||
"""
|
||||
tkw = _build_text_kw(cluster)
|
||||
merged: dict = {}
|
||||
for path in cluster.files:
|
||||
def _filtered_chunks(p=path):
|
||||
for chunk_df, _chunk_meta in iter_sas_chunks(p, **tkw):
|
||||
yield apply_column_filter(
|
||||
chunk_df, cluster.include, cluster.exclude
|
||||
)
|
||||
|
||||
if not cluster.files:
|
||||
return merged
|
||||
|
||||
n_workers = max(1, min(int(workers), len(cluster.files)))
|
||||
|
||||
if n_workers <= 1 or len(cluster.files) == 1:
|
||||
for path in cluster.files:
|
||||
def _filtered_chunks(p=path):
|
||||
for chunk_df, _meta in iter_sas_chunks(
|
||||
p, usecols=list(cluster.partition_by), **tkw,
|
||||
):
|
||||
yield chunk_df
|
||||
|
||||
file_tree = discover_partition_values_chunked(
|
||||
_filtered_chunks(), cluster.partition_by, columns,
|
||||
)
|
||||
_merge_partition_trees(merged, file_tree)
|
||||
return merged
|
||||
|
||||
with ProcessPoolExecutor(max_workers=n_workers) as ppool:
|
||||
futures = {
|
||||
ppool.submit(
|
||||
_partition_scan_worker,
|
||||
str(path),
|
||||
list(cluster.partition_by),
|
||||
tkw["delimiter"],
|
||||
tkw["text_encoding"],
|
||||
tkw["quotechar"],
|
||||
): path
|
||||
for path in cluster.files
|
||||
}
|
||||
bar = tqdm(
|
||||
total=len(futures),
|
||||
unit="file",
|
||||
desc=" discovering partitions",
|
||||
file=sys.stderr,
|
||||
dynamic_ncols=True,
|
||||
file_tree = discover_partition_values_chunked(
|
||||
_filtered_chunks(), cluster.partition_by, columns,
|
||||
)
|
||||
try:
|
||||
for fut in as_completed(futures):
|
||||
path = futures[fut]
|
||||
_path_str, file_tree, err = fut.result()
|
||||
bar.update(1)
|
||||
if err is not None:
|
||||
raise RuntimeError(
|
||||
f"partition discovery failed for {path}: {err}"
|
||||
)
|
||||
if file_tree:
|
||||
_merge_partition_trees(merged, file_tree)
|
||||
finally:
|
||||
bar.close()
|
||||
|
||||
_merge_partition_trees(merged, file_tree)
|
||||
return merged
|
||||
|
||||
|
||||
@ -1184,7 +1041,7 @@ def load_cluster(
|
||||
file=sys.stderr,
|
||||
)
|
||||
partition_values = _discover_cluster_partitions(
|
||||
cluster, first_columns, workers=workers,
|
||||
cluster, first_columns,
|
||||
)
|
||||
total_parts = _count_partitions(partition_values)
|
||||
print(
|
||||
@ -1819,7 +1676,7 @@ def main(argv: Optional[List[str]] = None) -> int:
|
||||
file=sys.stderr,
|
||||
)
|
||||
partition_values = _discover_cluster_partitions(
|
||||
c, columns, workers=max(1, int(args.workers)),
|
||||
c, columns,
|
||||
)
|
||||
total_parts = _count_partitions(partition_values)
|
||||
print(
|
||||
@ -1930,35 +1787,42 @@ def main(argv: Optional[List[str]] = None) -> int:
|
||||
file=sys.stderr,
|
||||
)
|
||||
else:
|
||||
# Cap at min(workers, file_count, 32). Pyreadstat metadata reads are
|
||||
# mostly C-side I/O + struct decoding, but the per-file Python work
|
||||
# (extract_union_metadata, dict construction) was being serialised
|
||||
# by the GIL in the old ThreadPool path - which is why the bar
|
||||
# crawled even though disk was idle. ProcessPool gives us actual
|
||||
# parallelism; you'll now see N python procs in htop.
|
||||
prescan_workers = min(
|
||||
32, max(1, max(workers, 16)), len(all_files),
|
||||
)
|
||||
prescan_workers = min(16, max(1, len(all_files)))
|
||||
print(
|
||||
f"pre-scanning row counts + per-column metadata for "
|
||||
f"{len(all_files)} file(s) across {prescan_workers} "
|
||||
f"process(es)...",
|
||||
f"{len(all_files)} file(s) across {prescan_workers} thread(s)...",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
def _scan_one(
|
||||
p: Path,
|
||||
) -> Tuple[
|
||||
Path,
|
||||
Optional[int],
|
||||
Optional[Dict[str, Tuple[str, Optional[str]]]],
|
||||
Optional[str],
|
||||
]:
|
||||
try:
|
||||
_prescan_tkw = dict(
|
||||
delimiter=cfg.delimiter,
|
||||
text_encoding=cfg.text_encoding,
|
||||
quotechar=cfg.quotechar,
|
||||
)
|
||||
meta = read_sas_metadata(p, **_prescan_tkw)
|
||||
n = getattr(meta, "number_rows", None)
|
||||
col_meta = extract_union_metadata(meta)
|
||||
return (
|
||||
p,
|
||||
int(n) if n is not None else None,
|
||||
col_meta,
|
||||
None,
|
||||
)
|
||||
except Exception as e:
|
||||
return (p, None, None, str(e))
|
||||
|
||||
unknown_total_files: List[str] = []
|
||||
running_total = 0
|
||||
with ProcessPoolExecutor(max_workers=prescan_workers) as ppool:
|
||||
futures = {
|
||||
ppool.submit(
|
||||
_prescan_worker,
|
||||
str(p),
|
||||
cfg.delimiter,
|
||||
cfg.text_encoding,
|
||||
cfg.quotechar,
|
||||
): p
|
||||
for p in all_files
|
||||
}
|
||||
with ThreadPoolExecutor(max_workers=prescan_workers) as tpool:
|
||||
prescan_bar = tqdm(
|
||||
total=len(all_files),
|
||||
unit="file",
|
||||
@ -1967,20 +1831,16 @@ def main(argv: Optional[List[str]] = None) -> int:
|
||||
dynamic_ncols=True,
|
||||
)
|
||||
try:
|
||||
for fut in as_completed(futures):
|
||||
path_obj = futures[fut]
|
||||
path_str, n, col_meta, err = fut.result()
|
||||
for p, n, col_meta, err in tpool.map(_scan_one, all_files):
|
||||
prescan_bar.update(1)
|
||||
if err is not None:
|
||||
unknown_total_files.append(
|
||||
f"{path_obj.name} ({err})"
|
||||
)
|
||||
unknown_total_files.append(f"{p.name} ({err})")
|
||||
elif n is None:
|
||||
unknown_total_files.append(path_obj.name)
|
||||
unknown_total_files.append(p.name)
|
||||
else:
|
||||
running_total += n
|
||||
if col_meta is not None:
|
||||
file_meta_by_path[path_str] = col_meta
|
||||
file_meta_by_path[str(p)] = col_meta
|
||||
finally:
|
||||
prescan_bar.close()
|
||||
|
||||
|
||||
@ -315,83 +315,6 @@ The chunk size can be overridden at runtime via the
|
||||
changes. Explicit ``chunksize=`` kwargs still win over both."""
|
||||
|
||||
|
||||
NULL_STRING_SENTINELS: frozenset = frozenset({
|
||||
"null",
|
||||
"na",
|
||||
"n/a",
|
||||
"#n/a",
|
||||
".",
|
||||
"none",
|
||||
"nan",
|
||||
})
|
||||
"""Lowercased string literals treated as SQL ``NULL`` across inference,
|
||||
nullability detection, and COPY preparation. Seen in the wild when a
|
||||
source system exports missing values as the literal text ``"null"``
|
||||
(yes, really; some SAS CHAR columns hold it verbatim) or uses the
|
||||
SAS/Stata ``.`` missing sentinel or spreadsheet-style ``NA`` / ``N/A``.
|
||||
|
||||
Kept narrow on purpose:
|
||||
* ``"null"``, ``"none"``, ``"nan"`` — the common spelled-out missings.
|
||||
* ``"na"``, ``"n/a"``, ``"#n/a"`` — spreadsheet / R conventions.
|
||||
* ``"."`` — SAS / Stata missing sentinel as CHAR export.
|
||||
|
||||
Matching is case-insensitive and ignores leading / trailing whitespace.
|
||||
Extend this set in a calling module (``import load_sas;
|
||||
load_sas.NULL_STRING_SENTINELS = frozenset({...})``) if your source
|
||||
ships additional sentinels. Don't add ambiguous tokens (``"0"``,
|
||||
``"unknown"``) - those are legitimate data in plenty of schemas."""
|
||||
|
||||
|
||||
def _is_null_string(value: Any) -> bool:
|
||||
"""True if ``value`` is a string whose lowercased/stripped form is
|
||||
in :data:`NULL_STRING_SENTINELS`. Safe to call on any Python value;
|
||||
non-strings return False so the helper can be dropped into the same
|
||||
row-walks that also see floats / dates / None."""
|
||||
if not isinstance(value, str):
|
||||
return False
|
||||
s = value.strip()
|
||||
if not s:
|
||||
return False
|
||||
return s.lower() in NULL_STRING_SENTINELS
|
||||
|
||||
|
||||
def _is_char_missing(value: Any) -> bool:
|
||||
"""True if ``value`` should be treated as missing for a CHAR/TEXT
|
||||
column. Unifies the three-way check (None / NaN / empty-or-sentinel
|
||||
string) that used to live inline in several helpers so extending
|
||||
the sentinel set in one place propagates everywhere."""
|
||||
if value is None:
|
||||
return True
|
||||
if isinstance(value, float) and pd.isna(value):
|
||||
return True
|
||||
if isinstance(value, str):
|
||||
s = value.strip()
|
||||
if not s:
|
||||
return True
|
||||
if s.lower() in NULL_STRING_SENTINELS:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _null_sentinel_mask(series: pd.Series) -> pd.Series:
|
||||
"""Return a copy of ``series`` with empty strings and any value in
|
||||
:data:`NULL_STRING_SENTINELS` replaced by ``None``.
|
||||
|
||||
Previously the coercion paths (numeric / datetime / TEXT) only
|
||||
rewrote the empty string. That meant the literal text ``"null"``
|
||||
sailed through ``pd.to_numeric(errors="coerce")`` as ``NaN`` (fine
|
||||
for numerics; by accident) but ``pd.to_datetime(errors="coerce")``
|
||||
handed it to ``dateutil`` which happily parsed it as... today's
|
||||
date (dateutil treats bare words as "use current date for missing
|
||||
fields"). Routing through this helper fixes both problems in one
|
||||
pass. Non-string values are left alone so already-parsed
|
||||
Timestamps / dates / numbers pass through untouched.
|
||||
"""
|
||||
if not pd.api.types.is_object_dtype(series):
|
||||
return series
|
||||
return series.map(lambda v: None if _is_char_missing(v) else v)
|
||||
|
||||
|
||||
VALID_IF_EXISTS = ("fail", "replace", "append")
|
||||
|
||||
VALID_FILE_TYPES = ("sas", "text")
|
||||
@ -888,7 +811,6 @@ def iter_text_chunks(
|
||||
encoding: str = "utf-8",
|
||||
quotechar: str = '"',
|
||||
chunksize: Optional[int] = None,
|
||||
usecols: Optional[List[str]] = None,
|
||||
):
|
||||
"""Yield ``(df_chunk, meta)`` tuples for streaming text file loads.
|
||||
|
||||
@ -896,10 +818,6 @@ def iter_text_chunks(
|
||||
iteration. The metadata object is rebuilt for each chunk with the
|
||||
chunk's column names and ``number_rows`` set to the total file rows
|
||||
(computed once up front).
|
||||
|
||||
When ``usecols`` is provided, only those columns are parsed - useful
|
||||
for cheap partition-value discovery scans where the rest of the row
|
||||
would be wasted I/O.
|
||||
"""
|
||||
path = Path(path)
|
||||
if chunksize is None:
|
||||
@ -914,7 +832,8 @@ def iter_text_chunks(
|
||||
|
||||
total = _count_text_lines(path, encoding)
|
||||
|
||||
read_csv_kwargs: Dict[str, Any] = dict(
|
||||
reader = pd.read_csv(
|
||||
path,
|
||||
delimiter=delimiter,
|
||||
encoding=encoding,
|
||||
quotechar=quotechar,
|
||||
@ -923,10 +842,6 @@ def iter_text_chunks(
|
||||
keep_default_na=True,
|
||||
na_values=[""],
|
||||
)
|
||||
if usecols is not None:
|
||||
read_csv_kwargs["usecols"] = list(usecols)
|
||||
|
||||
reader = pd.read_csv(path, **read_csv_kwargs)
|
||||
for chunk_df in reader:
|
||||
meta = _build_text_metadata(list(chunk_df.columns), number_rows=total)
|
||||
yield chunk_df, meta
|
||||
@ -1026,7 +941,6 @@ def iter_sas_chunks(
|
||||
delimiter: str = ",",
|
||||
text_encoding: str = "utf-8",
|
||||
quotechar: str = '"',
|
||||
usecols: Optional[List[str]] = None,
|
||||
):
|
||||
"""Yield ``(df_chunk, meta)`` tuples for streaming loads.
|
||||
|
||||
@ -1038,13 +952,6 @@ def iter_sas_chunks(
|
||||
parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int
|
||||
always wins.
|
||||
|
||||
When ``usecols`` is provided, pyreadstat only decodes the listed
|
||||
columns. For wide sas7bdat files this is dramatically cheaper than
|
||||
a full read - the C decoder skips unwanted columns instead of
|
||||
materializing them. Used by partition-value discovery to avoid
|
||||
re-reading every byte of every file just to extract a couple of
|
||||
partition keys.
|
||||
|
||||
For text files, delegates to :func:`iter_text_chunks`.
|
||||
"""
|
||||
if _is_text_file(path):
|
||||
@ -1054,7 +961,6 @@ def iter_sas_chunks(
|
||||
encoding=text_encoding,
|
||||
quotechar=quotechar,
|
||||
chunksize=chunksize,
|
||||
usecols=usecols,
|
||||
)
|
||||
return
|
||||
if chunksize is None:
|
||||
@ -1067,9 +973,6 @@ def iter_sas_chunks(
|
||||
else:
|
||||
chunksize = DEFAULT_CHUNK_ROWS
|
||||
reader, kwargs = _sas_reader(path)
|
||||
if usecols is not None:
|
||||
kwargs = dict(kwargs)
|
||||
kwargs["usecols"] = list(usecols)
|
||||
yield from pyreadstat.read_file_in_chunks(
|
||||
reader, str(Path(path)), chunksize=chunksize, **kwargs
|
||||
)
|
||||
@ -1258,12 +1161,12 @@ def union_column_types(
|
||||
|
||||
def _all_null(series: pd.Series) -> bool:
|
||||
if pd.api.types.is_object_dtype(series):
|
||||
return bool(series.map(_is_char_missing).all())
|
||||
return bool(series.map(lambda v: v is None or (isinstance(v, str) and v == "") or (isinstance(v, float) and pd.isna(v))).all())
|
||||
return bool(series.isna().all())
|
||||
|
||||
|
||||
def _char_missing_mask(series: pd.Series) -> pd.Series:
|
||||
return series.map(_is_char_missing)
|
||||
return series.map(lambda v: v is None or (isinstance(v, float) and pd.isna(v)) or (isinstance(v, str) and v == ""))
|
||||
|
||||
|
||||
def _is_nullable(series: pd.Series) -> bool:
|
||||
@ -1338,197 +1241,20 @@ def _try_float_coerce(values: List[str]) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
# Locale-independent month lookup so ``DD-MON-YY`` / ``DDMONYYYY`` style
|
||||
# strings (Oracle's default ``DD-MON-YY`` export, SAS ``DATE7.`` /
|
||||
# ``DATE9.`` rendered to text, spreadsheets spitting out ``23-Mar-2020``)
|
||||
# parse correctly regardless of the host's ``LC_TIME``. ``strptime("%b")``
|
||||
# is locale-dependent and silently fails on non-English systems; this
|
||||
# dict sidesteps that entirely.
|
||||
_MONTH_LOOKUP: Dict[str, int] = {
|
||||
"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
|
||||
"JUL": 7, "AUG": 8, "SEP": 9, "SEPT": 9, "OCT": 10, "NOV": 11, "DEC": 12,
|
||||
"JANUARY": 1, "FEBRUARY": 2, "MARCH": 3, "APRIL": 4, "JUNE": 6,
|
||||
"JULY": 7, "AUGUST": 8, "SEPTEMBER": 9, "OCTOBER": 10,
|
||||
"NOVEMBER": 11, "DECEMBER": 12,
|
||||
}
|
||||
|
||||
# ``DD[sep]MON[sep]YY`` with an optional ``HH:MM[:SS[.ffff]] [AM|PM]``
|
||||
# suffix. ``sep`` can be ``-``, ``/``, space, or empty so the same
|
||||
# regex covers ``23-MAR-20``, ``23-MAR-2020``, ``23MAR2020`` (SAS
|
||||
# ``DATE9.``), ``23 Mar 2020`` (Excel), and ``23-MAR-20 14:30:00``
|
||||
# (Oracle ``TO_CHAR`` default with timestamp). Time portion is lenient
|
||||
# on separator (``:`` or ``.``) since Oracle's default timestamp
|
||||
# rendering uses dots (``02.30.45.123456``) while most others use
|
||||
# colons.
|
||||
_DDMONYY_RE = re.compile(
|
||||
r"""
|
||||
^\s*
|
||||
(?P<day>\d{1,2})
|
||||
[-/\s]?
|
||||
(?P<month>[A-Za-z]{3,9})
|
||||
[-/\s]?
|
||||
(?P<year>\d{2}|\d{4})
|
||||
(?:
|
||||
[\sT:]+
|
||||
(?P<hour>\d{1,2}) [:.] (?P<minute>\d{2})
|
||||
(?:
|
||||
[:.] (?P<second>\d{2})
|
||||
(?: \. (?P<micro>\d+) )?
|
||||
)?
|
||||
\s*
|
||||
(?P<ampm>[AaPp][Mm])?
|
||||
)?
|
||||
\s*$
|
||||
""",
|
||||
re.VERBOSE,
|
||||
)
|
||||
|
||||
# Strptime fallbacks for all-numeric shapes the regex above can't
|
||||
# disambiguate. Order matters: unambiguous 4-digit-year layouts first,
|
||||
# then US-style ``mm/dd`` before EU-style ``dd/mm`` (the former is
|
||||
# dominant in the kinds of exports this loader sees). Columns whose
|
||||
# true format is ``DD/MM/YY`` should pin the Postgres type via
|
||||
# ``column_types: {col: TEXT}`` and parse themselves downstream.
|
||||
_EXTRA_DATE_FORMATS: Tuple[str, ...] = (
|
||||
"%Y/%m/%d",
|
||||
"%Y%m%d",
|
||||
"%m/%d/%Y",
|
||||
"%m/%d/%y",
|
||||
"%m-%d-%Y",
|
||||
"%m-%d-%y",
|
||||
"%d/%m/%Y",
|
||||
"%d/%m/%y",
|
||||
"%d-%m-%Y",
|
||||
"%d-%m-%y",
|
||||
)
|
||||
|
||||
_EXTRA_DATETIME_FORMATS: Tuple[str, ...] = (
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
"%Y-%m-%d %H:%M:%S.%f",
|
||||
"%Y-%m-%dT%H:%M:%S",
|
||||
"%Y-%m-%dT%H:%M:%S.%f",
|
||||
"%m/%d/%Y %H:%M:%S",
|
||||
"%m/%d/%Y %H:%M",
|
||||
"%m/%d/%y %H:%M:%S",
|
||||
"%m/%d/%y %H:%M",
|
||||
"%d/%m/%Y %H:%M:%S",
|
||||
"%d/%m/%y %H:%M:%S",
|
||||
"%Y/%m/%d %H:%M:%S",
|
||||
)
|
||||
|
||||
|
||||
def _parse_flexible_date(value: Any) -> Optional[dt.date]:
|
||||
"""Parse ``value`` to ``datetime.date`` using ISO first, then the
|
||||
``DD-MON-YY`` family, then the numeric fallbacks in
|
||||
:data:`_EXTRA_DATE_FORMATS`. Returns ``None`` if nothing matches.
|
||||
|
||||
Non-string / empty / non-finite inputs return ``None`` rather than
|
||||
raising so callers can use this as a drop-in replacement for the old
|
||||
``dt.date.fromisoformat`` + ``try``/``except`` pattern.
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
s = value.strip()
|
||||
if not s:
|
||||
return None
|
||||
try:
|
||||
return dt.date.fromisoformat(s)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
m = _DDMONYY_RE.match(s)
|
||||
# Reject inputs that carry a time component so ``_try_date_coerce``
|
||||
# doesn't silently swallow ``TIMESTAMP`` columns (``23-MAR-20 14:30:00``)
|
||||
# and misclassify them as ``DATE``.
|
||||
if m and m.group("hour") is None:
|
||||
month = _MONTH_LOOKUP.get(m.group("month").upper())
|
||||
if month is not None:
|
||||
try:
|
||||
day = int(m.group("day"))
|
||||
year = int(m.group("year"))
|
||||
if len(m.group("year")) == 2:
|
||||
# Pivot year = 69 matches SAS / Oracle / Excel
|
||||
# conventions: ``00..68`` -> 2000s, ``69..99`` -> 1900s.
|
||||
year = 2000 + year if year < 69 else 1900 + year
|
||||
return dt.date(year, month, day)
|
||||
except ValueError:
|
||||
return None
|
||||
for fmt in _EXTRA_DATE_FORMATS:
|
||||
try:
|
||||
return dt.datetime.strptime(s, fmt).date()
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def _parse_flexible_datetime(value: Any) -> Optional[dt.datetime]:
|
||||
"""Parse ``value`` to ``datetime.datetime``. Same format coverage as
|
||||
:func:`_parse_flexible_date` plus explicit datetime shapes; a
|
||||
date-only input is promoted to midnight so callers can treat a
|
||||
column that mixes ``23-MAR-20`` and ``23-MAR-20 14:30:00`` as
|
||||
``TIMESTAMP`` end-to-end.
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
s = value.strip()
|
||||
if not s:
|
||||
return None
|
||||
try:
|
||||
return dt.datetime.fromisoformat(s)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
m = _DDMONYY_RE.match(s)
|
||||
if m:
|
||||
month = _MONTH_LOOKUP.get(m.group("month").upper())
|
||||
if month is not None:
|
||||
try:
|
||||
day = int(m.group("day"))
|
||||
year = int(m.group("year"))
|
||||
if len(m.group("year")) == 2:
|
||||
year = 2000 + year if year < 69 else 1900 + year
|
||||
hour = int(m.group("hour")) if m.group("hour") else 0
|
||||
minute = int(m.group("minute")) if m.group("minute") else 0
|
||||
second = int(m.group("second")) if m.group("second") else 0
|
||||
micro = 0
|
||||
if m.group("micro"):
|
||||
# ``%f`` expects 1-6 digits; pad / truncate to match.
|
||||
micro_s = m.group("micro")[:6].ljust(6, "0")
|
||||
micro = int(micro_s)
|
||||
ampm = m.group("ampm")
|
||||
if ampm:
|
||||
ap = ampm.upper()
|
||||
if ap == "PM" and hour < 12:
|
||||
hour += 12
|
||||
elif ap == "AM" and hour == 12:
|
||||
hour = 0
|
||||
return dt.datetime(year, month, day, hour, minute, second, micro)
|
||||
except ValueError:
|
||||
return None
|
||||
for fmt in _EXTRA_DATETIME_FORMATS:
|
||||
try:
|
||||
return dt.datetime.strptime(s, fmt)
|
||||
except ValueError:
|
||||
continue
|
||||
# Final fallback: accept a date-only string and promote to midnight.
|
||||
d = _parse_flexible_date(s)
|
||||
if d is not None:
|
||||
return dt.datetime(d.year, d.month, d.day)
|
||||
return None
|
||||
|
||||
|
||||
def _try_date_coerce(values: List[str]) -> bool:
|
||||
for v in values:
|
||||
if _parse_flexible_date(v) is None:
|
||||
try:
|
||||
dt.date.fromisoformat(v)
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _try_datetime_coerce(values: List[str]) -> bool:
|
||||
for v in values:
|
||||
if _parse_flexible_datetime(v) is None:
|
||||
try:
|
||||
dt.datetime.fromisoformat(v)
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
return True
|
||||
|
||||
@ -2098,12 +1824,6 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
# Sentinel strings (``"null"``, ``"NA"``, ``"."``, ...) collapse to
|
||||
# Python None up front so every type branch below can skip its own
|
||||
# empty-string dance.
|
||||
if _is_null_string(value):
|
||||
return None
|
||||
|
||||
pg_upper = pg_type.upper()
|
||||
|
||||
if pg_upper in ("INTEGER", "BIGINT", "SMALLINT", "INT", "INT4", "INT8", "INT2"):
|
||||
@ -2136,7 +1856,10 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
|
||||
if isinstance(value, str):
|
||||
if value.strip() == "":
|
||||
return None
|
||||
return _parse_flexible_date(value)
|
||||
try:
|
||||
return dt.date.fromisoformat(value.strip())
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
return None
|
||||
|
||||
if pg_upper in ("TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE",
|
||||
@ -2150,7 +1873,10 @@ def _normalize_partition_value(value: Any, pg_type: str) -> Any:
|
||||
if isinstance(value, str):
|
||||
if value.strip() == "":
|
||||
return None
|
||||
return _parse_flexible_datetime(value)
|
||||
try:
|
||||
return dt.datetime.fromisoformat(value.strip())
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
return None
|
||||
|
||||
if pg_upper in ("TIME", "TIME WITHOUT TIME ZONE",
|
||||
@ -2653,59 +2379,16 @@ def _safe_object_to_datetime(
|
||||
"""Object-dtype to datetime. Shares the safety net (errstate +
|
||||
try/except) with :func:`_safe_numeric_to_datetime`. If the column is
|
||||
actually numeric-flavored (e.g. SAS wrote numbers into an object
|
||||
column), route to the numeric path; otherwise try our explicit
|
||||
``DD-MON-YY`` / strptime format set before falling back to the
|
||||
generic ``pd.to_datetime`` dateutil parser.
|
||||
|
||||
The explicit-format pre-pass exists because:
|
||||
* ``pd.to_datetime`` on unformatted object columns emits a
|
||||
``UserWarning`` per chunk and parses row-by-row via ``dateutil``
|
||||
-- 10-100× slower than a single vectorized strptime.
|
||||
* ``dateutil`` *will* parse ``23-MAR-20`` but its 2-digit-year pivot
|
||||
differs from SAS/Oracle convention in corner cases; applying our
|
||||
own parser keeps behavior predictable.
|
||||
column), route to the numeric path; otherwise parse with ``to_datetime``
|
||||
on the object itself.
|
||||
"""
|
||||
coerced = _null_sentinel_mask(series)
|
||||
coerced = series.replace({"": None})
|
||||
numeric = pd.to_numeric(coerced, errors="coerce")
|
||||
all_numeric = numeric.notna().sum() == coerced.notna().sum()
|
||||
if all_numeric and coerced.notna().any():
|
||||
return _safe_numeric_to_datetime(
|
||||
numeric, unit="s", column_name=column_name, target_type=target_type,
|
||||
)
|
||||
|
||||
non_null_count = int(coerced.notna().sum())
|
||||
if non_null_count:
|
||||
# First pass: our regex-based ``DD-MON-YY`` parser. Cheap,
|
||||
# locale-independent, covers the cases ``pd.to_datetime`` warns
|
||||
# about. Always parse via the datetime-aware variant so a DATE
|
||||
# target whose chunk happens to carry time components
|
||||
# (``23-MAR-20 14:30:00``) still parses without warnings; the
|
||||
# caller's ``.dt.date`` cast truncates the time, matching the
|
||||
# existing datetime64-input branch.
|
||||
parsed_py = coerced.map(
|
||||
lambda v: _parse_flexible_datetime(v) if v is not None else None
|
||||
)
|
||||
parsed_ts = pd.to_datetime(parsed_py, errors="coerce")
|
||||
if int(parsed_ts.notna().sum()) == non_null_count:
|
||||
return parsed_ts
|
||||
|
||||
# Second pass: vectorized ``pd.to_datetime`` with each explicit
|
||||
# format. One ``pd.to_datetime(format=fmt)`` call is O(n) in C;
|
||||
# trying a handful of them still beats row-by-row dateutil on
|
||||
# large chunks. Accept the first format that covers every
|
||||
# non-null cell.
|
||||
for fmt in _EXTRA_DATETIME_FORMATS + _EXTRA_DATE_FORMATS:
|
||||
try:
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
candidate = pd.to_datetime(coerced, format=fmt, errors="coerce")
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
if int(candidate.notna().sum()) == non_null_count:
|
||||
return candidate
|
||||
|
||||
# Final fallback: ``dateutil`` via ``pd.to_datetime``. Handles
|
||||
# shapes our explicit list missed (rare edge cases, mixed formats
|
||||
# within one column). Same safety net as the numeric path.
|
||||
try:
|
||||
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
|
||||
return pd.to_datetime(coerced, errors="coerce")
|
||||
@ -2740,13 +2423,13 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
||||
if pg in ("INTEGER", "BIGINT", "SMALLINT"):
|
||||
if pd.api.types.is_object_dtype(series):
|
||||
series = pd.to_numeric(
|
||||
_null_sentinel_mask(series), errors="coerce"
|
||||
series.replace({"": None}), errors="coerce"
|
||||
)
|
||||
out[name] = series.astype("Int64")
|
||||
elif pg in ("DOUBLE PRECISION", "REAL", "NUMERIC"):
|
||||
if pd.api.types.is_object_dtype(series):
|
||||
series = pd.to_numeric(
|
||||
_null_sentinel_mask(series), errors="coerce"
|
||||
series.replace({"": None}), errors="coerce"
|
||||
)
|
||||
out[name] = series.astype("float64")
|
||||
elif pg == "DATE":
|
||||
@ -2805,12 +2488,6 @@ def _prepare_for_copy(df: pd.DataFrame, columns: Dict[str, ColumnSpec]) -> pd.Da
|
||||
# in the COPY statement turns the blanks back into SQL NULL.
|
||||
# astype(str) stringifies NaN/None to the literal "nan"/"None",
|
||||
# so we mask those after the fact rather than branching per cell.
|
||||
# Object columns also get the sentinel sweep
|
||||
# (:data:`NULL_STRING_SENTINELS`) so a literal ``"null"`` /
|
||||
# ``"NA"`` / ``"."`` value lands as SQL NULL on the way in,
|
||||
# matching what the numeric / date branches above do.
|
||||
if pd.api.types.is_object_dtype(series):
|
||||
series = _null_sentinel_mask(series)
|
||||
na_mask = series.isna()
|
||||
if pd.api.types.is_numeric_dtype(series):
|
||||
# Hit when a column was auto-unioned to TEXT because at
|
||||
|
||||
@ -35,10 +35,6 @@ import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Set, Tuple
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dependency check
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -79,10 +75,10 @@ FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
|
||||
INPUT_FILE: str = "s3_directories.txt"
|
||||
"""Path to the text file containing one S3 prefix per line."""
|
||||
|
||||
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
|
||||
S3_BUCKET: str = "my-bucket"
|
||||
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""
|
||||
|
||||
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
|
||||
AWS_PROFILE: str = "default"
|
||||
"""AWS CLI profile name used for authentication."""
|
||||
|
||||
# Text-file reading defaults (used when downloading / previewing text files)
|
||||
|
||||
@ -21,10 +21,6 @@ import argparse
|
||||
import os
|
||||
import sys
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
|
||||
import boto3
|
||||
import pandas as pd
|
||||
import pyreadstat
|
||||
@ -48,7 +44,7 @@ SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
|
||||
# Configuration — edit these before running (or use CLI arguments)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
|
||||
S3_BUCKET: str = "my-bucket"
|
||||
"""S3 bucket name."""
|
||||
|
||||
S3_KEY: str = "path/to/file.sas7bdat"
|
||||
@ -57,7 +53,7 @@ S3_KEY: str = "path/to/file.sas7bdat"
|
||||
LOCAL_FOLDER: str = "./downloads"
|
||||
"""Local directory to download the file into."""
|
||||
|
||||
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
|
||||
AWS_PROFILE: str = "default"
|
||||
"""AWS CLI profile name used for authentication."""
|
||||
|
||||
|
||||
|
||||
@ -92,7 +92,6 @@ Exit codes:
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
@ -100,10 +99,6 @@ from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
|
||||
import boto3
|
||||
import yaml
|
||||
|
||||
@ -231,20 +226,15 @@ def load_download_config(path: Path) -> DownloadConfig:
|
||||
f"Config at {path} must be a YAML mapping at the top level."
|
||||
)
|
||||
|
||||
# 'bucket' can fall back to the S3_BUCKET env var, so only flag it as
|
||||
# missing when neither the YAML key nor the env var is present.
|
||||
required_always = ("prefix", "local_folder")
|
||||
missing = [k for k in required_always if k not in raw]
|
||||
if "bucket" not in raw and not os.environ.get("S3_BUCKET"):
|
||||
missing.insert(0, "bucket")
|
||||
missing = [
|
||||
k for k in ("bucket", "prefix", "local_folder") if k not in raw
|
||||
]
|
||||
if missing:
|
||||
raise ValueError(
|
||||
f"Config {path} missing required keys: {', '.join(missing)}"
|
||||
)
|
||||
|
||||
bucket = str(raw["bucket"]).strip() if raw.get("bucket") else ""
|
||||
if not bucket:
|
||||
bucket = os.environ.get("S3_BUCKET", "")
|
||||
bucket = str(raw["bucket"]).strip()
|
||||
if not bucket:
|
||||
raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.")
|
||||
|
||||
@ -266,8 +256,6 @@ def load_download_config(path: Path) -> DownloadConfig:
|
||||
aws_profile = raw.get("aws_profile")
|
||||
if aws_profile is not None:
|
||||
aws_profile = str(aws_profile).strip() or None
|
||||
if aws_profile is None:
|
||||
aws_profile = os.environ.get("AWS_PROFILE") or None
|
||||
|
||||
auto_detect = bool(raw.get("auto_detect", True))
|
||||
extensions = _parse_extensions(raw.get("extensions"), f"Config {path}")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user