diff --git a/generic_loader/.gitignore b/.gitignore similarity index 74% rename from generic_loader/.gitignore rename to .gitignore index 055f10e..64af339 100644 --- a/generic_loader/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ /.env __pycache__/ venv/ +*/__pycache__/ \ No newline at end of file diff --git a/generic_loader/samples/sample_kitchensink.expected.json b/generic_loader/samples/sample_kitchensink.expected.json new file mode 100644 index 0000000..081ecab --- /dev/null +++ b/generic_loader/samples/sample_kitchensink.expected.json @@ -0,0 +1,107 @@ +{ + "ALLNULL": { + "acceptable_types": [ + "TEXT", + "VARCHAR" + ], + "note": "entirely null numeric; loader must pick a default type, typically TEXT", + "nullable": true + }, + "ALLNULLC": { + "acceptable_types": [ + "TEXT", + "VARCHAR" + ], + "note": "entirely null character", + "nullable": true + }, + "BIGINT": { + "note": "values beyond int32 range", + "nullable": true, + "postgres_type": "BIGINT" + }, + "BOOLCOL": { + "acceptable_types": [ + "BOOLEAN", + "SMALLINT", + "INTEGER" + ], + "note": "{0,1,NaN} is genuinely ambiguous; loader's choice is a design decision", + "nullable": true + }, + "CONST": { + "acceptable_types": [ + "TEXT", + "VARCHAR" + ], + "nullable": false + }, + "DATEASTR": { + "note": "stored as char in SAS; loader should coerce ISO-date strings", + "nullable": true, + "postgres_type": "DATE" + }, + "DATECOL": { + "note": "positive control", + "nullable": false, + "postgres_type": "DATE" + }, + "DTCOL": { + "acceptable_types": [ + "TIMESTAMP", + "TIMESTAMP WITHOUT TIME ZONE" + ], + "nullable": true + }, + "FLOATCOL": { + "acceptable_types": [ + "DOUBLE PRECISION", + "NUMERIC" + ], + "nullable": true + }, + "ID": { + "nullable": false, + "postgres_type": "INTEGER" + }, + "INTCOL": { + "note": "positive control", + "nullable": false, + "postgres_type": "INTEGER" + }, + "LONGSTR": { + "acceptable_types": [ + "TEXT", + "VARCHAR" + ], + "nullable": true + }, + "MIXED": { + "acceptable_types": [ + "TEXT", + "VARCHAR" + ], + "note": "heterogeneous content; loader should fall back to text", + "nullable": true + }, + "NUMASSTR": { + "acceptable_types": [ + "NUMERIC", + "DOUBLE PRECISION" + ], + "note": "stored as char in SAS; loader should coerce numeric-looking strings", + "nullable": true + }, + "STRCOL": { + "acceptable_types": [ + "TEXT", + "VARCHAR" + ], + "note": "positive control", + "nullable": false + }, + "TIMECOL": { + "nullable": true, + "postgres_type": "TIME" + } +} diff --git a/generic_loader/samples/sample_kitchensink.xpt b/generic_loader/samples/sample_kitchensink.xpt new file mode 100644 index 0000000..67b95d1 Binary files /dev/null and b/generic_loader/samples/sample_kitchensink.xpt differ diff --git a/utils/data_explorer.py b/utils/data_explorer.py new file mode 100644 index 0000000..6f3cc3b --- /dev/null +++ b/utils/data_explorer.py @@ -0,0 +1,330 @@ +"""Explore S3 directories and categorise them by accessibility. + +Reads a text file containing one S3 prefix per line (paths within the bucket +configured by the ``S3_BUCKET`` constant), then for each prefix: +- Lists all objects recursively (via ``list_objects_v2`` paginator) +- Tests read permission with ``head_object`` on the first file found +- Categorises the directory as **Available**, **Blocked**, or **Empty** + +Configure the three constants below, then run:: + + python3 data_explorer.py + +Python 3.10+ compatible. Requires only ``boto3`` / ``botocore`` and stdlib. +""" + +from __future__ import annotations + +import sys +from dataclasses import dataclass, field +from typing import List, Tuple + +# --------------------------------------------------------------------------- +# Dependency check +# --------------------------------------------------------------------------- + +try: + import boto3 # noqa: F401 + import botocore.exceptions # noqa: F401 +except ImportError: + print( + "ERROR: boto3 / botocore is not installed.\n" + "Install with: pip install boto3", + file=sys.stderr, + ) + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Configuration — edit these before running +# --------------------------------------------------------------------------- + +INPUT_FILE: str = "s3_directories.txt" +"""Path to the text file containing one S3 prefix per line.""" + +S3_BUCKET: str = "my-bucket" +"""S3 bucket name (all prefixes are assumed to live in this bucket).""" + +AWS_PROFILE: str = "default" +"""AWS CLI profile name used for authentication.""" + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + + +@dataclass +class AvailableDir: + """An S3 directory that is readable.""" + + prefix: str + file_count: int + total_size: int # bytes + + +@dataclass +class BlockedDir: + """An S3 directory where access was denied or an error occurred.""" + + prefix: str + file_count: int + error: str + + +@dataclass +class EmptyDir: + """An S3 directory with no objects.""" + + prefix: str + + +@dataclass +class Results: + """Aggregated exploration results.""" + + available: List[AvailableDir] = field(default_factory=list) + blocked: List[BlockedDir] = field(default_factory=list) + empty: List[EmptyDir] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def read_input_file(path: str) -> List[str]: + """Return a list of S3 prefixes from *path*, ignoring blanks and comments. + + Each line is stripped and normalised so that non-empty prefixes always end + with a trailing ``/``. + """ + prefixes: List[str] = [] + with open(path, encoding="utf-8") as fh: + for raw_line in fh: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + # Normalise: strip surrounding whitespace/slashes, then re-add + # a single trailing slash (unless the prefix is empty/root). + line = line.strip("/") + if line: + line += "/" + prefixes.append(line) + return prefixes + + +def format_size(size_bytes: int) -> str: + """Return a human-readable size string (KB, MB, GB, TB).""" + if size_bytes < 1024: + return f"{size_bytes} B" + for unit in ("KB", "MB", "GB", "TB"): + size_bytes /= 1024.0 + if size_bytes < 1024.0 or unit == "TB": + return f"{size_bytes:,.1f} {unit}" + # Fallback (should not be reached) + return f"{size_bytes:,.1f} TB" + + +def list_objects( + s3_client: "botocore.client.S3", + bucket: str, + prefix: str, +) -> Tuple[str | None, int, int]: + """Recursively list all objects under *prefix* using streaming counters. + + Returns ``(first_key, file_count, total_size)`` where *first_key* is the + key of the first object found (or ``None`` if the prefix is empty), + *file_count* is the total number of objects, and *total_size* is the sum + of all object sizes in bytes. + + Unlike the previous implementation this never accumulates all keys in + memory, making it safe for prefixes with millions of objects. + """ + paginator = s3_client.get_paginator("list_objects_v2") + first_key: str | None = None + file_count: int = 0 + total_size: int = 0 + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + for obj in page.get("Contents", []): + if first_key is None: + first_key = obj["Key"] + file_count += 1 + total_size += obj["Size"] + return first_key, file_count, total_size + + +def check_read_permission( + s3_client: "botocore.client.S3", + bucket: str, + key: str, +) -> str | None: + """Try ``head_object`` on *key*. Return ``None`` on success or an error string.""" + try: + s3_client.head_object(Bucket=bucket, Key=key) + except botocore.exceptions.ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "Unknown") + message = exc.response.get("Error", {}).get("Message", str(exc)) + return f"{message} ({code})" + return None + + +# --------------------------------------------------------------------------- +# Core logic +# --------------------------------------------------------------------------- + + +def explore_directories(prefixes: List[str]) -> Results: + """Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.""" + session = boto3.Session(profile_name=AWS_PROFILE) + s3 = session.client("s3") + + results = Results() + total = len(prefixes) + + for idx, prefix in enumerate(prefixes, start=1): + print(f"[{idx}/{total}] Checking {prefix} ...", file=sys.stderr) + + # --- Recursive listing ------------------------------------------------ + try: + first_key, file_count, total_size = list_objects(s3, S3_BUCKET, prefix) + except botocore.exceptions.ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "Unknown") + message = exc.response.get("Error", {}).get("Message", str(exc)) + results.blocked.append( + BlockedDir(prefix=prefix, file_count=0, error=f"{message} ({code})") + ) + continue + except Exception as exc: + results.blocked.append( + BlockedDir(prefix=prefix, file_count=0, error=str(exc)) + ) + continue + + if first_key is None: + results.empty.append(EmptyDir(prefix=prefix)) + continue + + # --- Permission check ------------------------------------------------- + # Prefer a real object over a zero-byte directory marker (key ending + # in "/") for the head_object test. If every key is a directory + # marker, fall back to the first one anyway. + test_key = first_key + if first_key.endswith("/") and total_size > 0: + # Re-scan the first page to find a non-marker key + try: + probe_paginator = s3.get_paginator("list_objects_v2") + for probe_page in probe_paginator.paginate( + Bucket=S3_BUCKET, Prefix=prefix, PaginationConfig={"MaxItems": 1000} + ): + for obj in probe_page.get("Contents", []): + if not (obj["Key"].endswith("/") and obj["Size"] == 0): + test_key = obj["Key"] + break + if test_key != first_key: + break + except Exception: + pass # Fall back to first_key + + error = check_read_permission(s3, S3_BUCKET, test_key) + if error is None: + results.available.append( + AvailableDir(prefix=prefix, file_count=file_count, total_size=total_size) + ) + else: + results.blocked.append( + BlockedDir(prefix=prefix, file_count=file_count, error=error) + ) + + return results + + +# --------------------------------------------------------------------------- +# Output +# --------------------------------------------------------------------------- + + +def print_results(results: Results) -> None: + """Print a clean, human-readable summary to stdout.""" + print() + print("=== S3 Directory Explorer Results ===") + print(f"Bucket: {S3_BUCKET}") + + # --- Available --- + print() + print(f"--- Available ({len(results.available)}) ---") + if results.available: + for d in results.available: + print(f" {d.prefix}") + print(f" Files: {d.file_count} | Total Size: {format_size(d.total_size)}") + else: + print(" (none)") + + # --- Blocked --- + print() + print(f"--- Blocked ({len(results.blocked)}) ---") + if results.blocked: + for d in results.blocked: + if d.file_count: + print(f" {d.prefix}") + print(f" Files found: {d.file_count} | Error: {d.error}") + else: + print(f" {d.prefix}") + print(f" Error: {d.error}") + else: + print(" (none)") + + # --- Empty --- + print() + print(f"--- Empty ({len(results.empty)}) ---") + if results.empty: + for d in results.empty: + print(f" {d.prefix}") + else: + print(" (none)") + + print() + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + import os + + # --- Read input file ------------------------------------------------------ + if not os.path.exists(INPUT_FILE): + print(f"ERROR: Input file not found: {INPUT_FILE}", file=sys.stderr) + sys.exit(1) + + try: + prefixes = read_input_file(INPUT_FILE) + except Exception as exc: + print(f"ERROR: Could not read input file: {exc}", file=sys.stderr) + sys.exit(1) + + if not prefixes: + print("No valid S3 prefixes found in the input file. Nothing to do.") + sys.exit(0) + + # --- Validate AWS profile ------------------------------------------------- + try: + session = boto3.Session(profile_name=AWS_PROFILE) + # Force credential resolution to catch bad profiles early + credentials = session.get_credentials() + if credentials is None: + raise RuntimeError( + f"No credentials found for AWS profile {AWS_PROFILE!r}" + ) + except botocore.exceptions.ProfileNotFound as exc: + print(f"ERROR: {exc}", file=sys.stderr) + sys.exit(1) + except Exception as exc: + print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr) + sys.exit(1) + + # --- Explore -------------------------------------------------------------- + print(f"Bucket: {S3_BUCKET}", file=sys.stderr) + results = explore_directories(prefixes) + print_results(results) diff --git a/utils/sample_s3_directories.txt b/utils/sample_s3_directories.txt new file mode 100644 index 0000000..f38ba87 --- /dev/null +++ b/utils/sample_s3_directories.txt @@ -0,0 +1,8 @@ +# S3 Directory Explorer - Input File +# One S3 prefix per line (within the bucket configured in data_explorer.py). +# Blank lines and comments (#) are ignored. +# +# Examples: +# data/sales/ +# data/inventory/ +# data/archive/