Compare commits

..

No commits in common. "main" and "file_viewer" have entirely different histories.

15 changed files with 220 additions and 7157 deletions

View File

@ -3,6 +3,3 @@ PGPORT=5432
PGUSER= PGUSER=
PGPASSWORD= PGPASSWORD=
PGDATABASE= PGDATABASE=
S3_BUCKET=my-bucket
AWS_PROFILE=default

View File

@ -1,7 +1,5 @@
/.venv /.venv
/samples /samples
.env /.env
!.env.example
__pycache__/ __pycache__/
venv/ venv/
*/__pycache__/

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -15,64 +15,3 @@ tablename: kitchensink
# What to do if the target table already exists: fail | replace | append # What to do if the target table already exists: fail | replace | append
# Defaults to fail. # Defaults to fail.
if_exists: append if_exists: append
# file_type: Type of data file to load. One of: sas | text. Default: sas.
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
# file_type: sas
# delimiter: Column delimiter for text files. Only used when file_type: text.
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
# or any single character.
# delimiter: ","
# text_encoding: Character encoding for text files. Default: utf-8.
# Common alternatives: latin-1, cp1252, iso-8859-1.
# text_encoding: utf-8
# quotechar: Quote character for text files. Default: '"' (double quote).
# quotechar: '"'
# partition_by: Partition the table by unique values of these columns.
# Columns are applied in cascading order (first column = top-level partition).
# Requires if_exists: replace or fail (not append for initial creation).
# Single field:
# partition_by: state
# Multiple fields (cascading):
# partition_by:
# - state
# - zip
#
# max_partitions: Warning threshold for total partition count (default: 10000).
# If the number of partitions exceeds this, a warning is logged but loading continues.
# max_partitions: 10000
# indexes: Create B-tree indexes on these columns after data loading.
# Indexes are created with IF NOT EXISTS for safe use with append mode.
# Single column:
# indexes: state
# Multiple columns (one index per column):
# indexes:
# - state
# - zip
# column_types: Explicit {column_name: postgres_type} overrides that
# bypass automatic type inference for the listed columns. Useful when
# pyreadstat reports a column as NUM but you want it stored as TEXT
# (phone/ID columns that are conceptually strings), or when a column's
# inferred type is off for any other reason. Columns not listed here
# fall through to the normal inference path. Nullability is always
# computed from the data.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# all_nullable: If true, every column is stamped nullable in the generated
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
# rare-null data downstream) and COPY blows up mid-load on the first null
# it hits. Off by default. The CLI flag --all-nullable overrides this to
# true when set.
#
# all_nullable: false

View File

@ -19,33 +19,8 @@ if_exists: replace
# auto-grouped with its peers by stripping trailing digits (and any trailing # auto-grouped with its peers by stripping trailing digits (and any trailing
# _ / -) from the file stem. Files with no trailing digits become their own # _ / -) from the file stem. Files with no trailing digits become their own
# singleton cluster. # singleton cluster.
#
# Auto-detect only recognizes *trailing* digit runs. If your file names put
# the varying number in the middle of the stem (e.g. surrounded by year,
# region, and detail components), auto-detect will NOT group them - each
# file becomes its own singleton cluster. Use an explicit pattern instead;
# see the embedded-digit example near the bottom of this file.
auto_detect: true auto_detect: true
# file_type: Type of data files in this folder. One of: sas | text. Default: sas.
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
# When set to 'text', the folder scanner looks for .txt/.csv/.tsv files
# instead of .sas7bdat/.xpt/.xport files.
# file_type: sas
# delimiter: Column delimiter for text files. Only used when file_type: text.
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
# or any single character.
# delimiter: ","
# text_encoding: Character encoding for text files. Default: utf-8.
# Common alternatives: latin-1, cp1252, iso-8859-1.
# text_encoding: utf-8
# quotechar: Quote character for text files. Default: '"' (double quote).
# quotechar: '"'
# Folder-level column filter. Every file in every cluster passes through # Folder-level column filter. Every file in every cluster passes through
# this filter. `include` and `exclude` are mutually exclusive. A cluster can # this filter. `include` and `exclude` are mutually exclusive. A cluster can
# override these via its own `include` / `exclude` keys. # override these via its own `include` / `exclude` keys.
@ -56,76 +31,15 @@ auto_detect: true
# exclude: # exclude:
# - ALLNULL # - ALLNULL
# Folder-level partition_by: Partition every cluster's table by unique values
# of these columns. Inherited by all clusters unless overridden per-cluster.
# Requires if_exists: replace or fail (not append for initial creation).
# Single field:
# partition_by: state
# Multiple fields (cascading):
# partition_by:
# - state
# - zip
#
# Folder-level max_partitions: Warning threshold for total partition count
# (default: 10000). Inherited by all clusters unless overridden per-cluster.
# max_partitions: 10000
# Folder-level indexes: Create B-tree indexes on these columns after data
# loading. Inherited by all clusters unless overridden per-cluster.
# Indexes are created with IF NOT EXISTS for safe use with append mode.
# Single column:
# indexes: state
# Multiple columns (one index per column):
# indexes:
# - state
# - zip
# Folder-level column_types: Explicit {column_name: postgres_type} map that
# bypasses automatic type inference for the listed columns. Applied to
# every cluster unless a cluster supplies its own column_types, which are
# merged on top (cluster entries win on conflict).
#
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
# BIGINT). Entries in column_types here win over that auto-union - use
# them when the auto result is wrong or when --no-prescan disables the
# auto-union and you still need to pin a column.
#
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
# don't exist in a given file are simply ignored for that file.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# RESP_PH_SUFFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# Folder-level all_nullable: If true, every column of every cluster is
# stamped nullable in the generated schema; NOT NULL inference is skipped
# entirely. Use this when the sampler wrongly concludes a column has no
# nulls (sampled rows happened to be dense, but later files in the cluster
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
# unless a cluster supplies its own all_nullable. The CLI flag
# --all-nullable overrides both this and any per-cluster setting when
# passed. Off by default.
#
# all_nullable: false
# Explicit cluster patterns. Each pattern is matched against the file # Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect # *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly. # pool, so explicit and auto clusters compose cleanly.
# #
# `tablename` is required. `if_exists`, `include`, `exclude`, and # `tablename` is required. `if_exists`, `include`, and `exclude` are
# `column_types` are optional per-cluster overrides of the folder-level # optional per-cluster overrides of the folder-level defaults above.
# defaults above. Cluster-level column_types entries win over folder-
# level entries for the same column.
clusters: clusters:
- pattern: '^group_a\d+\.xpt$' - pattern: '^group_a\d+\.xpt$'
tablename: group_a tablename: group_a
# column_types:
# INTCOL: TEXT
# all_nullable: true # per-cluster override of the folder-level default
# Example of an explicit override. Uncomment to force the group_b cluster to # Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace": # append instead of replace even though the folder default is "replace":
@ -134,44 +48,7 @@ clusters:
# tablename: group_b # tablename: group_b
# if_exists: append # if_exists: append
# Per-cluster partition_by / max_partitions override. These take precedence # With only the gq pattern explicit, auto_detect: true will still bucket
# over the folder-level defaults above. # group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
#
# - pattern: '^group_c\d+\.xpt$'
# tablename: group_c
# partition_by:
# - region
# - year
# max_partitions: 500
# Per-cluster indexes override. Takes precedence over the folder-level
# indexes default above. An explicit empty list disables indexing for
# this cluster even when the folder default has indexes.
#
# - pattern: '^group_d\d+\.xpt$'
# tablename: group_d
# indexes:
# - region
# - year
# Embedded-digit example. When the varying number sits in the MIDDLE of
# the stem (e.g. year2020_regionA_40_detail.sas7bdat,
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
# them - each file becomes its own singleton cluster. An explicit
# pattern bucketizes them correctly. The \d+ matches any width, and
# files within the cluster are sorted numerically by the last digit
# group in the stem, so _9_ sorts before _40_ regardless of zero-
# padding. Gaps in the numeric sequence (missing 3, 7, 14, ...) are
# fine - whatever files are present get loaded in numeric order.
#
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
# tablename: year2020_regionA_detail
# Text file cluster example (when file_type: text):
# - pattern: '^data_group_a\d+\.txt$'
# tablename: data_group_a
# With only the group_a pattern explicit, auto_detect: true will still
# bucket group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py # standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
# for the fixture that exercises exactly this layout. # for the fixture that exercises exactly this layout.

View File

@ -1,107 +0,0 @@
{
"ALLNULL": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "entirely null numeric; loader must pick a default type, typically TEXT",
"nullable": true
},
"ALLNULLC": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "entirely null character",
"nullable": true
},
"BIGINT": {
"note": "values beyond int32 range",
"nullable": true,
"postgres_type": "BIGINT"
},
"BOOLCOL": {
"acceptable_types": [
"BOOLEAN",
"SMALLINT",
"INTEGER"
],
"note": "{0,1,NaN} is genuinely ambiguous; loader's choice is a design decision",
"nullable": true
},
"CONST": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"nullable": false
},
"DATEASTR": {
"note": "stored as char in SAS; loader should coerce ISO-date strings",
"nullable": true,
"postgres_type": "DATE"
},
"DATECOL": {
"note": "positive control",
"nullable": false,
"postgres_type": "DATE"
},
"DTCOL": {
"acceptable_types": [
"TIMESTAMP",
"TIMESTAMP WITHOUT TIME ZONE"
],
"nullable": true
},
"FLOATCOL": {
"acceptable_types": [
"DOUBLE PRECISION",
"NUMERIC"
],
"nullable": true
},
"ID": {
"nullable": false,
"postgres_type": "INTEGER"
},
"INTCOL": {
"note": "positive control",
"nullable": false,
"postgres_type": "INTEGER"
},
"LONGSTR": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"nullable": true
},
"MIXED": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "heterogeneous content; loader should fall back to text",
"nullable": true
},
"NUMASSTR": {
"acceptable_types": [
"NUMERIC",
"DOUBLE PRECISION"
],
"note": "stored as char in SAS; loader should coerce numeric-looking strings",
"nullable": true
},
"STRCOL": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "positive control",
"nullable": false
},
"TIMECOL": {
"nullable": true,
"postgres_type": "TIME"
}
}

View File

@ -1,10 +1,7 @@
pandas>=2.0,<3.0 pandas>=2.0,<3.0
pyreadstat>=1.2,<2.0 pyreadstat>=1.2,<1.3
numpy>=2.1,<3.0 numpy>=2.1,<3.0
pyarrow>=22.0,<24.0
pyyaml>=6.0,<7.0 pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0 psycopg2-binary>=2.9,<3.0
python-dotenv>=1.0,<2.0 python-dotenv>=1.0,<2.0
boto3>=1.28,<2.0 boto3>=1.28,<2.0
openpyxl>=3.1,<4.0
tqdm>=4.66,<5.0

View File

@ -1,679 +0,0 @@
"""Explore S3 directories and categorise them by accessibility.
Reads a text file containing one S3 prefix per line (paths within the bucket
configured by the ``S3_BUCKET`` constant or ``--bucket`` CLI argument), then
for each prefix:
- Lists all objects recursively (via ``list_objects_v2`` paginator)
- **Only considers files matching the configured extensions** (default: all
supported extensions SAS and text). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found
- If the first file is accessible, tests ALL remaining files individually
- Categorises the directory as **Available**, **Blocked**, **Empty**, and
tracks individual file **Exceptions** within available directories
Supported file types
--------------------
* **SAS files**: ``.sas7bdat``, ``.xpt``, ``.xport``
* **Text / delimited files**: ``.txt``, ``.csv``, ``.tsv``
A directory is considered *empty* if it contains no files matching the
extension filter, even when other file types are present.
Configure the constants below (or use CLI arguments), then run::
python3 data_explorer.py [OPTIONS]
Python 3.10+ compatible. Requires ``boto3`` / ``botocore`` and stdlib.
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import dataclass, field
from typing import List, Set, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
# ---------------------------------------------------------------------------
# Dependency check
# ---------------------------------------------------------------------------
try:
import boto3 # noqa: F401
import botocore.exceptions # noqa: F401
except ImportError:
print(
"ERROR: boto3 / botocore is not installed.\n"
"Install with: pip install boto3",
file=sys.stderr,
)
sys.exit(1)
# ---------------------------------------------------------------------------
# Extension constants
# ---------------------------------------------------------------------------
SAS_EXTENSIONS: Set[str] = {".sas7bdat", ".xpt", ".xport"}
"""File extensions recognised as SAS data files."""
TEXT_EXTENSIONS: Set[str] = {".txt", ".csv", ".tsv"}
"""File extensions recognised as delimited text / CSV files."""
SUPPORTED_EXTENSIONS: Set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
"""Union of all file extensions this tool can work with."""
# ---------------------------------------------------------------------------
# Configuration defaults — edit these or override via CLI arguments
# ---------------------------------------------------------------------------
FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
"""Set of extensions to filter on (case-insensitive). Defaults to all supported."""
INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line."""
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
"""AWS CLI profile name used for authentication."""
# Text-file reading defaults (used when downloading / previewing text files)
DEFAULT_DELIMITER: str = ","
DEFAULT_ENCODING: str = "utf-8"
DEFAULT_QUOTECHAR: str = '"'
# ---------------------------------------------------------------------------
# Auto-detection helpers
# ---------------------------------------------------------------------------
def detect_file_type(filename: str) -> str:
"""Return ``'sas'``, ``'text'``, or ``'unknown'`` based on *filename* extension.
The check is case-insensitive. For ``.tsv`` files the caller should
default the delimiter to a tab character (``'\\t'``).
Examples
--------
>>> detect_file_type("data.sas7bdat")
'sas'
>>> detect_file_type("report.CSV")
'text'
>>> detect_file_type("archive.zip")
'unknown'
"""
ext = os.path.splitext(filename)[1].lower()
if ext in SAS_EXTENSIONS:
return "sas"
if ext in TEXT_EXTENSIONS:
return "text"
return "unknown"
def default_delimiter_for(filename: str) -> str:
"""Return a sensible default delimiter for *filename*.
* ``.tsv`` ``'\\t'``
* everything else ``','``
"""
ext = os.path.splitext(filename)[1].lower()
if ext == ".tsv":
return "\t"
return ","
def matches_extensions(key: str, extensions: Set[str]) -> bool:
"""Return ``True`` if *key* ends with any extension in *extensions* (case-insensitive)."""
key_lower = key.lower()
return any(key_lower.endswith(ext) for ext in extensions)
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class AvailableDir:
"""An S3 directory that is readable."""
prefix: str
file_count: int
total_size: int # bytes
accessible_count: int = 0 # files that passed head_object
total_count: int = 0 # total .sas7bdat files found
accessible_size: int = 0 # total size of accessible files only
@dataclass
class BlockedDir:
"""An S3 directory where access was denied or an error occurred."""
prefix: str
file_count: int
error: str
@dataclass
class EmptyDir:
"""An S3 directory with no objects."""
prefix: str
@dataclass
class ExceptionFile:
"""A specific file that failed permission check within an otherwise available directory."""
prefix: str # the directory prefix
key: str # the full S3 key of the failed file
error: str # the error message
@dataclass
class Results:
"""Aggregated exploration results."""
available: List[AvailableDir] = field(default_factory=list)
blocked: List[BlockedDir] = field(default_factory=list)
empty: List[EmptyDir] = field(default_factory=list)
exceptions: List[ExceptionFile] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def read_input_file(path: str) -> List[str]:
"""Return a list of S3 prefixes from *path*, ignoring blanks and comments.
Each line is stripped and normalised so that non-empty prefixes always end
with a trailing ``/``.
"""
prefixes: List[str] = []
with open(path, encoding="utf-8") as fh:
for raw_line in fh:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
# Normalise: strip surrounding whitespace/slashes, then re-add
# a single trailing slash (unless the prefix is empty/root).
line = line.strip("/")
if line:
line += "/"
prefixes.append(line)
return prefixes
def format_size(size_bytes: int) -> str:
"""Return a human-readable size string (KB, MB, GB, TB)."""
if size_bytes < 1024:
return f"{size_bytes} B"
for unit in ("KB", "MB", "GB", "TB"):
size_bytes /= 1024.0
if size_bytes < 1024.0 or unit == "TB":
return f"{size_bytes:,.1f} {unit}"
# Fallback (should not be reached)
return f"{size_bytes:,.1f} TB"
def extensions_label(extensions: Set[str]) -> str:
"""Return a compact, sorted label for a set of extensions (e.g. ``.csv/.tsv/.txt``)."""
return "/".join(sorted(extensions))
def list_objects(
s3_client: "botocore.client.S3",
bucket: str,
prefix: str,
extensions: Set[str] | None = None,
) -> Tuple[List[Tuple[str, int]], int]:
"""Recursively list all objects under *prefix*.
Only objects whose key ends with one of *extensions* (case-insensitive) are
counted. All other files are silently skipped. When *extensions* is
``None`` the module-level ``FILE_EXTENSIONS`` set is used.
Returns ``(files, total_size)`` where *files* is a list of
``(key, size)`` tuples for every matching object and *total_size* is the
sum of their sizes in bytes.
"""
if extensions is None:
extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions}
paginator = s3_client.get_paginator("list_objects_v2")
files: List[Tuple[str, int]] = []
total_size: int = 0
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
continue
files.append((obj["Key"], obj["Size"]))
total_size += obj["Size"]
return files, total_size
def check_read_permission(
s3_client: "botocore.client.S3",
bucket: str,
key: str,
) -> str | None:
"""Try ``head_object`` on *key*. Return ``None`` on success or an error string."""
try:
s3_client.head_object(Bucket=bucket, Key=key)
except botocore.exceptions.ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "Unknown")
message = exc.response.get("Error", {}).get("Message", str(exc))
return f"{message} ({code})"
return None
# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def explore_directories(
prefixes: List[str],
*,
extensions: Set[str] | None = None,
) -> Results:
"""Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.
Parameters
----------
prefixes:
List of S3 key prefixes to explore.
extensions:
Set of file extensions to filter on. Defaults to the module-level
``FILE_EXTENSIONS`` (which itself defaults to ``SUPPORTED_EXTENSIONS``).
"""
if extensions is None:
extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions}
ext_label = extensions_label(extensions)
session = boto3.Session(profile_name=AWS_PROFILE)
s3 = session.client("s3")
results = Results()
total = len(prefixes)
for idx, prefix in enumerate(prefixes, start=1):
print(
f"[{idx}/{total}] Checking {prefix} (filtering for {ext_label}) ...",
file=sys.stderr,
)
# --- Recursive listing ------------------------------------------------
try:
files, total_size = list_objects(
s3, S3_BUCKET, prefix, extensions=extensions,
)
except botocore.exceptions.ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "Unknown")
message = exc.response.get("Error", {}).get("Message", str(exc))
results.blocked.append(
BlockedDir(prefix=prefix, file_count=0, error=f"{message} ({code})")
)
continue
except Exception as exc:
results.blocked.append(
BlockedDir(prefix=prefix, file_count=0, error=str(exc))
)
continue
if not files:
results.empty.append(EmptyDir(prefix=prefix))
continue
file_count = len(files)
# --- Permission check on first file -----------------------------------
# in "/") for the head_object test. The listing is already filtered
# to the requested extensions, so any non-marker key is a valid probe.
first_key, _ = files[0]
test_key = first_key
if first_key.endswith("/") and total_size > 0:
for key, size in files:
if not (key.endswith("/") and size == 0):
test_key = key
break
error = check_read_permission(s3, S3_BUCKET, test_key)
if error is not None:
# First file blocked → entire directory is blocked
results.blocked.append(
BlockedDir(prefix=prefix, file_count=file_count, error=error)
)
continue
# --- First file accessible → check ALL remaining files ----------------
accessible_count = 1 # the first (test_key) already passed
accessible_size = 0
dir_exceptions: List[ExceptionFile] = []
# Find the size of the test_key to count it
for key, size in files:
if key == test_key:
accessible_size = size
break
# Build list of remaining files to check
remaining = [(key, size) for key, size in files if key != test_key]
if remaining:
if len(remaining) > 10:
print(
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
file=sys.stderr,
)
for key, size in remaining:
file_error = check_read_permission(s3, S3_BUCKET, key)
if file_error is None:
accessible_count += 1
accessible_size += size
else:
dir_exceptions.append(
ExceptionFile(prefix=prefix, key=key, error=file_error)
)
else:
# Only one file and it passed
accessible_size = total_size
results.available.append(
AvailableDir(
prefix=prefix,
file_count=file_count,
total_size=total_size,
accessible_count=accessible_count,
total_count=file_count,
accessible_size=accessible_size,
)
)
results.exceptions.extend(dir_exceptions)
return results
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def print_results(results: Results, *, extensions: Set[str] | None = None) -> None:
"""Print a clean, human-readable summary to stdout.
Parameters
----------
results:
The exploration results to display.
extensions:
The set of extensions that were used for filtering. Used only for
labelling in the output. Defaults to ``FILE_EXTENSIONS``.
"""
if extensions is None:
extensions = FILE_EXTENSIONS
ext_label = extensions_label(extensions)
print()
print("=== S3 Directory Explorer Results ===")
print(f"Bucket: {S3_BUCKET}")
print(f"Extensions: {ext_label}")
# --- Available ---
print()
print(f"--- Available ({len(results.available)}) ---")
if results.available:
for d in results.available:
print(f" {d.prefix}")
print(
f" {ext_label} files: {d.accessible_count}/{d.total_count} accessible"
f" | Total Size: {format_size(d.accessible_size)}"
)
else:
print(" (none)")
# --- Blocked ---
print()
print(f"--- Blocked ({len(results.blocked)}) ---")
if results.blocked:
for d in results.blocked:
if d.file_count:
print(f" {d.prefix}")
print(f" Matching files ({ext_label}) found: {d.file_count} | Error: {d.error}")
else:
print(f" {d.prefix}")
print(f" Error: {d.error}")
else:
print(" (none)")
# --- Exceptions ---
print()
print(f"--- Exceptions ({len(results.exceptions)}) ---")
if results.exceptions:
for exc in results.exceptions:
print(f" {exc.key}")
print(f" Directory: {exc.prefix} | Error: {exc.error}")
else:
print(" (none)")
# --- Empty ---
print()
print(f"--- Empty / no matching files ({len(results.empty)}) ---")
if results.empty:
for d in results.empty:
print(f" {d.prefix}")
else:
print(" (none)")
print()
# ---------------------------------------------------------------------------
# CLI argument parsing
# ---------------------------------------------------------------------------
def build_arg_parser() -> argparse.ArgumentParser:
"""Build and return the CLI argument parser.
Supports selecting file-type filters, text-file reading parameters, and
overriding the default bucket / profile / input-file settings.
"""
parser = argparse.ArgumentParser(
description=(
"Explore S3 directories and categorise them by accessibility. "
"Supports SAS files (.sas7bdat, .xpt, .xport) and delimited text "
"files (.txt, .csv, .tsv)."
),
)
# --- File-type / extension selection ---
type_group = parser.add_argument_group("File-type selection")
type_group.add_argument(
"--file-type",
choices=["sas", "text", "all"],
default="all",
help=(
"Restrict the scan to a specific file type. "
"'sas' = .sas7bdat/.xpt/.xport only; "
"'text' = .txt/.csv/.tsv only; "
"'all' = both (default)."
),
)
type_group.add_argument(
"--extensions",
nargs="+",
metavar="EXT",
help=(
"Explicit list of extensions to filter on (e.g. --extensions .csv .tsv). "
"Overrides --file-type when provided."
),
)
# --- Text-file reading parameters ---
text_group = parser.add_argument_group(
"Text-file parameters",
description=(
"Parameters used when reading delimited text files. These are "
"stored for downstream consumers and do not affect the S3 scan "
"itself."
),
)
text_group.add_argument(
"--delimiter",
default=None,
help=(
"Field delimiter for text files (default: ',' for .csv/.txt, "
"'\\t' for .tsv). Use 'tab' or '\\t' for a tab character."
),
)
text_group.add_argument(
"--encoding",
default=DEFAULT_ENCODING,
help=f"Character encoding for text files (default: {DEFAULT_ENCODING}).",
)
text_group.add_argument(
"--quotechar",
default=DEFAULT_QUOTECHAR,
help=f"Quote character for text files (default: {DEFAULT_QUOTECHAR!r}).",
)
# --- S3 / general settings ---
s3_group = parser.add_argument_group("S3 settings")
s3_group.add_argument(
"--bucket",
default=None,
help=f"S3 bucket name (default: {S3_BUCKET}).",
)
s3_group.add_argument(
"--profile",
default=None,
help=f"AWS CLI profile name (default: {AWS_PROFILE}).",
)
s3_group.add_argument(
"--input-file",
default=None,
help=f"Path to the text file with S3 prefixes (default: {INPUT_FILE}).",
)
return parser
def resolve_extensions(args: argparse.Namespace) -> Set[str]:
"""Determine the active extension set from parsed CLI *args*.
If ``--extensions`` is provided it takes precedence. Otherwise
``--file-type`` is used to select a predefined set.
"""
if args.extensions:
# Normalise: ensure each extension starts with a dot and is lowercase
exts: Set[str] = set()
for ext in args.extensions:
ext = ext.strip().lower()
if not ext.startswith("."):
ext = "." + ext
exts.add(ext)
return exts
if args.file_type == "sas":
return SAS_EXTENSIONS
if args.file_type == "text":
return TEXT_EXTENSIONS
return SUPPORTED_EXTENSIONS
def resolve_delimiter(args: argparse.Namespace) -> str:
"""Return the effective delimiter from parsed CLI *args*.
Handles the special values ``'tab'`` and ``'\\t'`` so users can specify a
tab character on the command line without shell-escaping issues.
"""
if args.delimiter is None:
return DEFAULT_DELIMITER
raw = args.delimiter
if raw.lower() in ("tab", "\\t"):
return "\t"
return raw
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = build_arg_parser()
args = parser.parse_args()
# --- Apply CLI overrides to module-level config ---------------------------
if args.bucket:
S3_BUCKET = args.bucket
if args.profile:
AWS_PROFILE = args.profile
input_file = args.input_file if args.input_file else INPUT_FILE
active_extensions = resolve_extensions(args)
FILE_EXTENSIONS = active_extensions
delimiter = resolve_delimiter(args)
encoding = args.encoding
quotechar = args.quotechar
# --- Read input file ------------------------------------------------------
if not os.path.exists(input_file):
print(f"ERROR: Input file not found: {input_file}", file=sys.stderr)
sys.exit(1)
try:
prefixes = read_input_file(input_file)
except Exception as exc:
print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
sys.exit(1)
if not prefixes:
print("No valid S3 prefixes found in the input file. Nothing to do.")
sys.exit(0)
# --- Validate AWS profile -------------------------------------------------
try:
session = boto3.Session(profile_name=AWS_PROFILE)
# Force credential resolution to catch bad profiles early
credentials = session.get_credentials()
if credentials is None:
raise RuntimeError(
f"No credentials found for AWS profile {AWS_PROFILE!r}"
)
except botocore.exceptions.ProfileNotFound as exc:
print(f"ERROR: {exc}", file=sys.stderr)
sys.exit(1)
except Exception as exc:
print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
sys.exit(1)
# --- Print active configuration -------------------------------------------
ext_label = extensions_label(active_extensions)
print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
print(f"Extensions: {ext_label}", file=sys.stderr)
if active_extensions & TEXT_EXTENSIONS:
print(
f"Text opts: delimiter={delimiter!r} encoding={encoding!r} "
f"quotechar={quotechar!r}",
file=sys.stderr,
)
# --- Explore --------------------------------------------------------------
results = explore_directories(prefixes, extensions=active_extensions)
print_results(results, extensions=active_extensions)

View File

@ -1,63 +1,37 @@
"""Standalone utility to download a SAS or delimited text file from S3 and """Standalone utility to download a .sas7bdat file from S3 and print a
print a column-level summary of the first *N* rows. column-level summary of the first 10 rows.
Supported formats Configure the four constants below, then run::
-----------------
* **SAS** ``.sas7bdat``, ``.xpt``, ``.xport`` (read via *pyreadstat*)
* **Text** ``.csv``, ``.tsv``, ``.txt`` (read via *pandas.read_csv*)
Configure the four constants below **or** use the CLI arguments, then run::
python3 file_viewer.py python3 file_viewer.py
python3 file_viewer.py --local path/to/file.csv
python3 file_viewer.py --local path/to/data.tsv --delimiter $'\\t'
Python 3.14 compatible. Python 3.14 compatible.
""" """
from __future__ import annotations from __future__ import annotations
import argparse
import os import os
import sys import sys
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3 import boto3
import pandas as pd import pandas as pd
import pyreadstat import pyreadstat
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Supported file extensions # Configuration — edit these before running
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
SAS_EXTENSIONS: set[str] = {".sas7bdat", ".xpt", ".xport"} S3_BUCKET: str = "my-bucket"
"""File extensions recognised as SAS data files."""
TEXT_EXTENSIONS: set[str] = {".txt", ".csv", ".tsv"}
"""File extensions recognised as delimited text files."""
SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
"""Union of all supported file extensions."""
# ---------------------------------------------------------------------------
# Configuration — edit these before running (or use CLI arguments)
# ---------------------------------------------------------------------------
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
"""S3 bucket name.""" """S3 bucket name."""
S3_KEY: str = "path/to/file.sas7bdat" S3_KEY: str = "path/to/file.sas7bdat"
"""Object key (path) within the bucket to a supported data file.""" """Object key (path) within the bucket to the .sas7bdat file."""
LOCAL_FOLDER: str = "./downloads" LOCAL_FOLDER: str = "./downloads"
"""Local directory to download the file into.""" """Local directory to download the file into."""
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default") AWS_PROFILE: str = "default"
"""AWS CLI profile name used for authentication.""" """AWS CLI profile name used for authentication."""
@ -66,148 +40,21 @@ AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None: def _download_from_s3(bucket: str, key: str, local_path: str) -> None:
"""Download *key* from *bucket* to *local_path*, skipping if already present. """Download *key* from *bucket* to *local_path* using a named session."""
If *local_path* exists and its size matches the S3 object's size, the
download is skipped and a message is printed.
Supports any file whose extension is in :data:`SUPPORTED_EXTENSIONS`.
"""
session = boto3.Session(profile_name=AWS_PROFILE) session = boto3.Session(profile_name=AWS_PROFILE)
s3 = session.client("s3") s3 = session.client("s3")
remote_size = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
if os.path.exists(local_path):
local_size = os.path.getsize(local_path)
if local_size == remote_size:
print(
f"Local file {local_path} already matches s3://{bucket}/{key} "
f"({local_size} bytes); skipping download."
)
return
print(
f"Local file {local_path} size ({local_size} bytes) differs from "
f"S3 ({remote_size} bytes); re-downloading."
)
print(f"Downloading s3://{bucket}/{key} -> {local_path}") print(f"Downloading s3://{bucket}/{key} -> {local_path}")
s3.download_file(bucket, key, local_path) s3.download_file(bucket, key, local_path)
print("Download complete.") print("Download complete.")
# -- SAS readers -------------------------------------------------------------
def _read_sas_head(path: str, row_count: int = 10) -> pd.DataFrame: def _read_sas_head(path: str, row_count: int = 10) -> pd.DataFrame:
"""Read the first *row_count* rows of a SAS file (``.sas7bdat``, ``.xpt``, ``.xport``).""" """Read the first *row_count* rows of a .sas7bdat file."""
ext = os.path.splitext(path)[1].lower() df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count)
if ext == ".sas7bdat":
df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count)
elif ext in {".xpt", ".xport"}:
df, _ = pyreadstat.read_xport(path, row_offset=0, row_limit=row_count)
else:
raise ValueError(f"Unsupported SAS extension: {ext}")
return df return df
# -- Text readers ------------------------------------------------------------
def _read_text_head(
path: str,
row_count: int = 10,
delimiter: str = ",",
encoding: str = "utf-8",
quotechar: str = '"',
) -> pd.DataFrame:
"""Read the first *row_count* rows of a delimited text file.
Parameters
----------
path : str
Path to the ``.csv``, ``.tsv``, or ``.txt`` file.
row_count : int, optional
Number of data rows to read (default ``10``).
delimiter : str, optional
Column delimiter (default ``","``). For ``.tsv`` files the caller
should pass ``"\\t"``.
encoding : str, optional
File encoding (default ``"utf-8"``).
quotechar : str, optional
Character used to quote fields (default ``'"'``).
"""
return pd.read_csv(
path,
sep=delimiter,
encoding=encoding,
quotechar=quotechar,
nrows=row_count,
)
# -- Unified reader ----------------------------------------------------------
def _read_head(
path: str,
row_count: int = 10,
delimiter: str | None = None,
encoding: str = "utf-8",
quotechar: str = '"',
) -> pd.DataFrame:
"""Read the first *row_count* rows of a supported data file.
Auto-detects the file type from its extension and delegates to the
appropriate reader. For ``.tsv`` files the delimiter defaults to tab
(``"\\t"``); for other text files it defaults to ``","``.
Parameters
----------
path : str
Path to the data file.
row_count : int, optional
Number of data rows to read (default ``10``).
delimiter : str or None, optional
Column delimiter for text files. ``None`` means *auto-detect*
(tab for ``.tsv``, comma otherwise).
encoding : str, optional
Encoding for text files (default ``"utf-8"``).
quotechar : str, optional
Quote character for text files (default ``'"'``).
Returns
-------
pandas.DataFrame
"""
ext = os.path.splitext(path)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
raise ValueError(
f"Unsupported file extension '{ext}'. "
f"Supported extensions: {sorted(SUPPORTED_EXTENSIONS)}"
)
if ext in SAS_EXTENSIONS:
return _read_sas_head(path, row_count=row_count)
# --- Text file path ---
if delimiter is None:
delimiter = "\t" if ext == ".tsv" else ","
return _read_text_head(
path,
row_count=row_count,
delimiter=delimiter,
encoding=encoding,
quotechar=quotechar,
)
# -- Display -----------------------------------------------------------------
def _sample_values(series: pd.Series, n: int = 3) -> str: def _sample_values(series: pd.Series, n: int = 3) -> str:
"""Return up to *n* non-null sample values as a comma-separated string.""" """Return up to *n* non-null sample values as a comma-separated string."""
non_null = series.dropna() non_null = series.dropna()
@ -247,126 +94,26 @@ def _print_summary(df: pd.DataFrame) -> None:
print() print()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
"""Build the argument parser for the file-viewer CLI."""
parser = argparse.ArgumentParser(
description=(
"Download a SAS or delimited text file from S3 (or read a local "
"file) and print a column-level summary of the first N rows.\n\n"
"Supported extensions: "
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
source = parser.add_mutually_exclusive_group()
source.add_argument(
"--local",
metavar="FILE",
default=None,
help=(
"Path to a local data file to summarise (skips S3 download). "
"Supported extensions: "
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
),
)
source.add_argument(
"--s3-key",
metavar="KEY",
default=None,
help="Override the S3_KEY constant with this object key.",
)
parser.add_argument(
"--rows",
type=int,
default=10,
metavar="N",
help="Number of rows to read (default: 10).",
)
# Text-file-specific options
text_group = parser.add_argument_group(
"text file options",
"These options apply only to .csv / .tsv / .txt files.",
)
text_group.add_argument(
"--delimiter",
default=None,
help=(
'Column delimiter for text files (default: "," for .csv/.txt, '
'"\\t" for .tsv). Use $\'\\t\' in the shell for a literal tab.'
),
)
text_group.add_argument(
"--encoding",
default="utf-8",
help='File encoding for text files (default: "utf-8").',
)
text_group.add_argument(
"--quotechar",
default='"',
help='Quote character for text files (default: \'"\').',
)
return parser
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Main # Main
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
if __name__ == "__main__": if __name__ == "__main__":
parser = _build_parser() # --- Download -----------------------------------------------------------
args = parser.parse_args() os.makedirs(LOCAL_FOLDER, exist_ok=True)
local_filename = os.path.basename(S3_KEY)
local_path = os.path.join(LOCAL_FOLDER, local_filename)
if args.local:
# ---- Local file mode -----------------------------------------------
local_path = args.local
ext = os.path.splitext(local_path)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
parser.error(
f"Unsupported file extension '{ext}'. "
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
)
if not os.path.isfile(local_path):
print(f"File not found: {local_path}", file=sys.stderr)
sys.exit(1)
else:
# ---- S3 download mode ----------------------------------------------
s3_key = args.s3_key or S3_KEY
ext = os.path.splitext(s3_key)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
parser.error(
f"Unsupported file extension '{ext}' in S3 key. "
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
)
os.makedirs(LOCAL_FOLDER, exist_ok=True)
local_filename = os.path.basename(s3_key)
local_path = os.path.join(LOCAL_FOLDER, local_filename)
try:
_ensure_local_copy(S3_BUCKET, s3_key, local_path)
except Exception as exc:
print(f"S3 download error: {exc}", file=sys.stderr)
sys.exit(1)
# ---- Read & summarise --------------------------------------------------
try: try:
df = _read_head( _download_from_s3(S3_BUCKET, S3_KEY, local_path)
local_path, except Exception as exc:
row_count=args.rows, print(f"S3 download error: {exc}", file=sys.stderr)
delimiter=args.delimiter, sys.exit(1)
encoding=args.encoding,
quotechar=args.quotechar, # --- Read & summarize ---------------------------------------------------
) try:
df = _read_sas_head(local_path, row_count=10)
except Exception as exc: except Exception as exc:
print(f"File read error: {exc}", file=sys.stderr) print(f"File read error: {exc}", file=sys.stderr)
sys.exit(2) sys.exit(2)

View File

@ -1,757 +0,0 @@
"""S3-source counterpart to ``generic_loader/load_folder.py``.
Reads a YAML config that points at an S3 bucket + prefix, lists every object
under that prefix recursively, groups objects into *clusters* using the same
explicit-pattern + auto-detect rules as ``load_folder.py``, and downloads each
cluster's files into its own subfolder under a local destination root.
Supported file types:
* SAS data files: ``.sas7bdat``, ``.xpt``, ``.xport``
* Delimited text files: ``.txt``, ``.csv``, ``.tsv``
-------------------------------------------------------------------------------
USAGE
-------------------------------------------------------------------------------
1. YAML config
--------------
::
bucket: my-bucket # required
prefix: census/2020/raw/ # required; recursive scan under it
local_folder: ./downloads # required; one subfolder per cluster
aws_profile: default # optional; default boto3 chain if omitted
auto_detect: true # optional; default true
extensions: # optional; default sas7bdat/xpt/xport/txt/csv/tsv
- .sas7bdat
- .csv
on_exists: skip # optional; skip | overwrite | error
concurrency: 4 # optional; default 4
clusters:
- pattern: '^group_a\\d+\\.sas7bdat$'
name: group_a
- pattern: '^group_b\\d+\\.sas7bdat$'
name: group_b
2. Command-line interface
-------------------------
::
python s3_download.py --config download_config.yaml [--dry-run]
[--overwrite] [--fail-fast]
Flags:
--config PATH Required. Path to the YAML config above.
--dry-run List discovered clusters and the objects each would
download (with sizes). No GET requests are issued
beyond the initial LIST.
--overwrite Force-redownload every matched object regardless of
local cache state. Equivalent to ``on_exists: overwrite``
for this run.
--fail-fast Abort the whole run on the first per-file download
failure. Default is to log the failure and keep going.
Exit codes:
0 - every file downloaded (or skipped) successfully (or dry-run completed)
1 - at least one file failed (details on stderr)
2 - the LIST returned no objects matching the configured extensions
3. Discovery rules
------------------
* Listing is recursive (no S3 ``Delimiter``). Regexes are matched against
the *basename* of each key (the part after the last ``/``), so a nested
object like ``census/2020/raw/nested/group_c1.sas7bdat`` is grouped by
``group_c1.sas7bdat`` alone. Text files (e.g. ``data.csv``) are handled
identically the basename is extracted and matched the same way.
* Explicit patterns are tried in order. A key matched by one pattern is
removed from the pool before the next pattern runs. Overlap between
patterns is flagged as an error at discovery time.
* Auto-detect groups remaining keys by ``re.sub(r'\\d+$', '', stem)`` with
any trailing ``_`` / ``-`` stripped afterward, mirroring
``load_folder.py``. Stems without trailing digits become singleton
clusters named after the stem.
* Within a cluster, files are sorted numerically by the LAST digit group
in the stem so ``_9_`` sorts before ``_40_`` regardless of zero-padding.
4. Library usage
----------------
::
from s3_download import load_download_config, list_s3_objects, \
discover_clusters, download_cluster, build_s3_client
cfg = load_download_config("download_config.yaml")
s3 = build_s3_client(cfg)
objects = list_s3_objects(s3, cfg)
for cluster in discover_clusters(cfg, objects):
download_cluster(s3, cfg, cluster)
"""
from __future__ import annotations
import argparse
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3
import yaml
SAS_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport")
TEXT_EXTENSIONS: Tuple[str, ...] = (".txt", ".csv", ".tsv")
DEFAULT_EXTENSIONS: Tuple[str, ...] = SAS_EXTENSIONS + TEXT_EXTENSIONS
VALID_ON_EXISTS: Tuple[str, ...] = ("skip", "overwrite", "error")
DEFAULT_CONCURRENCY: int = 4
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class _ExplicitPattern:
"""Parsed form of a single ``clusters[*]`` YAML entry."""
pattern: re.Pattern
raw_pattern: str
name: str
@dataclass
class DownloadConfig:
"""Top-level configuration parsed from YAML."""
bucket: str
prefix: str
local_folder: Path
aws_profile: Optional[str] = None
auto_detect: bool = True
extensions: Tuple[str, ...] = DEFAULT_EXTENSIONS
on_exists: str = "skip"
concurrency: int = DEFAULT_CONCURRENCY
explicit: List[_ExplicitPattern] = field(default_factory=list)
@dataclass
class S3Object:
"""A single S3 object selected from the LIST response."""
key: str
basename: str
size: int
@dataclass
class ClusterSpec:
"""Resolved per-cluster download settings."""
name: str
objects: List[S3Object]
source: str # "explicit" or "auto"
pattern: Optional[str] = None
# ---------------------------------------------------------------------------
# Config loading
# ---------------------------------------------------------------------------
def _validate_on_exists(value: Any, where: str) -> str:
s = str(value).lower()
if s not in VALID_ON_EXISTS:
raise ValueError(
f"{where}: on_exists={value!r} is not one of {list(VALID_ON_EXISTS)}"
)
return s
def _parse_extensions(raw_value: Any, where: str) -> Tuple[str, ...]:
if raw_value is None:
return DEFAULT_EXTENSIONS
if isinstance(raw_value, str):
items = [raw_value]
elif isinstance(raw_value, list):
items = list(raw_value)
else:
raise ValueError(
f"{where}: 'extensions' must be a string or list of strings."
)
out: List[str] = []
for i, item in enumerate(items):
if not isinstance(item, str) or not item.strip():
raise ValueError(
f"{where}: 'extensions[{i}]' must be a non-empty string."
)
ext = item.strip().lower()
if not ext.startswith("."):
ext = "." + ext
out.append(ext)
if len(out) != len(set(out)):
raise ValueError(f"{where}: 'extensions' contains duplicates.")
return tuple(out)
def _parse_concurrency(raw_value: Any, where: str) -> int:
if raw_value is None:
return DEFAULT_CONCURRENCY
try:
value = int(raw_value)
except (TypeError, ValueError):
raise ValueError(
f"{where}: 'concurrency' must be a positive integer, "
f"got {raw_value!r}"
)
if value <= 0:
raise ValueError(
f"{where}: 'concurrency' must be a positive integer, got {value}"
)
return value
def load_download_config(path: Path) -> DownloadConfig:
"""Parse and validate the YAML download config at ``path``."""
path = Path(path)
with path.open("r", encoding="utf-8") as f:
raw = yaml.safe_load(f)
if not isinstance(raw, dict):
raise ValueError(
f"Config at {path} must be a YAML mapping at the top level."
)
# 'bucket' can fall back to the S3_BUCKET env var, so only flag it as
# missing when neither the YAML key nor the env var is present.
required_always = ("prefix", "local_folder")
missing = [k for k in required_always if k not in raw]
if "bucket" not in raw and not os.environ.get("S3_BUCKET"):
missing.insert(0, "bucket")
if missing:
raise ValueError(
f"Config {path} missing required keys: {', '.join(missing)}"
)
bucket = str(raw["bucket"]).strip() if raw.get("bucket") else ""
if not bucket:
bucket = os.environ.get("S3_BUCKET", "")
if not bucket:
raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.")
prefix = str(raw["prefix"])
# Normalize: strip leading slash, ensure exactly one trailing slash unless
# the user explicitly asked for an empty prefix (whole bucket scan).
prefix = prefix.lstrip("/")
if prefix and not prefix.endswith("/"):
prefix = prefix + "/"
local_folder = Path(raw["local_folder"])
if not local_folder.is_absolute():
candidate = (path.parent / local_folder).resolve()
# Mirror load_folder's behavior: prefer the config-relative path when
# it exists, otherwise keep what the user wrote. Either way, we'll
# mkdir(parents=True) before downloading so non-existence is fine.
local_folder = candidate if candidate.parent.exists() else candidate
aws_profile = raw.get("aws_profile")
if aws_profile is not None:
aws_profile = str(aws_profile).strip() or None
if aws_profile is None:
aws_profile = os.environ.get("AWS_PROFILE") or None
auto_detect = bool(raw.get("auto_detect", True))
extensions = _parse_extensions(raw.get("extensions"), f"Config {path}")
on_exists = _validate_on_exists(
raw.get("on_exists", "skip"), f"Config {path}"
)
concurrency = _parse_concurrency(
raw.get("concurrency"), f"Config {path}"
)
explicit: List[_ExplicitPattern] = []
seen_names: set = set()
clusters_raw = raw.get("clusters") or []
if not isinstance(clusters_raw, list):
raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
for i, entry in enumerate(clusters_raw):
where = f"Config {path} clusters[{i}]"
if not isinstance(entry, dict):
raise ValueError(f"{where} must be a mapping.")
if "pattern" not in entry or "name" not in entry:
raise ValueError(f"{where} must include 'pattern' and 'name'.")
raw_pat = str(entry["pattern"])
try:
compiled = re.compile(raw_pat)
except re.error as e:
raise ValueError(
f"{where}: invalid regex {raw_pat!r}: {e}"
) from e
name = str(entry["name"]).strip()
if not name:
raise ValueError(f"{where}: 'name' must be a non-empty string.")
if name in seen_names:
raise ValueError(
f"{where}: duplicate cluster name {name!r}"
)
seen_names.add(name)
explicit.append(
_ExplicitPattern(
pattern=compiled, raw_pattern=raw_pat, name=name,
)
)
return DownloadConfig(
bucket=bucket,
prefix=prefix,
local_folder=local_folder,
aws_profile=aws_profile,
auto_detect=auto_detect,
extensions=extensions,
on_exists=on_exists,
concurrency=concurrency,
explicit=explicit,
)
# ---------------------------------------------------------------------------
# S3 listing
# ---------------------------------------------------------------------------
def build_s3_client(cfg: DownloadConfig):
"""Build a boto3 S3 client honoring ``cfg.aws_profile`` if set."""
if cfg.aws_profile:
session = boto3.Session(profile_name=cfg.aws_profile)
else:
session = boto3.Session()
return session.client("s3")
def list_s3_objects(s3_client, cfg: DownloadConfig) -> List[S3Object]:
"""List all objects under ``cfg.prefix`` recursively, filtered by extension.
Supports SAS extensions (``.sas7bdat``, ``.xpt``, ``.xport``) and text
extensions (``.txt``, ``.csv``, ``.tsv``) whichever are present in
``cfg.extensions``.
"""
paginator = s3_client.get_paginator("list_objects_v2")
out: List[S3Object] = []
for page in paginator.paginate(Bucket=cfg.bucket, Prefix=cfg.prefix):
for entry in page.get("Contents", []):
key = entry["Key"]
if key.endswith("/"):
continue
basename = key.rsplit("/", 1)[-1]
ext = ("." + basename.rsplit(".", 1)[-1].lower()
if "." in basename else "")
if ext not in cfg.extensions:
continue
out.append(
S3Object(key=key, basename=basename, size=int(entry["Size"]))
)
out.sort(key=lambda o: o.key)
return out
# ---------------------------------------------------------------------------
# Cluster discovery (mirrors generic_loader/load_folder.py)
# ---------------------------------------------------------------------------
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
_DIGIT_GROUP_RE = re.compile(r"\d+")
def _auto_prefix(stem: str) -> str:
"""Cluster key for *stem*: strip trailing digits and any trailing _/-."""
stripped = _TRAILING_DIGIT_RE.sub("", stem)
stripped = stripped.rstrip("_-")
return stripped or stem
def _basename_stem(basename: str) -> str:
if "." in basename:
return basename.rsplit(".", 1)[0]
return basename
def _cluster_sort_key(obj: S3Object) -> Tuple[int, str]:
"""Sort key for ordering objects within a cluster by trailing digits."""
stem = _basename_stem(obj.basename)
digits = _DIGIT_GROUP_RE.findall(stem)
n = int(digits[-1]) if digits else -1
return (n, stem)
def discover_clusters(
cfg: DownloadConfig, objects: List[S3Object]
) -> List[ClusterSpec]:
"""Bucket *objects* into clusters using explicit patterns then auto-detect."""
clusters: List[ClusterSpec] = []
for i, p_i in enumerate(cfg.explicit):
for j in range(i + 1, len(cfg.explicit)):
p_j = cfg.explicit[j]
for obj in objects:
if (p_i.pattern.search(obj.basename)
and p_j.pattern.search(obj.basename)):
raise ValueError(
f"Object {obj.basename!r} matches multiple explicit "
f"patterns: {p_i.raw_pattern!r} and "
f"{p_j.raw_pattern!r}"
)
remaining = list(objects)
for patt in cfg.explicit:
matched = [o for o in remaining if patt.pattern.search(o.basename)]
if not matched:
clusters.append(
ClusterSpec(
name=patt.name,
objects=[],
source="explicit",
pattern=patt.raw_pattern,
)
)
continue
remaining = [o for o in remaining if o not in matched]
clusters.append(
ClusterSpec(
name=patt.name,
objects=sorted(matched, key=_cluster_sort_key),
source="explicit",
pattern=patt.raw_pattern,
)
)
if cfg.auto_detect and remaining:
buckets: Dict[str, List[S3Object]] = {}
for obj in remaining:
key = _auto_prefix(_basename_stem(obj.basename))
buckets.setdefault(key, []).append(obj)
for key in sorted(buckets):
clusters.append(
ClusterSpec(
name=key,
objects=sorted(buckets[key], key=_cluster_sort_key),
source="auto",
)
)
return clusters
# ---------------------------------------------------------------------------
# Download
# ---------------------------------------------------------------------------
def _local_path(
cfg: DownloadConfig,
cluster: ClusterSpec,
obj: S3Object,
basename_collisions: set,
) -> Path:
"""Resolve the on-disk destination path for *obj*.
Falls back to a key-derived filename when two objects in the same cluster
share a basename (possible under recursive scan).
"""
cluster_dir = cfg.local_folder / cluster.name
if obj.basename in basename_collisions:
safe = obj.key.replace("/", "__")
return cluster_dir / safe
return cluster_dir / obj.basename
def _basename_collisions(cluster: ClusterSpec) -> set:
"""Return the set of basenames that appear more than once in *cluster*."""
seen: Dict[str, int] = {}
for obj in cluster.objects:
seen[obj.basename] = seen.get(obj.basename, 0) + 1
return {name for name, count in seen.items() if count > 1}
def _decide_action(
local_path: Path, obj: S3Object, on_exists: str
) -> Tuple[str, Optional[str]]:
"""Return ``(action, message)`` where action is 'download' or 'skip'."""
if on_exists == "overwrite":
return ("download", None)
if not local_path.exists():
return ("download", None)
local_size = local_path.stat().st_size
if local_size == obj.size:
return (
"skip",
f" skip {obj.key} -> {local_path} (size {local_size} matches)",
)
if on_exists == "error":
raise RuntimeError(
f"Local file {local_path} exists with size {local_size} but S3 "
f"object s3://{obj.key} has size {obj.size} (on_exists=error)"
)
return (
"download",
f" re-download {obj.key} -> {local_path} "
f"(local size {local_size} != S3 {obj.size})",
)
def download_cluster(
s3_client,
cfg: DownloadConfig,
cluster: ClusterSpec,
*,
on_exists_override: Optional[str] = None,
fail_fast: bool = False,
) -> Tuple[int, int, int, List[Tuple[str, Exception]]]:
"""Download every object in *cluster* into ``cfg.local_folder/cluster.name``.
Returns ``(downloaded, skipped, bytes_downloaded, failures)`` where
*failures* is a list of ``(key, exception)`` tuples.
"""
if not cluster.objects:
return (0, 0, 0, [])
on_exists = on_exists_override or cfg.on_exists
cluster_dir = cfg.local_folder / cluster.name
cluster_dir.mkdir(parents=True, exist_ok=True)
collisions = _basename_collisions(cluster)
plans: List[Tuple[S3Object, Path, str]] = []
skipped = 0
for obj in cluster.objects:
local_path = _local_path(cfg, cluster, obj, collisions)
action, message = _decide_action(local_path, obj, on_exists)
if message:
print(message, file=sys.stderr)
if action == "skip":
skipped += 1
continue
plans.append((obj, local_path, action))
downloaded = 0
bytes_downloaded = 0
failures: List[Tuple[str, Exception]] = []
if not plans:
return (0, skipped, 0, failures)
def _do_one(item):
obj, local_path, _action = item
local_path.parent.mkdir(parents=True, exist_ok=True)
s3_client.download_file(cfg.bucket, obj.key, str(local_path))
return obj
if cfg.concurrency <= 1 or len(plans) == 1:
for plan in plans:
obj = plan[0]
try:
_do_one(plan)
downloaded += 1
bytes_downloaded += obj.size
print(
f" ok {obj.key} -> {plan[1]} ({obj.size:,} bytes)",
file=sys.stderr,
)
except Exception as exc:
failures.append((obj.key, exc))
print(
f" FAIL {obj.key}: {exc}", file=sys.stderr,
)
if fail_fast:
break
else:
with ThreadPoolExecutor(max_workers=cfg.concurrency) as pool:
future_to_plan = {pool.submit(_do_one, p): p for p in plans}
for fut in as_completed(future_to_plan):
plan = future_to_plan[fut]
obj = plan[0]
try:
fut.result()
downloaded += 1
bytes_downloaded += obj.size
print(
f" ok {obj.key} -> {plan[1]} "
f"({obj.size:,} bytes)",
file=sys.stderr,
)
except Exception as exc:
failures.append((obj.key, exc))
print(
f" FAIL {obj.key}: {exc}", file=sys.stderr,
)
if fail_fast:
for other in future_to_plan:
other.cancel()
break
return (downloaded, skipped, bytes_downloaded, failures)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description=(
"Download S3 objects (SAS data files and/or delimited text files) "
"under a prefix into a local folder, grouping objects into "
"clusters that each become one subfolder. "
"Supported extensions: "
+ ", ".join(DEFAULT_EXTENSIONS)
+ "."
),
)
p.add_argument(
"--config", required=True, type=Path, help="Path to YAML config",
)
p.add_argument(
"--dry-run",
action="store_true",
help=(
"List discovered clusters and the objects each would download. "
"No GET requests are issued beyond the initial LIST."
),
)
p.add_argument(
"--overwrite",
action="store_true",
help=(
"Force-redownload every matched object regardless of local "
"cache state. Equivalent to on_exists=overwrite."
),
)
p.add_argument(
"--fail-fast",
action="store_true",
help=(
"Abort on the first per-file download failure. Default is to "
"log the failure and keep going."
),
)
return p
def _format_bytes(n: int) -> str:
units = ("B", "KiB", "MiB", "GiB", "TiB")
size = float(n)
for unit in units:
if size < 1024 or unit == units[-1]:
return f"{size:,.1f} {unit}" if unit != "B" else f"{int(size):,} B"
size /= 1024
return f"{n} B"
def _describe_cluster(cluster: ClusterSpec) -> str:
src = cluster.source
if cluster.pattern:
src += f" pattern={cluster.pattern!r}"
if not cluster.objects:
return (
f"cluster {cluster.name!r} [{src}]\n objects: (no matching keys)"
)
total = sum(o.size for o in cluster.objects)
lines = [
f"cluster {cluster.name!r} [{src}] "
f"{len(cluster.objects)} object(s), {_format_bytes(total)}"
]
for obj in cluster.objects:
lines.append(f" - {obj.key} ({_format_bytes(obj.size)})")
return "\n".join(lines)
def main(argv: Optional[List[str]] = None) -> int:
args = _build_argparser().parse_args(argv)
cfg = load_download_config(args.config)
s3 = build_s3_client(cfg)
objects = list_s3_objects(s3, cfg)
if not objects:
print(
f"error: no objects matching extensions "
f"{list(cfg.extensions)} found under "
f"s3://{cfg.bucket}/{cfg.prefix}",
file=sys.stderr,
)
return 2
clusters = discover_clusters(cfg, objects)
loadable = [c for c in clusters if c.objects]
print(
f"discovered {len(loadable)} cluster(s) "
f"({sum(len(c.objects) for c in loadable)} object(s)) under "
f"s3://{cfg.bucket}/{cfg.prefix}:"
)
for c in clusters:
print(_describe_cluster(c))
if args.dry_run:
return 0
cfg.local_folder.mkdir(parents=True, exist_ok=True)
on_exists_override = "overwrite" if args.overwrite else None
totals: List[Tuple[str, int, int, int]] = [] # (name, dl, skip, bytes)
failures: List[Tuple[str, str, Exception]] = [] # (cluster, key, exc)
aborted = False
for cluster in loadable:
if aborted:
break
print(
f"\n>>> downloading cluster {cluster.name!r} "
f"({len(cluster.objects)} object(s))"
)
try:
dl, sk, by, fails = download_cluster(
s3, cfg, cluster,
on_exists_override=on_exists_override,
fail_fast=args.fail_fast,
)
except Exception as exc:
failures.append((cluster.name, "<cluster>", exc))
print(
f" !! cluster {cluster.name!r} aborted: {exc}",
file=sys.stderr,
)
if args.fail_fast:
aborted = True
continue
totals.append((cluster.name, dl, sk, by))
for key, exc in fails:
failures.append((cluster.name, key, exc))
if fails and args.fail_fast:
aborted = True
print("\n=== summary ===")
for name, dl, sk, by in totals:
print(
f" {name}: downloaded {dl}, skipped {sk}, "
f"{_format_bytes(by)}"
)
for cname, key, exc in failures:
print(f" FAIL {cname} {key}: {exc}", file=sys.stderr)
return 1 if failures else 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,8 +0,0 @@
# S3 Directory Explorer - Input File
# One S3 prefix per line (within the bucket configured in data_explorer.py).
# Blank lines and comments (#) are ignored.
#
# Examples:
# data/sales/
# data/inventory/
# data/archive/

View File

@ -1,111 +0,0 @@
# Example S3 download config for utils/s3_download.py.
#
# Shape mirrors what `s3_download.py` expects:
#
# python s3_download.py --config sample_s3_download_config.yaml --dry-run
# python s3_download.py --config sample_s3_download_config.yaml
#
# Relative paths (e.g. local_folder) are resolved against this config file's
# directory first, falling back to the current working directory if that
# doesn't exist.
# ---------------------------------------------------------------------------
# Required: where to read from and write to
# ---------------------------------------------------------------------------
bucket: my-bucket
# Listing is recursive under this prefix - no S3 Delimiter is used. A nested
# object like `census/2020/raw/nested/group_c1.sas7bdat` will be considered.
# Regex patterns below match against the object BASENAME only (the part
# after the last `/`), so subfolder location does not affect matching.
prefix: census/2020/raw/
# Root destination on disk. One subfolder per cluster is created beneath it.
# If two objects in the same cluster share a basename (possible under the
# recursive scan), the second one is renamed to a key-derived filename
# (slashes replaced with `__`) so neither file overwrites the other.
local_folder: ./downloads
# ---------------------------------------------------------------------------
# Optional: AWS credentials
# ---------------------------------------------------------------------------
# Named profile from ~/.aws/credentials. Omit to use the default boto3
# credential chain (env vars, instance role, SSO, etc.).
# aws_profile: default
# ---------------------------------------------------------------------------
# Optional: discovery behavior
# ---------------------------------------------------------------------------
# When true (default), any object not matched by an explicit pattern below is
# auto-grouped with its peers by stripping trailing digits (and any trailing
# _ / -) from the basename stem. Stems without trailing digits become their
# own singleton cluster.
#
# Auto-detect only recognizes *trailing* digit runs. If your basenames put
# the varying number in the middle of the stem (e.g. surrounded by year,
# region, and detail components), auto-detect will NOT group them - each
# object becomes its own singleton cluster. Use an explicit pattern instead;
# see the embedded-digit example near the bottom of this file.
auto_detect: true
# Object extensions to consider. Anything else under the prefix is ignored.
# Default (when this key is omitted): .sas7bdat, .xpt, .xport, .txt, .csv, .tsv
# extensions:
# - .sas7bdat
# - .xpt
# - .txt
# - .csv
# - .tsv
# ---------------------------------------------------------------------------
# Optional: download behavior
# ---------------------------------------------------------------------------
# What to do when the destination file already exists locally.
# skip - (default) if the local file's byte size matches the S3
# object's Size, reuse it. If sizes differ, re-download with a
# warning. Same byte-size cache rule used by
# utils/file_viewer.py::_ensure_local_copy.
# overwrite - always re-download.
# error - abort the run if any local file exists with a different size.
# (Equal sizes still skip.)
on_exists: skip
# Parallel download workers. boto3 clients are thread-safe. Default: 4.
# concurrency: 4
# ---------------------------------------------------------------------------
# Explicit cluster patterns
# ---------------------------------------------------------------------------
#
# Each pattern is matched against the S3 object BASENAME (last path segment
# of the key). Objects matched by a pattern are pulled out of the
# auto-detect pool, so explicit and auto clusters compose cleanly.
#
# `name` becomes both the subfolder name under `local_folder` and the label
# used in the discovery / summary output. Names must be unique across
# explicit clusters.
clusters:
- pattern: '^group_a\d+\.sas7bdat$'
name: group_a
# - pattern: '^group_b\d+\.sas7bdat$'
# name: group_b
# Embedded-digit example. When the varying number sits in the MIDDLE of
# the basename stem (e.g. year2020_regionA_40_detail.sas7bdat,
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
# them - each object becomes its own singleton cluster. An explicit
# pattern bucketizes them correctly. The \d+ matches any width, and
# objects within the cluster are sorted numerically by the last digit
# group in the stem, so _9_ sorts before _40_ regardless of zero-padding.
#
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
# name: year2020_regionA_detail
# Text file cluster example (when file_type: text):
# - pattern: '^data_group_a\d+\.txt$'
# name: data_group_a

File diff suppressed because it is too large Load Diff