Compare commits
No commits in common. "3a0537270c1607881dfc7224f917e4a647f4748d" and "f681f1012ab14790a42c3ecb153082b245eb415f" have entirely different histories.
3a0537270c
...
f681f1012a
@ -8,181 +8,6 @@ Python 3.9 compatible (target is an air-gapped host that currently only has
|
|||||||
3.9). ``from __future__ import annotations`` lets us use PEP 585 generics
|
3.9). ``from __future__ import annotations`` lets us use PEP 585 generics
|
||||||
as annotations; runtime-resolved type uses (dataclass defaults, etc.) stick
|
as annotations; runtime-resolved type uses (dataclass defaults, etc.) stick
|
||||||
to ``typing``.
|
to ``typing``.
|
||||||
|
|
||||||
-------------------------------------------------------------------------------
|
|
||||||
USAGE
|
|
||||||
-------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
Supported inputs:
|
|
||||||
* ``.sas7bdat`` (read with ``encoding="latin-1"``)
|
|
||||||
* ``.xpt`` / ``.xport`` (SAS transport files)
|
|
||||||
|
|
||||||
1. YAML config
|
|
||||||
--------------
|
|
||||||
Every invocation is driven by a YAML file describing one SAS file to load::
|
|
||||||
|
|
||||||
filename: samples/sample_kitchensink.xpt # required; relative paths are
|
|
||||||
# resolved against the config
|
|
||||||
# file's directory when possible
|
|
||||||
schemaname: public # required
|
|
||||||
tablename: kitchensink # required
|
|
||||||
|
|
||||||
# Optional. One of: fail | replace | append. Default: fail.
|
|
||||||
# fail - error out if the target table already exists
|
|
||||||
# replace - DROP and recreate the table from the inferred schema
|
|
||||||
# append - keep the existing table; pre-flight a schema-compat check,
|
|
||||||
# then COPY the new rows in
|
|
||||||
if_exists: append
|
|
||||||
|
|
||||||
# Optional, mutually exclusive. Restrict which columns are loaded.
|
|
||||||
# include:
|
|
||||||
# - ID
|
|
||||||
# - INTCOL
|
|
||||||
# exclude:
|
|
||||||
# - ALLNULL
|
|
||||||
|
|
||||||
2. Database connection
|
|
||||||
----------------------
|
|
||||||
The loader uses standard libpq environment variables (read via ``os.environ``)::
|
|
||||||
|
|
||||||
PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE
|
|
||||||
|
|
||||||
The CLI calls ``python-dotenv``'s ``load_dotenv()`` at startup, so a local
|
|
||||||
``.env`` file is picked up automatically. Library callers are responsible for
|
|
||||||
populating the environment themselves (either call ``load_dotenv()`` or export
|
|
||||||
the vars) before calling :func:`connect`.
|
|
||||||
|
|
||||||
3. Command-line interface
|
|
||||||
-------------------------
|
|
||||||
::
|
|
||||||
|
|
||||||
python load_sas.py --config path/to/config.yaml [--validate] [--dry-run]
|
|
||||||
|
|
||||||
Flags:
|
|
||||||
--config PATH Required. Path to the YAML config above.
|
|
||||||
--validate Compare the inferred schema against
|
|
||||||
``<sas-file-stem>.expected.json`` sitting next to the SAS
|
|
||||||
file. Exits nonzero on mismatch. Safe to combine with
|
|
||||||
``--dry-run``.
|
|
||||||
--dry-run Print the inferred ``CREATE TABLE`` SQL and stop. The
|
|
||||||
database is never touched (no connection is opened).
|
|
||||||
|
|
||||||
Exit codes:
|
|
||||||
0 - success (load completed, or dry-run/validate passed)
|
|
||||||
1 - validation failure
|
|
||||||
2 - config references a SAS file that does not exist
|
|
||||||
Other nonzero - uncaught exception (traceback printed); the transaction
|
|
||||||
is rolled back before exit.
|
|
||||||
|
|
||||||
Typical invocations::
|
|
||||||
|
|
||||||
# Preview the inferred schema without connecting to Postgres.
|
|
||||||
python load_sas.py --config sample_config.yaml --dry-run
|
|
||||||
|
|
||||||
# Check the inferred schema against an expected-types manifest.
|
|
||||||
python load_sas.py --config sample_config.yaml --validate --dry-run
|
|
||||||
|
|
||||||
# Actually load the data.
|
|
||||||
python load_sas.py --config sample_config.yaml
|
|
||||||
|
|
||||||
4. Expected-types manifest (``--validate``)
|
|
||||||
-------------------------------------------
|
|
||||||
``--validate`` looks for a JSON file named ``<sas-stem>.expected.json`` next
|
|
||||||
to the SAS file, e.g. ``samples/sample_kitchensink.xpt`` pairs with
|
|
||||||
``samples/sample_kitchensink.expected.json``. Each top-level key is a column
|
|
||||||
name; the value is an object with any of::
|
|
||||||
|
|
||||||
{
|
|
||||||
"postgres_type": "BIGINT", # exact expected type, OR
|
|
||||||
"acceptable_types": ["TEXT", # any-of list of acceptable types
|
|
||||||
"VARCHAR"],
|
|
||||||
"nullable": true, # default true; false = must be NOT NULL
|
|
||||||
"note": "free-form comment" # ignored by the loader
|
|
||||||
}
|
|
||||||
|
|
||||||
Type comparison ignores length/precision modifiers and normalizes synonyms
|
|
||||||
(e.g. ``INT`` == ``INTEGER`` == ``INT4``; ``VARCHAR(10)`` == ``VARCHAR``).
|
|
||||||
Nullability tightening (inferred NULL, manifest NOT NULL) is a hard failure;
|
|
||||||
loosening is not checked here because the append-mode check already covers it.
|
|
||||||
|
|
||||||
5. Library usage
|
|
||||||
----------------
|
|
||||||
The CLI is a thin wrapper around composable functions. A typical orchestrator
|
|
||||||
looks like::
|
|
||||||
|
|
||||||
from pathlib import Path
from dotenv import load_dotenv
|
|
||||||
from load_sas import (
|
|
||||||
load_config, read_sas, apply_column_filter, infer_schema,
|
|
||||||
validate_against_manifest, render_create_table,
|
|
||||||
connect, create_table, copy_dataframe,
|
|
||||||
)
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
cfg = load_config("config.yaml")
|
|
||||||
df, meta = read_sas(cfg.filename)
|
|
||||||
df = apply_column_filter(df, cfg.include, cfg.exclude)
|
|
||||||
columns = infer_schema(df, meta)
|
|
||||||
|
|
||||||
# Optional: preview
|
|
||||||
print(render_create_table(cfg.schemaname, cfg.tablename, columns))
|
|
||||||
|
|
||||||
# Optional: validate against a manifest
|
|
||||||
problems = validate_against_manifest(columns, Path("expected.json"))
|
|
||||||
assert not problems, problems
|
|
||||||
|
|
||||||
conn = connect()
|
|
||||||
conn.autocommit = False
|
|
||||||
try:
|
|
||||||
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
|
|
||||||
rows = copy_dataframe(conn, cfg.schemaname, cfg.tablename, df, columns)
|
|
||||||
conn.commit()
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
All functions are side-effect free except :func:`connect`, :func:`create_table`,
|
|
||||||
and :func:`copy_dataframe`; schema inference (:func:`infer_schema`) accepts a
|
|
||||||
``coerce_chars`` kwarg to override the module-level ``COERCE_CHAR_COLUMNS``
|
|
||||||
for that call; note that the module-level flag is temporarily reassigned while inference runs.
|
|
||||||
|
|
||||||
6. Type inference summary
|
|
||||||
-------------------------
|
|
||||||
Priority order used by :func:`infer_schema`:
|
|
||||||
|
|
||||||
1. SAS format string (via ``meta.original_variable_types``):
|
|
||||||
``DATETIME*`` -> ``TIMESTAMP``, ``TIME*`` -> ``TIME``,
|
|
||||||
``DATE*`` / ``YYMMDD*`` / ``MMDDYY*`` / ``DDMMYY*`` / ``JULIAN*`` -> ``DATE``.
|
|
||||||
2. All-null column -> ``TEXT`` (with a note).
|
|
||||||
3. pandas datetime dtype -> ``TIMESTAMP``.
|
|
||||||
4. Object columns containing only ``datetime.date`` / ``datetime.datetime``
|
|
||||||
-> ``DATE`` or ``TIMESTAMP``.
|
|
||||||
5. Object columns of strings: if ``COERCE_CHAR_COLUMNS`` is True and at
|
|
||||||
least ``CHAR_INFERENCE_MIN_VALUES`` non-empty values parse cleanly, they
|
|
||||||
are promoted to ``INTEGER`` / ``BIGINT`` / ``DOUBLE PRECISION`` /
|
|
||||||
``DATE`` / ``TIMESTAMP``; otherwise ``TEXT``.
|
|
||||||
6. Numeric columns of whole numbers -> ``INTEGER`` (or ``BIGINT`` if any
|
|
||||||
value exceeds the int32 range ``NUMERIC_INT_RANGE``); otherwise
|
|
||||||
``DOUBLE PRECISION``.
|
|
||||||
|
|
||||||
Type inference scans only the first ``TYPE_INFERENCE_SAMPLE_ROWS`` rows for
|
|
||||||
performance on large files. Nullability and all-null detection still run over
|
|
||||||
the full column (they're vectorized and fast) so a ``NOT NULL`` constraint is
|
|
||||||
never declared for a column that has a null anywhere in the file. Tradeoff:
|
|
||||||
if the first N rows fit ``INTEGER`` but a later row exceeds int32, COPY will
|
|
||||||
fail; bump the sample size or set ``TYPE_INFERENCE_SAMPLE_ROWS = None`` to
|
|
||||||
scan the whole column.
|
|
||||||
|
|
||||||
7. Tunables
|
|
||||||
-----------
|
|
||||||
Module-level knobs at the top of this file:
|
|
||||||
|
|
||||||
* ``COERCE_CHAR_COLUMNS`` - whether to promote stringly-typed numerics/
|
|
||||||
dates (default True).
|
|
||||||
* ``CHAR_INFERENCE_MIN_VALUES`` - minimum non-empty sample size before
|
|
||||||
char-column coercion is attempted.
|
|
||||||
* ``NUMERIC_INT_RANGE`` - INTEGER bounds; values outside become
|
|
||||||
``BIGINT``.
|
|
||||||
* ``TYPE_INFERENCE_SAMPLE_ROWS`` - cap on rows used for type inference
|
|
||||||
(``None`` = scan the whole column).
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
@ -220,15 +45,6 @@ values; too small a sample is easy to mis-infer."""
|
|||||||
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
|
NUMERIC_INT_RANGE = (-2_147_483_648, 2_147_483_647)
|
||||||
"""INTEGER bounds; anything outside becomes BIGINT."""
|
"""INTEGER bounds; anything outside becomes BIGINT."""
|
||||||
|
|
||||||
TYPE_INFERENCE_SAMPLE_ROWS: Optional[int] = 10_000
|
|
||||||
"""Cap on rows inspected during per-column type inference. The row-walking
|
|
||||||
helpers (date detection on object columns, string-coercion probes, whole-number
|
|
||||||
check on numeric columns) operate on ``df.head(TYPE_INFERENCE_SAMPLE_ROWS)``
|
|
||||||
instead of the full frame, which matters on SAS files with hundreds of millions
|
|
||||||
of rows. Nullability is still evaluated across the whole column (cheap,
|
|
||||||
vectorized) so ``NOT NULL`` declarations remain safe. Set to ``None`` to scan
|
|
||||||
every row."""
|
|
||||||
|
|
||||||
|
|
||||||
VALID_IF_EXISTS = ("fail", "replace", "append")
|
VALID_IF_EXISTS = ("fail", "replace", "append")
|
||||||
|
|
||||||
@ -566,16 +382,6 @@ def infer_schema(
|
|||||||
"""
|
"""
|
||||||
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
|
original_formats: Dict[str, str] = dict(getattr(meta, "original_variable_types", {}) or {})
|
||||||
|
|
||||||
# Row-walking type probes run on a bounded head slice; nullability and the
|
|
||||||
# all-null check still see every row so NOT NULL declarations stay honest.
|
|
||||||
total_rows = len(df)
|
|
||||||
if TYPE_INFERENCE_SAMPLE_ROWS is not None and total_rows > TYPE_INFERENCE_SAMPLE_ROWS:
|
|
||||||
sample_df = df.head(TYPE_INFERENCE_SAMPLE_ROWS)
|
|
||||||
sampled = True
|
|
||||||
else:
|
|
||||||
sample_df = df
|
|
||||||
sampled = False
|
|
||||||
|
|
||||||
# Temporarily flip the module-level flag if the caller asked us to.
|
# Temporarily flip the module-level flag if the caller asked us to.
|
||||||
global COERCE_CHAR_COLUMNS
|
global COERCE_CHAR_COLUMNS
|
||||||
saved = COERCE_CHAR_COLUMNS
|
saved = COERCE_CHAR_COLUMNS
|
||||||
@ -584,7 +390,6 @@ def infer_schema(
|
|||||||
out: Dict[str, ColumnSpec] = {}
|
out: Dict[str, ColumnSpec] = {}
|
||||||
for col in df.columns:
|
for col in df.columns:
|
||||||
series = df[col]
|
series = df[col]
|
||||||
sample_series = sample_df[col]
|
|
||||||
sas_format = original_formats.get(col)
|
sas_format = original_formats.get(col)
|
||||||
notes: List[str] = []
|
notes: List[str] = []
|
||||||
|
|
||||||
@ -597,13 +402,13 @@ def infer_schema(
|
|||||||
elif pd.api.types.is_datetime64_any_dtype(series):
|
elif pd.api.types.is_datetime64_any_dtype(series):
|
||||||
pg_type = "TIMESTAMP"
|
pg_type = "TIMESTAMP"
|
||||||
elif pd.api.types.is_object_dtype(series):
|
elif pd.api.types.is_object_dtype(series):
|
||||||
is_dates, any_dt = _object_is_dates(sample_series)
|
is_dates, any_dt = _object_is_dates(series)
|
||||||
if is_dates:
|
if is_dates:
|
||||||
pg_type = "TIMESTAMP" if any_dt else "DATE"
|
pg_type = "TIMESTAMP" if any_dt else "DATE"
|
||||||
else:
|
else:
|
||||||
pg_type = _infer_char_type(sample_series)
|
pg_type = _infer_char_type(series)
|
||||||
elif pd.api.types.is_numeric_dtype(series):
|
elif pd.api.types.is_numeric_dtype(series):
|
||||||
int_target = _numeric_int_target(sample_series)
|
int_target = _numeric_int_target(series)
|
||||||
if int_target is not None:
|
if int_target is not None:
|
||||||
pg_type = int_target
|
pg_type = int_target
|
||||||
else:
|
else:
|
||||||
@ -612,12 +417,6 @@ def infer_schema(
|
|||||||
pg_type = "TEXT"
|
pg_type = "TEXT"
|
||||||
notes.append(f"unhandled dtype {series.dtype}; defaulting to TEXT")
|
notes.append(f"unhandled dtype {series.dtype}; defaulting to TEXT")
|
||||||
|
|
||||||
if sampled:
|
|
||||||
notes.append(
|
|
||||||
f"type inferred from first {TYPE_INFERENCE_SAMPLE_ROWS:,} of "
|
|
||||||
f"{total_rows:,} rows"
|
|
||||||
)
|
|
||||||
|
|
||||||
nullable = _is_nullable(series)
|
nullable = _is_nullable(series)
|
||||||
|
|
||||||
out[col] = ColumnSpec(
|
out[col] = ColumnSpec(
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user