Compare commits
2 Commits
384103f489
...
e48038f3c6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e48038f3c6 | ||
|
|
2390ce1e0c |
1
generic_loader/.gitignore → .gitignore
vendored
1
generic_loader/.gitignore → .gitignore
vendored
@ -3,3 +3,4 @@
|
||||
/.env
|
||||
__pycache__/
|
||||
venv/
|
||||
*/__pycache__/
|
||||
107
generic_loader/samples/sample_kitchensink.expected.json
Normal file
107
generic_loader/samples/sample_kitchensink.expected.json
Normal file
@ -0,0 +1,107 @@
|
||||
{
|
||||
"ALLNULL": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "entirely null numeric; loader must pick a default type, typically TEXT",
|
||||
"nullable": true
|
||||
},
|
||||
"ALLNULLC": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "entirely null character",
|
||||
"nullable": true
|
||||
},
|
||||
"BIGINT": {
|
||||
"note": "values beyond int32 range",
|
||||
"nullable": true,
|
||||
"postgres_type": "BIGINT"
|
||||
},
|
||||
"BOOLCOL": {
|
||||
"acceptable_types": [
|
||||
"BOOLEAN",
|
||||
"SMALLINT",
|
||||
"INTEGER"
|
||||
],
|
||||
"note": "{0,1,NaN} is genuinely ambiguous; loader's choice is a design decision",
|
||||
"nullable": true
|
||||
},
|
||||
"CONST": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"nullable": false
|
||||
},
|
||||
"DATEASTR": {
|
||||
"note": "stored as char in SAS; loader should coerce ISO-date strings",
|
||||
"nullable": true,
|
||||
"postgres_type": "DATE"
|
||||
},
|
||||
"DATECOL": {
|
||||
"note": "positive control",
|
||||
"nullable": false,
|
||||
"postgres_type": "DATE"
|
||||
},
|
||||
"DTCOL": {
|
||||
"acceptable_types": [
|
||||
"TIMESTAMP",
|
||||
"TIMESTAMP WITHOUT TIME ZONE"
|
||||
],
|
||||
"nullable": true
|
||||
},
|
||||
"FLOATCOL": {
|
||||
"acceptable_types": [
|
||||
"DOUBLE PRECISION",
|
||||
"NUMERIC"
|
||||
],
|
||||
"nullable": true
|
||||
},
|
||||
"ID": {
|
||||
"nullable": false,
|
||||
"postgres_type": "INTEGER"
|
||||
},
|
||||
"INTCOL": {
|
||||
"note": "positive control",
|
||||
"nullable": false,
|
||||
"postgres_type": "INTEGER"
|
||||
},
|
||||
"LONGSTR": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"nullable": true
|
||||
},
|
||||
"MIXED": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "heterogeneous content; loader should fall back to text",
|
||||
"nullable": true
|
||||
},
|
||||
"NUMASSTR": {
|
||||
"acceptable_types": [
|
||||
"NUMERIC",
|
||||
"DOUBLE PRECISION"
|
||||
],
|
||||
"note": "stored as char in SAS; loader should coerce numeric-looking strings",
|
||||
"nullable": true
|
||||
},
|
||||
"STRCOL": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "positive control",
|
||||
"nullable": false
|
||||
},
|
||||
"TIMECOL": {
|
||||
"nullable": true,
|
||||
"postgres_type": "TIME"
|
||||
}
|
||||
}
|
||||
BIN
generic_loader/samples/sample_kitchensink.xpt
Normal file
BIN
generic_loader/samples/sample_kitchensink.xpt
Normal file
Binary file not shown.
352
utils/data_explorer.py
Normal file
352
utils/data_explorer.py
Normal file
@ -0,0 +1,352 @@
|
||||
"""Explore S3 directories and categorise them by accessibility.
|
||||
|
||||
Reads a text file containing one S3 prefix per line (paths within the bucket
|
||||
configured by the ``S3_BUCKET`` constant), then for each prefix:
|
||||
- Lists all objects recursively (via ``list_objects_v2`` paginator)
|
||||
- **Only considers files matching the ``FILE_EXTENSION`` filter** (default
|
||||
``.sas7bdat``). All other file types are ignored.
|
||||
- Tests read permission with ``head_object`` on the first matching file found
|
||||
- Categorises the directory as **Available**, **Blocked**, or **Empty**
|
||||
|
||||
A directory is considered *empty* if it contains no files matching the
|
||||
extension filter, even when other file types are present.
|
||||
|
||||
Configure the constants below, then run::
|
||||
|
||||
python3 data_explorer.py
|
||||
|
||||
Python 3.10+ compatible. Requires only ``boto3`` / ``botocore`` and stdlib.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Tuple
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dependency check
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
try:
|
||||
import boto3 # noqa: F401
|
||||
import botocore.exceptions # noqa: F401
|
||||
except ImportError:
|
||||
print(
|
||||
"ERROR: boto3 / botocore is not installed.\n"
|
||||
"Install with: pip install boto3",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration — edit these before running
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
FILE_EXTENSION: str = ".sas7bdat"
|
||||
"""Only files whose key ends with this extension (case-insensitive) are considered."""
|
||||
|
||||
INPUT_FILE: str = "s3_directories.txt"
|
||||
"""Path to the text file containing one S3 prefix per line."""
|
||||
|
||||
S3_BUCKET: str = "my-bucket"
|
||||
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""
|
||||
|
||||
AWS_PROFILE: str = "default"
|
||||
"""AWS CLI profile name used for authentication."""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data structures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class AvailableDir:
|
||||
"""An S3 directory that is readable."""
|
||||
|
||||
prefix: str
|
||||
file_count: int
|
||||
total_size: int # bytes
|
||||
|
||||
|
||||
@dataclass
|
||||
class BlockedDir:
|
||||
"""An S3 directory where access was denied or an error occurred."""
|
||||
|
||||
prefix: str
|
||||
file_count: int
|
||||
error: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmptyDir:
|
||||
"""An S3 directory with no objects."""
|
||||
|
||||
prefix: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Results:
|
||||
"""Aggregated exploration results."""
|
||||
|
||||
available: List[AvailableDir] = field(default_factory=list)
|
||||
blocked: List[BlockedDir] = field(default_factory=list)
|
||||
empty: List[EmptyDir] = field(default_factory=list)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def read_input_file(path: str) -> List[str]:
|
||||
"""Return a list of S3 prefixes from *path*, ignoring blanks and comments.
|
||||
|
||||
Each line is stripped and normalised so that non-empty prefixes always end
|
||||
with a trailing ``/``.
|
||||
"""
|
||||
prefixes: List[str] = []
|
||||
with open(path, encoding="utf-8") as fh:
|
||||
for raw_line in fh:
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
# Normalise: strip surrounding whitespace/slashes, then re-add
|
||||
# a single trailing slash (unless the prefix is empty/root).
|
||||
line = line.strip("/")
|
||||
if line:
|
||||
line += "/"
|
||||
prefixes.append(line)
|
||||
return prefixes
|
||||
|
||||
|
||||
def format_size(size_bytes: int) -> str:
|
||||
"""Return a human-readable size string (KB, MB, GB, TB)."""
|
||||
if size_bytes < 1024:
|
||||
return f"{size_bytes} B"
|
||||
for unit in ("KB", "MB", "GB", "TB"):
|
||||
size_bytes /= 1024.0
|
||||
if size_bytes < 1024.0 or unit == "TB":
|
||||
return f"{size_bytes:,.1f} {unit}"
|
||||
# Fallback (should not be reached)
|
||||
return f"{size_bytes:,.1f} TB"
|
||||
|
||||
|
||||
def list_objects(
|
||||
s3_client: "botocore.client.S3",
|
||||
bucket: str,
|
||||
prefix: str,
|
||||
) -> Tuple[str | None, int, int]:
|
||||
"""Recursively list all objects under *prefix* using streaming counters.
|
||||
|
||||
Only objects whose key ends with ``FILE_EXTENSION`` (case-insensitive) are
|
||||
counted. All other files are silently skipped.
|
||||
|
||||
Returns ``(first_key, file_count, total_size)`` where *first_key* is the
|
||||
key of the first matching object found (or ``None`` if no matching files
|
||||
exist), *file_count* is the total number of matching objects, and
|
||||
*total_size* is the sum of their sizes in bytes.
|
||||
|
||||
Unlike the previous implementation this never accumulates all keys in
|
||||
memory, making it safe for prefixes with millions of objects.
|
||||
"""
|
||||
ext_lower = FILE_EXTENSION.lower()
|
||||
paginator = s3_client.get_paginator("list_objects_v2")
|
||||
first_key: str | None = None
|
||||
file_count: int = 0
|
||||
total_size: int = 0
|
||||
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
|
||||
for obj in page.get("Contents", []):
|
||||
if not obj["Key"].lower().endswith(ext_lower):
|
||||
continue
|
||||
if first_key is None:
|
||||
first_key = obj["Key"]
|
||||
file_count += 1
|
||||
total_size += obj["Size"]
|
||||
return first_key, file_count, total_size
|
||||
|
||||
|
||||
def check_read_permission(
|
||||
s3_client: "botocore.client.S3",
|
||||
bucket: str,
|
||||
key: str,
|
||||
) -> str | None:
|
||||
"""Try ``head_object`` on *key*. Return ``None`` on success or an error string."""
|
||||
try:
|
||||
s3_client.head_object(Bucket=bucket, Key=key)
|
||||
except botocore.exceptions.ClientError as exc:
|
||||
code = exc.response.get("Error", {}).get("Code", "Unknown")
|
||||
message = exc.response.get("Error", {}).get("Message", str(exc))
|
||||
return f"{message} ({code})"
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def explore_directories(prefixes: List[str]) -> Results:
|
||||
"""Explore every prefix in ``S3_BUCKET`` and return categorised *Results*."""
|
||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
||||
s3 = session.client("s3")
|
||||
|
||||
results = Results()
|
||||
total = len(prefixes)
|
||||
|
||||
for idx, prefix in enumerate(prefixes, start=1):
|
||||
print(
|
||||
f"[{idx}/{total}] Checking {prefix} (filtering for {FILE_EXTENSION}) ...",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# --- Recursive listing ------------------------------------------------
|
||||
try:
|
||||
first_key, file_count, total_size = list_objects(s3, S3_BUCKET, prefix)
|
||||
except botocore.exceptions.ClientError as exc:
|
||||
code = exc.response.get("Error", {}).get("Code", "Unknown")
|
||||
message = exc.response.get("Error", {}).get("Message", str(exc))
|
||||
results.blocked.append(
|
||||
BlockedDir(prefix=prefix, file_count=0, error=f"{message} ({code})")
|
||||
)
|
||||
continue
|
||||
except Exception as exc:
|
||||
results.blocked.append(
|
||||
BlockedDir(prefix=prefix, file_count=0, error=str(exc))
|
||||
)
|
||||
continue
|
||||
|
||||
if first_key is None:
|
||||
results.empty.append(EmptyDir(prefix=prefix))
|
||||
continue
|
||||
|
||||
# --- Permission check -------------------------------------------------
|
||||
# Prefer a real object over a zero-byte directory marker (key ending
|
||||
# in "/") for the head_object test. The selected key must also match
|
||||
# the FILE_EXTENSION filter. If no suitable key is found, fall back
|
||||
# to first_key.
|
||||
ext_lower = FILE_EXTENSION.lower()
|
||||
test_key = first_key
|
||||
if first_key.endswith("/") and total_size > 0:
|
||||
# Re-scan the first page to find a non-marker key matching the extension
|
||||
try:
|
||||
probe_paginator = s3.get_paginator("list_objects_v2")
|
||||
for probe_page in probe_paginator.paginate(
|
||||
Bucket=S3_BUCKET, Prefix=prefix, PaginationConfig={"MaxItems": 1000}
|
||||
):
|
||||
for obj in probe_page.get("Contents", []):
|
||||
if (
|
||||
not (obj["Key"].endswith("/") and obj["Size"] == 0)
|
||||
and obj["Key"].lower().endswith(ext_lower)
|
||||
):
|
||||
test_key = obj["Key"]
|
||||
break
|
||||
if test_key != first_key:
|
||||
break
|
||||
except Exception:
|
||||
pass # Fall back to first_key
|
||||
|
||||
error = check_read_permission(s3, S3_BUCKET, test_key)
|
||||
if error is None:
|
||||
results.available.append(
|
||||
AvailableDir(prefix=prefix, file_count=file_count, total_size=total_size)
|
||||
)
|
||||
else:
|
||||
results.blocked.append(
|
||||
BlockedDir(prefix=prefix, file_count=file_count, error=error)
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def print_results(results: Results) -> None:
|
||||
"""Print a clean, human-readable summary to stdout."""
|
||||
print()
|
||||
print("=== S3 Directory Explorer Results ===")
|
||||
print(f"Bucket: {S3_BUCKET}")
|
||||
|
||||
# --- Available ---
|
||||
print()
|
||||
print(f"--- Available ({len(results.available)}) ---")
|
||||
if results.available:
|
||||
for d in results.available:
|
||||
print(f" {d.prefix}")
|
||||
print(f" {FILE_EXTENSION} files: {d.file_count} | Total Size: {format_size(d.total_size)}")
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
# --- Blocked ---
|
||||
print()
|
||||
print(f"--- Blocked ({len(results.blocked)}) ---")
|
||||
if results.blocked:
|
||||
for d in results.blocked:
|
||||
if d.file_count:
|
||||
print(f" {d.prefix}")
|
||||
print(f" {FILE_EXTENSION} files found: {d.file_count} | Error: {d.error}")
|
||||
else:
|
||||
print(f" {d.prefix}")
|
||||
print(f" Error: {d.error}")
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
# --- Empty ---
|
||||
print()
|
||||
print(f"--- Empty / no {FILE_EXTENSION} files ({len(results.empty)}) ---")
|
||||
if results.empty:
|
||||
for d in results.empty:
|
||||
print(f" {d.prefix}")
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
print()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
|
||||
# --- Read input file ------------------------------------------------------
|
||||
if not os.path.exists(INPUT_FILE):
|
||||
print(f"ERROR: Input file not found: {INPUT_FILE}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
prefixes = read_input_file(INPUT_FILE)
|
||||
except Exception as exc:
|
||||
print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not prefixes:
|
||||
print("No valid S3 prefixes found in the input file. Nothing to do.")
|
||||
sys.exit(0)
|
||||
|
||||
# --- Validate AWS profile -------------------------------------------------
|
||||
try:
|
||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
||||
# Force credential resolution to catch bad profiles early
|
||||
credentials = session.get_credentials()
|
||||
if credentials is None:
|
||||
raise RuntimeError(
|
||||
f"No credentials found for AWS profile {AWS_PROFILE!r}"
|
||||
)
|
||||
except botocore.exceptions.ProfileNotFound as exc:
|
||||
print(f"ERROR: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except Exception as exc:
|
||||
print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# --- Explore --------------------------------------------------------------
|
||||
print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
|
||||
results = explore_directories(prefixes)
|
||||
print_results(results)
|
||||
8
utils/sample_s3_directories.txt
Normal file
8
utils/sample_s3_directories.txt
Normal file
@ -0,0 +1,8 @@
|
||||
# S3 Directory Explorer - Input File
|
||||
# One S3 prefix per line (within the bucket configured in data_explorer.py).
|
||||
# Blank lines and comments (#) are ignored.
|
||||
#
|
||||
# Examples:
|
||||
# data/sales/
|
||||
# data/inventory/
|
||||
# data/archive/
|
||||
Loading…
Reference in New Issue
Block a user