Compare commits
No commits in common. "main" and "batch_folder_processing" have entirely different histories.
main
...
batch_fold
7
.gitignore
vendored
7
.gitignore
vendored
@ -1,7 +0,0 @@
|
||||
/.venv
|
||||
/samples
|
||||
.env
|
||||
!.env.example
|
||||
__pycache__/
|
||||
venv/
|
||||
*/__pycache__/
|
||||
@ -3,6 +3,3 @@ PGPORT=5432
|
||||
PGUSER=
|
||||
PGPASSWORD=
|
||||
PGDATABASE=
|
||||
|
||||
S3_BUCKET=my-bucket
|
||||
AWS_PROFILE=default
|
||||
|
||||
5
generic_loader/.gitignore
vendored
Normal file
5
generic_loader/.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
/.venv
|
||||
/samples
|
||||
/.env
|
||||
/__pycache__
|
||||
/venv
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,10 +1,7 @@
|
||||
pandas>=2.0,<3.0
|
||||
pyreadstat>=1.2,<2.0
|
||||
pyreadstat>=1.2,<1.3
|
||||
numpy>=2.1,<3.0
|
||||
pyarrow>=22.0,<24.0
|
||||
pyyaml>=6.0,<7.0
|
||||
psycopg2-binary>=2.9,<3.0
|
||||
python-dotenv>=1.0,<2.0
|
||||
boto3>=1.28,<2.0
|
||||
openpyxl>=3.1,<4.0
|
||||
tqdm>=4.66,<5.0
|
||||
@ -15,64 +15,3 @@ tablename: kitchensink
|
||||
# What to do if the target table already exists: fail | replace | append
|
||||
# Defaults to fail.
|
||||
if_exists: append
|
||||
|
||||
# file_type: Type of data file to load. One of: sas | text. Default: sas.
|
||||
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
|
||||
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
|
||||
# file_type: sas
|
||||
|
||||
# delimiter: Column delimiter for text files. Only used when file_type: text.
|
||||
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
|
||||
# or any single character.
|
||||
# delimiter: ","
|
||||
|
||||
# text_encoding: Character encoding for text files. Default: utf-8.
|
||||
# Common alternatives: latin-1, cp1252, iso-8859-1.
|
||||
# text_encoding: utf-8
|
||||
|
||||
# quotechar: Quote character for text files. Default: '"' (double quote).
|
||||
# quotechar: '"'
|
||||
|
||||
# partition_by: Partition the table by unique values of these columns.
|
||||
# Columns are applied in cascading order (first column = top-level partition).
|
||||
# Requires if_exists: replace or fail (not append for initial creation).
|
||||
# Single field:
|
||||
# partition_by: state
|
||||
# Multiple fields (cascading):
|
||||
# partition_by:
|
||||
# - state
|
||||
# - zip
|
||||
#
|
||||
# max_partitions: Warning threshold for total partition count (default: 10000).
|
||||
# If the number of partitions exceeds this, a warning is logged but loading continues.
|
||||
# max_partitions: 10000
|
||||
|
||||
# indexes: Create B-tree indexes on these columns after data loading.
|
||||
# Indexes are created with IF NOT EXISTS for safe use with append mode.
|
||||
# Single column:
|
||||
# indexes: state
|
||||
# Multiple columns (one index per column):
|
||||
# indexes:
|
||||
# - state
|
||||
# - zip
|
||||
|
||||
# column_types: Explicit {column_name: postgres_type} overrides that
|
||||
# bypass automatic type inference for the listed columns. Useful when
|
||||
# pyreadstat reports a column as NUM but you want it stored as TEXT
|
||||
# (phone/ID columns that are conceptually strings), or when a column's
|
||||
# inferred type is off for any other reason. Columns not listed here
|
||||
# fall through to the normal inference path. Nullability is always
|
||||
# computed from the data.
|
||||
#
|
||||
# column_types:
|
||||
# RESP_PH_PREFIX_ID: TEXT
|
||||
# SOMELONG_ID: BIGINT
|
||||
|
||||
# all_nullable: If true, every column is stamped nullable in the generated
|
||||
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
|
||||
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
|
||||
# rare-null data downstream) and COPY blows up mid-load on the first null
|
||||
# it hits. Off by default. The CLI flag --all-nullable overrides this to
|
||||
# true when set.
|
||||
#
|
||||
# all_nullable: false
|
||||
|
||||
@ -19,33 +19,8 @@ if_exists: replace
|
||||
# auto-grouped with its peers by stripping trailing digits (and any trailing
|
||||
# _ / -) from the file stem. Files with no trailing digits become their own
|
||||
# singleton cluster.
|
||||
#
|
||||
# Auto-detect only recognizes *trailing* digit runs. If your file names put
|
||||
# the varying number in the middle of the stem (e.g. surrounded by year,
|
||||
# region, and detail components), auto-detect will NOT group them - each
|
||||
# file becomes its own singleton cluster. Use an explicit pattern instead;
|
||||
# see the embedded-digit example near the bottom of this file.
|
||||
auto_detect: true
|
||||
|
||||
# file_type: Type of data files in this folder. One of: sas | text. Default: sas.
|
||||
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
|
||||
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
|
||||
# When set to 'text', the folder scanner looks for .txt/.csv/.tsv files
|
||||
# instead of .sas7bdat/.xpt/.xport files.
|
||||
# file_type: sas
|
||||
|
||||
# delimiter: Column delimiter for text files. Only used when file_type: text.
|
||||
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
|
||||
# or any single character.
|
||||
# delimiter: ","
|
||||
|
||||
# text_encoding: Character encoding for text files. Default: utf-8.
|
||||
# Common alternatives: latin-1, cp1252, iso-8859-1.
|
||||
# text_encoding: utf-8
|
||||
|
||||
# quotechar: Quote character for text files. Default: '"' (double quote).
|
||||
# quotechar: '"'
|
||||
|
||||
# Folder-level column filter. Every file in every cluster passes through
|
||||
# this filter. `include` and `exclude` are mutually exclusive. A cluster can
|
||||
# override these via its own `include` / `exclude` keys.
|
||||
@ -56,76 +31,15 @@ auto_detect: true
|
||||
# exclude:
|
||||
# - ALLNULL
|
||||
|
||||
# Folder-level partition_by: Partition every cluster's table by unique values
|
||||
# of these columns. Inherited by all clusters unless overridden per-cluster.
|
||||
# Requires if_exists: replace or fail (not append for initial creation).
|
||||
# Single field:
|
||||
# partition_by: state
|
||||
# Multiple fields (cascading):
|
||||
# partition_by:
|
||||
# - state
|
||||
# - zip
|
||||
#
|
||||
# Folder-level max_partitions: Warning threshold for total partition count
|
||||
# (default: 10000). Inherited by all clusters unless overridden per-cluster.
|
||||
# max_partitions: 10000
|
||||
|
||||
# Folder-level indexes: Create B-tree indexes on these columns after data
|
||||
# loading. Inherited by all clusters unless overridden per-cluster.
|
||||
# Indexes are created with IF NOT EXISTS for safe use with append mode.
|
||||
# Single column:
|
||||
# indexes: state
|
||||
# Multiple columns (one index per column):
|
||||
# indexes:
|
||||
# - state
|
||||
# - zip
|
||||
|
||||
# Folder-level column_types: Explicit {column_name: postgres_type} map that
|
||||
# bypasses automatic type inference for the listed columns. Applied to
|
||||
# every cluster unless a cluster supplies its own column_types, which are
|
||||
# merged on top (cluster entries win on conflict).
|
||||
#
|
||||
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
|
||||
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
|
||||
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
|
||||
# BIGINT). Entries in column_types here win over that auto-union - use
|
||||
# them when the auto result is wrong or when --no-prescan disables the
|
||||
# auto-union and you still need to pin a column.
|
||||
#
|
||||
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
|
||||
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
|
||||
# don't exist in a given file are simply ignored for that file.
|
||||
#
|
||||
# column_types:
|
||||
# RESP_PH_PREFIX_ID: TEXT
|
||||
# RESP_PH_SUFFIX_ID: TEXT
|
||||
# SOMELONG_ID: BIGINT
|
||||
|
||||
# Folder-level all_nullable: If true, every column of every cluster is
|
||||
# stamped nullable in the generated schema; NOT NULL inference is skipped
|
||||
# entirely. Use this when the sampler wrongly concludes a column has no
|
||||
# nulls (sampled rows happened to be dense, but later files in the cluster
|
||||
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
|
||||
# unless a cluster supplies its own all_nullable. The CLI flag
|
||||
# --all-nullable overrides both this and any per-cluster setting when
|
||||
# passed. Off by default.
|
||||
#
|
||||
# all_nullable: false
|
||||
|
||||
# Explicit cluster patterns. Each pattern is matched against the file
|
||||
# *basename*. Files matched by a pattern are pulled out of the auto-detect
|
||||
# pool, so explicit and auto clusters compose cleanly.
|
||||
#
|
||||
# `tablename` is required. `if_exists`, `include`, `exclude`, and
|
||||
# `column_types` are optional per-cluster overrides of the folder-level
|
||||
# defaults above. Cluster-level column_types entries win over folder-
|
||||
# level entries for the same column.
|
||||
# `tablename` is required. `if_exists`, `include`, and `exclude` are
|
||||
# optional per-cluster overrides of the folder-level defaults above.
|
||||
clusters:
|
||||
- pattern: '^group_a\d+\.xpt$'
|
||||
tablename: group_a
|
||||
# column_types:
|
||||
# INTCOL: TEXT
|
||||
# all_nullable: true # per-cluster override of the folder-level default
|
||||
|
||||
# Example of an explicit override. Uncomment to force the group_b cluster to
|
||||
# append instead of replace even though the folder default is "replace":
|
||||
@ -134,44 +48,7 @@ clusters:
|
||||
# tablename: group_b
|
||||
# if_exists: append
|
||||
|
||||
# Per-cluster partition_by / max_partitions override. These take precedence
|
||||
# over the folder-level defaults above.
|
||||
#
|
||||
# - pattern: '^group_c\d+\.xpt$'
|
||||
# tablename: group_c
|
||||
# partition_by:
|
||||
# - region
|
||||
# - year
|
||||
# max_partitions: 500
|
||||
|
||||
# Per-cluster indexes override. Takes precedence over the folder-level
|
||||
# indexes default above. An explicit empty list disables indexing for
|
||||
# this cluster even when the folder default has indexes.
|
||||
#
|
||||
# - pattern: '^group_d\d+\.xpt$'
|
||||
# tablename: group_d
|
||||
# indexes:
|
||||
# - region
|
||||
# - year
|
||||
|
||||
# Embedded-digit example. When the varying number sits in the MIDDLE of
|
||||
# the stem (e.g. year2020_regionA_40_detail.sas7bdat,
|
||||
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
|
||||
# them - each file becomes its own singleton cluster. An explicit
|
||||
# pattern bucketizes them correctly. The \d+ matches any width, and
|
||||
# files within the cluster are sorted numerically by the last digit
|
||||
# group in the stem, so _9_ sorts before _40_ regardless of zero-
|
||||
# padding. Gaps in the numeric sequence (missing 3, 7, 14, ...) are
|
||||
# fine - whatever files are present get loaded in numeric order.
|
||||
#
|
||||
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
|
||||
# tablename: year2020_regionA_detail
|
||||
|
||||
# Text file cluster example (when file_type: text):
|
||||
# - pattern: '^data_group_a\d+\.txt$'
|
||||
# tablename: data_group_a
|
||||
|
||||
# With only the group_a pattern explicit, auto_detect: true will still
|
||||
# bucket group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
|
||||
# With only the gq pattern explicit, auto_detect: true will still bucket
|
||||
# group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
|
||||
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
|
||||
# for the fixture that exercises exactly this layout.
|
||||
|
||||
@ -1,107 +0,0 @@
|
||||
{
|
||||
"ALLNULL": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "entirely null numeric; loader must pick a default type, typically TEXT",
|
||||
"nullable": true
|
||||
},
|
||||
"ALLNULLC": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "entirely null character",
|
||||
"nullable": true
|
||||
},
|
||||
"BIGINT": {
|
||||
"note": "values beyond int32 range",
|
||||
"nullable": true,
|
||||
"postgres_type": "BIGINT"
|
||||
},
|
||||
"BOOLCOL": {
|
||||
"acceptable_types": [
|
||||
"BOOLEAN",
|
||||
"SMALLINT",
|
||||
"INTEGER"
|
||||
],
|
||||
"note": "{0,1,NaN} is genuinely ambiguous; loader's choice is a design decision",
|
||||
"nullable": true
|
||||
},
|
||||
"CONST": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"nullable": false
|
||||
},
|
||||
"DATEASTR": {
|
||||
"note": "stored as char in SAS; loader should coerce ISO-date strings",
|
||||
"nullable": true,
|
||||
"postgres_type": "DATE"
|
||||
},
|
||||
"DATECOL": {
|
||||
"note": "positive control",
|
||||
"nullable": false,
|
||||
"postgres_type": "DATE"
|
||||
},
|
||||
"DTCOL": {
|
||||
"acceptable_types": [
|
||||
"TIMESTAMP",
|
||||
"TIMESTAMP WITHOUT TIME ZONE"
|
||||
],
|
||||
"nullable": true
|
||||
},
|
||||
"FLOATCOL": {
|
||||
"acceptable_types": [
|
||||
"DOUBLE PRECISION",
|
||||
"NUMERIC"
|
||||
],
|
||||
"nullable": true
|
||||
},
|
||||
"ID": {
|
||||
"nullable": false,
|
||||
"postgres_type": "INTEGER"
|
||||
},
|
||||
"INTCOL": {
|
||||
"note": "positive control",
|
||||
"nullable": false,
|
||||
"postgres_type": "INTEGER"
|
||||
},
|
||||
"LONGSTR": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"nullable": true
|
||||
},
|
||||
"MIXED": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "heterogeneous content; loader should fall back to text",
|
||||
"nullable": true
|
||||
},
|
||||
"NUMASSTR": {
|
||||
"acceptable_types": [
|
||||
"NUMERIC",
|
||||
"DOUBLE PRECISION"
|
||||
],
|
||||
"note": "stored as char in SAS; loader should coerce numeric-looking strings",
|
||||
"nullable": true
|
||||
},
|
||||
"STRCOL": {
|
||||
"acceptable_types": [
|
||||
"TEXT",
|
||||
"VARCHAR"
|
||||
],
|
||||
"note": "positive control",
|
||||
"nullable": false
|
||||
},
|
||||
"TIMECOL": {
|
||||
"nullable": true,
|
||||
"postgres_type": "TIME"
|
||||
}
|
||||
}
|
||||
Binary file not shown.
@ -1,679 +0,0 @@
|
||||
"""Explore S3 directories and categorise them by accessibility.
|
||||
|
||||
Reads a text file containing one S3 prefix per line (paths within the bucket
|
||||
configured by the ``S3_BUCKET`` constant or ``--bucket`` CLI argument), then
|
||||
for each prefix:
|
||||
|
||||
- Lists all objects recursively (via ``list_objects_v2`` paginator)
|
||||
- **Only considers files matching the configured extensions** (default: all
|
||||
supported extensions — SAS and text). All other file types are ignored.
|
||||
- Tests read permission with ``head_object`` on the first matching file found
|
||||
- If the first file is accessible, tests ALL remaining files individually
|
||||
- Categorises the directory as **Available**, **Blocked**, **Empty**, and
|
||||
tracks individual file **Exceptions** within available directories
|
||||
|
||||
Supported file types
|
||||
--------------------
|
||||
* **SAS files**: ``.sas7bdat``, ``.xpt``, ``.xport``
|
||||
* **Text / delimited files**: ``.txt``, ``.csv``, ``.tsv``
|
||||
|
||||
A directory is considered *empty* if it contains no files matching the
|
||||
extension filter, even when other file types are present.
|
||||
|
||||
Configure the constants below (or use CLI arguments), then run::
|
||||
|
||||
python3 data_explorer.py [OPTIONS]
|
||||
|
||||
Python 3.10+ compatible. Requires ``boto3`` / ``botocore`` and stdlib.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Set, Tuple
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dependency check
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
try:
|
||||
import boto3 # noqa: F401
|
||||
import botocore.exceptions # noqa: F401
|
||||
except ImportError:
|
||||
print(
|
||||
"ERROR: boto3 / botocore is not installed.\n"
|
||||
"Install with: pip install boto3",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Extension constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SAS_EXTENSIONS: Set[str] = {".sas7bdat", ".xpt", ".xport"}
|
||||
"""File extensions recognised as SAS data files."""
|
||||
|
||||
TEXT_EXTENSIONS: Set[str] = {".txt", ".csv", ".tsv"}
|
||||
"""File extensions recognised as delimited text / CSV files."""
|
||||
|
||||
SUPPORTED_EXTENSIONS: Set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
|
||||
"""Union of all file extensions this tool can work with."""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration defaults — edit these or override via CLI arguments
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
|
||||
"""Set of extensions to filter on (case-insensitive). Defaults to all supported."""
|
||||
|
||||
INPUT_FILE: str = "s3_directories.txt"
|
||||
"""Path to the text file containing one S3 prefix per line."""
|
||||
|
||||
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
|
||||
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""
|
||||
|
||||
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
|
||||
"""AWS CLI profile name used for authentication."""
|
||||
|
||||
# Text-file reading defaults (used when downloading / previewing text files)
|
||||
DEFAULT_DELIMITER: str = ","
|
||||
DEFAULT_ENCODING: str = "utf-8"
|
||||
DEFAULT_QUOTECHAR: str = '"'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auto-detection helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def detect_file_type(filename: str) -> str:
|
||||
"""Return ``'sas'``, ``'text'``, or ``'unknown'`` based on *filename* extension.
|
||||
|
||||
The check is case-insensitive. For ``.tsv`` files the caller should
|
||||
default the delimiter to a tab character (``'\\t'``).
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> detect_file_type("data.sas7bdat")
|
||||
'sas'
|
||||
>>> detect_file_type("report.CSV")
|
||||
'text'
|
||||
>>> detect_file_type("archive.zip")
|
||||
'unknown'
|
||||
"""
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
if ext in SAS_EXTENSIONS:
|
||||
return "sas"
|
||||
if ext in TEXT_EXTENSIONS:
|
||||
return "text"
|
||||
return "unknown"
|
||||
|
||||
|
||||
def default_delimiter_for(filename: str) -> str:
|
||||
"""Return a sensible default delimiter for *filename*.
|
||||
|
||||
* ``.tsv`` → ``'\\t'``
|
||||
* everything else → ``','``
|
||||
"""
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
if ext == ".tsv":
|
||||
return "\t"
|
||||
return ","
|
||||
|
||||
|
||||
def matches_extensions(key: str, extensions: Set[str]) -> bool:
|
||||
"""Return ``True`` if *key* ends with any extension in *extensions* (case-insensitive)."""
|
||||
key_lower = key.lower()
|
||||
return any(key_lower.endswith(ext) for ext in extensions)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data structures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class AvailableDir:
|
||||
"""An S3 directory that is readable."""
|
||||
|
||||
prefix: str
|
||||
file_count: int
|
||||
total_size: int # bytes
|
||||
accessible_count: int = 0 # files that passed head_object
|
||||
total_count: int = 0 # total .sas7bdat files found
|
||||
accessible_size: int = 0 # total size of accessible files only
|
||||
|
||||
|
||||
@dataclass
|
||||
class BlockedDir:
|
||||
"""An S3 directory where access was denied or an error occurred."""
|
||||
|
||||
prefix: str
|
||||
file_count: int
|
||||
error: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmptyDir:
|
||||
"""An S3 directory with no objects."""
|
||||
|
||||
prefix: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExceptionFile:
|
||||
"""A specific file that failed permission check within an otherwise available directory."""
|
||||
|
||||
prefix: str # the directory prefix
|
||||
key: str # the full S3 key of the failed file
|
||||
error: str # the error message
|
||||
|
||||
|
||||
@dataclass
|
||||
class Results:
|
||||
"""Aggregated exploration results."""
|
||||
|
||||
available: List[AvailableDir] = field(default_factory=list)
|
||||
blocked: List[BlockedDir] = field(default_factory=list)
|
||||
empty: List[EmptyDir] = field(default_factory=list)
|
||||
exceptions: List[ExceptionFile] = field(default_factory=list)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def read_input_file(path: str) -> List[str]:
|
||||
"""Return a list of S3 prefixes from *path*, ignoring blanks and comments.
|
||||
|
||||
Each line is stripped and normalised so that non-empty prefixes always end
|
||||
with a trailing ``/``.
|
||||
"""
|
||||
prefixes: List[str] = []
|
||||
with open(path, encoding="utf-8") as fh:
|
||||
for raw_line in fh:
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
# Normalise: strip surrounding whitespace/slashes, then re-add
|
||||
# a single trailing slash (unless the prefix is empty/root).
|
||||
line = line.strip("/")
|
||||
if line:
|
||||
line += "/"
|
||||
prefixes.append(line)
|
||||
return prefixes
|
||||
|
||||
|
||||
def format_size(size_bytes: int) -> str:
|
||||
"""Return a human-readable size string (KB, MB, GB, TB)."""
|
||||
if size_bytes < 1024:
|
||||
return f"{size_bytes} B"
|
||||
for unit in ("KB", "MB", "GB", "TB"):
|
||||
size_bytes /= 1024.0
|
||||
if size_bytes < 1024.0 or unit == "TB":
|
||||
return f"{size_bytes:,.1f} {unit}"
|
||||
# Fallback (should not be reached)
|
||||
return f"{size_bytes:,.1f} TB"
|
||||
|
||||
|
||||
def extensions_label(extensions: Set[str]) -> str:
|
||||
"""Return a compact, sorted label for a set of extensions (e.g. ``.csv/.tsv/.txt``)."""
|
||||
return "/".join(sorted(extensions))
|
||||
|
||||
|
||||
def list_objects(
|
||||
s3_client: "botocore.client.S3",
|
||||
bucket: str,
|
||||
prefix: str,
|
||||
extensions: Set[str] | None = None,
|
||||
) -> Tuple[List[Tuple[str, int]], int]:
|
||||
"""Recursively list all objects under *prefix*.
|
||||
|
||||
Only objects whose key ends with one of *extensions* (case-insensitive) are
|
||||
counted. All other files are silently skipped. When *extensions* is
|
||||
``None`` the module-level ``FILE_EXTENSIONS`` set is used.
|
||||
|
||||
Returns ``(files, total_size)`` where *files* is a list of
|
||||
``(key, size)`` tuples for every matching object and *total_size* is the
|
||||
sum of their sizes in bytes.
|
||||
"""
|
||||
if extensions is None:
|
||||
extensions = FILE_EXTENSIONS
|
||||
exts_lower = {e.lower() for e in extensions}
|
||||
paginator = s3_client.get_paginator("list_objects_v2")
|
||||
files: List[Tuple[str, int]] = []
|
||||
total_size: int = 0
|
||||
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
|
||||
for obj in page.get("Contents", []):
|
||||
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
|
||||
continue
|
||||
files.append((obj["Key"], obj["Size"]))
|
||||
total_size += obj["Size"]
|
||||
return files, total_size
|
||||
|
||||
|
||||
def check_read_permission(
|
||||
s3_client: "botocore.client.S3",
|
||||
bucket: str,
|
||||
key: str,
|
||||
) -> str | None:
|
||||
"""Try ``head_object`` on *key*. Return ``None`` on success or an error string."""
|
||||
try:
|
||||
s3_client.head_object(Bucket=bucket, Key=key)
|
||||
except botocore.exceptions.ClientError as exc:
|
||||
code = exc.response.get("Error", {}).get("Code", "Unknown")
|
||||
message = exc.response.get("Error", {}).get("Message", str(exc))
|
||||
return f"{message} ({code})"
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def explore_directories(
|
||||
prefixes: List[str],
|
||||
*,
|
||||
extensions: Set[str] | None = None,
|
||||
) -> Results:
|
||||
"""Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
prefixes:
|
||||
List of S3 key prefixes to explore.
|
||||
extensions:
|
||||
Set of file extensions to filter on. Defaults to the module-level
|
||||
``FILE_EXTENSIONS`` (which itself defaults to ``SUPPORTED_EXTENSIONS``).
|
||||
"""
|
||||
if extensions is None:
|
||||
extensions = FILE_EXTENSIONS
|
||||
exts_lower = {e.lower() for e in extensions}
|
||||
ext_label = extensions_label(extensions)
|
||||
|
||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
||||
s3 = session.client("s3")
|
||||
|
||||
results = Results()
|
||||
total = len(prefixes)
|
||||
|
||||
for idx, prefix in enumerate(prefixes, start=1):
|
||||
print(
|
||||
f"[{idx}/{total}] Checking {prefix} (filtering for {ext_label}) ...",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# --- Recursive listing ------------------------------------------------
|
||||
try:
|
||||
files, total_size = list_objects(
|
||||
s3, S3_BUCKET, prefix, extensions=extensions,
|
||||
)
|
||||
except botocore.exceptions.ClientError as exc:
|
||||
code = exc.response.get("Error", {}).get("Code", "Unknown")
|
||||
message = exc.response.get("Error", {}).get("Message", str(exc))
|
||||
results.blocked.append(
|
||||
BlockedDir(prefix=prefix, file_count=0, error=f"{message} ({code})")
|
||||
)
|
||||
continue
|
||||
except Exception as exc:
|
||||
results.blocked.append(
|
||||
BlockedDir(prefix=prefix, file_count=0, error=str(exc))
|
||||
)
|
||||
continue
|
||||
|
||||
if not files:
|
||||
results.empty.append(EmptyDir(prefix=prefix))
|
||||
continue
|
||||
|
||||
file_count = len(files)
|
||||
|
||||
# --- Permission check on first file -----------------------------------
|
||||
# in "/") for the head_object test. The listing is already filtered
|
||||
# to the requested extensions, so any non-marker key is a valid probe.
|
||||
first_key, _ = files[0]
|
||||
test_key = first_key
|
||||
if first_key.endswith("/") and total_size > 0:
|
||||
for key, size in files:
|
||||
if not (key.endswith("/") and size == 0):
|
||||
test_key = key
|
||||
break
|
||||
|
||||
error = check_read_permission(s3, S3_BUCKET, test_key)
|
||||
if error is not None:
|
||||
# First file blocked → entire directory is blocked
|
||||
results.blocked.append(
|
||||
BlockedDir(prefix=prefix, file_count=file_count, error=error)
|
||||
)
|
||||
continue
|
||||
|
||||
# --- First file accessible → check ALL remaining files ----------------
|
||||
accessible_count = 1 # the first (test_key) already passed
|
||||
accessible_size = 0
|
||||
dir_exceptions: List[ExceptionFile] = []
|
||||
|
||||
# Find the size of the test_key to count it
|
||||
for key, size in files:
|
||||
if key == test_key:
|
||||
accessible_size = size
|
||||
break
|
||||
|
||||
# Build list of remaining files to check
|
||||
remaining = [(key, size) for key, size in files if key != test_key]
|
||||
|
||||
if remaining:
|
||||
if len(remaining) > 10:
|
||||
print(
|
||||
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
for key, size in remaining:
|
||||
file_error = check_read_permission(s3, S3_BUCKET, key)
|
||||
if file_error is None:
|
||||
accessible_count += 1
|
||||
accessible_size += size
|
||||
else:
|
||||
dir_exceptions.append(
|
||||
ExceptionFile(prefix=prefix, key=key, error=file_error)
|
||||
)
|
||||
|
||||
else:
|
||||
# Only one file and it passed
|
||||
accessible_size = total_size
|
||||
|
||||
results.available.append(
|
||||
AvailableDir(
|
||||
prefix=prefix,
|
||||
file_count=file_count,
|
||||
total_size=total_size,
|
||||
accessible_count=accessible_count,
|
||||
total_count=file_count,
|
||||
accessible_size=accessible_size,
|
||||
)
|
||||
)
|
||||
results.exceptions.extend(dir_exceptions)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def print_results(results: Results, *, extensions: Set[str] | None = None) -> None:
|
||||
"""Print a clean, human-readable summary to stdout.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
results:
|
||||
The exploration results to display.
|
||||
extensions:
|
||||
The set of extensions that were used for filtering. Used only for
|
||||
labelling in the output. Defaults to ``FILE_EXTENSIONS``.
|
||||
"""
|
||||
if extensions is None:
|
||||
extensions = FILE_EXTENSIONS
|
||||
ext_label = extensions_label(extensions)
|
||||
|
||||
print()
|
||||
print("=== S3 Directory Explorer Results ===")
|
||||
print(f"Bucket: {S3_BUCKET}")
|
||||
print(f"Extensions: {ext_label}")
|
||||
|
||||
# --- Available ---
|
||||
print()
|
||||
print(f"--- Available ({len(results.available)}) ---")
|
||||
if results.available:
|
||||
for d in results.available:
|
||||
print(f" {d.prefix}")
|
||||
print(
|
||||
f" {ext_label} files: {d.accessible_count}/{d.total_count} accessible"
|
||||
f" | Total Size: {format_size(d.accessible_size)}"
|
||||
)
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
# --- Blocked ---
|
||||
print()
|
||||
print(f"--- Blocked ({len(results.blocked)}) ---")
|
||||
if results.blocked:
|
||||
for d in results.blocked:
|
||||
if d.file_count:
|
||||
print(f" {d.prefix}")
|
||||
print(f" Matching files ({ext_label}) found: {d.file_count} | Error: {d.error}")
|
||||
else:
|
||||
print(f" {d.prefix}")
|
||||
print(f" Error: {d.error}")
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
# --- Exceptions ---
|
||||
print()
|
||||
print(f"--- Exceptions ({len(results.exceptions)}) ---")
|
||||
if results.exceptions:
|
||||
for exc in results.exceptions:
|
||||
print(f" {exc.key}")
|
||||
print(f" Directory: {exc.prefix} | Error: {exc.error}")
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
# --- Empty ---
|
||||
print()
|
||||
print(f"--- Empty / no matching files ({len(results.empty)}) ---")
|
||||
if results.empty:
|
||||
for d in results.empty:
|
||||
print(f" {d.prefix}")
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
print()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI argument parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def build_arg_parser() -> argparse.ArgumentParser:
|
||||
"""Build and return the CLI argument parser.
|
||||
|
||||
Supports selecting file-type filters, text-file reading parameters, and
|
||||
overriding the default bucket / profile / input-file settings.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description=(
|
||||
"Explore S3 directories and categorise them by accessibility. "
|
||||
"Supports SAS files (.sas7bdat, .xpt, .xport) and delimited text "
|
||||
"files (.txt, .csv, .tsv)."
|
||||
),
|
||||
)
|
||||
|
||||
# --- File-type / extension selection ---
|
||||
type_group = parser.add_argument_group("File-type selection")
|
||||
type_group.add_argument(
|
||||
"--file-type",
|
||||
choices=["sas", "text", "all"],
|
||||
default="all",
|
||||
help=(
|
||||
"Restrict the scan to a specific file type. "
|
||||
"'sas' = .sas7bdat/.xpt/.xport only; "
|
||||
"'text' = .txt/.csv/.tsv only; "
|
||||
"'all' = both (default)."
|
||||
),
|
||||
)
|
||||
type_group.add_argument(
|
||||
"--extensions",
|
||||
nargs="+",
|
||||
metavar="EXT",
|
||||
help=(
|
||||
"Explicit list of extensions to filter on (e.g. --extensions .csv .tsv). "
|
||||
"Overrides --file-type when provided."
|
||||
),
|
||||
)
|
||||
|
||||
# --- Text-file reading parameters ---
|
||||
text_group = parser.add_argument_group(
|
||||
"Text-file parameters",
|
||||
description=(
|
||||
"Parameters used when reading delimited text files. These are "
|
||||
"stored for downstream consumers and do not affect the S3 scan "
|
||||
"itself."
|
||||
),
|
||||
)
|
||||
text_group.add_argument(
|
||||
"--delimiter",
|
||||
default=None,
|
||||
help=(
|
||||
"Field delimiter for text files (default: ',' for .csv/.txt, "
|
||||
"'\\t' for .tsv). Use 'tab' or '\\t' for a tab character."
|
||||
),
|
||||
)
|
||||
text_group.add_argument(
|
||||
"--encoding",
|
||||
default=DEFAULT_ENCODING,
|
||||
help=f"Character encoding for text files (default: {DEFAULT_ENCODING}).",
|
||||
)
|
||||
text_group.add_argument(
|
||||
"--quotechar",
|
||||
default=DEFAULT_QUOTECHAR,
|
||||
help=f"Quote character for text files (default: {DEFAULT_QUOTECHAR!r}).",
|
||||
)
|
||||
|
||||
# --- S3 / general settings ---
|
||||
s3_group = parser.add_argument_group("S3 settings")
|
||||
s3_group.add_argument(
|
||||
"--bucket",
|
||||
default=None,
|
||||
help=f"S3 bucket name (default: {S3_BUCKET}).",
|
||||
)
|
||||
s3_group.add_argument(
|
||||
"--profile",
|
||||
default=None,
|
||||
help=f"AWS CLI profile name (default: {AWS_PROFILE}).",
|
||||
)
|
||||
s3_group.add_argument(
|
||||
"--input-file",
|
||||
default=None,
|
||||
help=f"Path to the text file with S3 prefixes (default: {INPUT_FILE}).",
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def resolve_extensions(args: argparse.Namespace) -> Set[str]:
|
||||
"""Determine the active extension set from parsed CLI *args*.
|
||||
|
||||
If ``--extensions`` is provided it takes precedence. Otherwise
|
||||
``--file-type`` is used to select a predefined set.
|
||||
"""
|
||||
if args.extensions:
|
||||
# Normalise: ensure each extension starts with a dot and is lowercase
|
||||
exts: Set[str] = set()
|
||||
for ext in args.extensions:
|
||||
ext = ext.strip().lower()
|
||||
if not ext.startswith("."):
|
||||
ext = "." + ext
|
||||
exts.add(ext)
|
||||
return exts
|
||||
|
||||
if args.file_type == "sas":
|
||||
return SAS_EXTENSIONS
|
||||
if args.file_type == "text":
|
||||
return TEXT_EXTENSIONS
|
||||
return SUPPORTED_EXTENSIONS
|
||||
|
||||
|
||||
def resolve_delimiter(args: argparse.Namespace) -> str:
|
||||
"""Return the effective delimiter from parsed CLI *args*.
|
||||
|
||||
Handles the special values ``'tab'`` and ``'\\t'`` so users can specify a
|
||||
tab character on the command line without shell-escaping issues.
|
||||
"""
|
||||
if args.delimiter is None:
|
||||
return DEFAULT_DELIMITER
|
||||
raw = args.delimiter
|
||||
if raw.lower() in ("tab", "\\t"):
|
||||
return "\t"
|
||||
return raw
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = build_arg_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
# --- Apply CLI overrides to module-level config ---------------------------
|
||||
if args.bucket:
|
||||
S3_BUCKET = args.bucket
|
||||
if args.profile:
|
||||
AWS_PROFILE = args.profile
|
||||
input_file = args.input_file if args.input_file else INPUT_FILE
|
||||
|
||||
active_extensions = resolve_extensions(args)
|
||||
FILE_EXTENSIONS = active_extensions
|
||||
|
||||
delimiter = resolve_delimiter(args)
|
||||
encoding = args.encoding
|
||||
quotechar = args.quotechar
|
||||
|
||||
# --- Read input file ------------------------------------------------------
|
||||
if not os.path.exists(input_file):
|
||||
print(f"ERROR: Input file not found: {input_file}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
prefixes = read_input_file(input_file)
|
||||
except Exception as exc:
|
||||
print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not prefixes:
|
||||
print("No valid S3 prefixes found in the input file. Nothing to do.")
|
||||
sys.exit(0)
|
||||
|
||||
# --- Validate AWS profile -------------------------------------------------
|
||||
try:
|
||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
||||
# Force credential resolution to catch bad profiles early
|
||||
credentials = session.get_credentials()
|
||||
if credentials is None:
|
||||
raise RuntimeError(
|
||||
f"No credentials found for AWS profile {AWS_PROFILE!r}"
|
||||
)
|
||||
except botocore.exceptions.ProfileNotFound as exc:
|
||||
print(f"ERROR: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except Exception as exc:
|
||||
print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# --- Print active configuration -------------------------------------------
|
||||
ext_label = extensions_label(active_extensions)
|
||||
print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
|
||||
print(f"Extensions: {ext_label}", file=sys.stderr)
|
||||
if active_extensions & TEXT_EXTENSIONS:
|
||||
print(
|
||||
f"Text opts: delimiter={delimiter!r} encoding={encoding!r} "
|
||||
f"quotechar={quotechar!r}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# --- Explore --------------------------------------------------------------
|
||||
results = explore_directories(prefixes, extensions=active_extensions)
|
||||
print_results(results, extensions=active_extensions)
|
||||
@ -1,374 +0,0 @@
|
||||
"""Standalone utility to download a SAS or delimited text file from S3 and
|
||||
print a column-level summary of the first *N* rows.
|
||||
|
||||
Supported formats
|
||||
-----------------
|
||||
* **SAS** – ``.sas7bdat``, ``.xpt``, ``.xport`` (read via *pyreadstat*)
|
||||
* **Text** – ``.csv``, ``.tsv``, ``.txt`` (read via *pandas.read_csv*)
|
||||
|
||||
Configure the four constants below **or** use the CLI arguments, then run::
|
||||
|
||||
python3 file_viewer.py
|
||||
python3 file_viewer.py --local path/to/file.csv
|
||||
python3 file_viewer.py --local path/to/data.tsv --delimiter $'\\t'
|
||||
|
||||
Python 3.14 compatible.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
|
||||
import boto3
|
||||
import pandas as pd
|
||||
import pyreadstat
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Supported file extensions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SAS_EXTENSIONS: set[str] = {".sas7bdat", ".xpt", ".xport"}
|
||||
"""File extensions recognised as SAS data files."""
|
||||
|
||||
TEXT_EXTENSIONS: set[str] = {".txt", ".csv", ".tsv"}
|
||||
"""File extensions recognised as delimited text files."""
|
||||
|
||||
SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
|
||||
"""Union of all supported file extensions."""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration — edit these before running (or use CLI arguments)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
|
||||
"""S3 bucket name."""
|
||||
|
||||
S3_KEY: str = "path/to/file.sas7bdat"
|
||||
"""Object key (path) within the bucket to a supported data file."""
|
||||
|
||||
LOCAL_FOLDER: str = "./downloads"
|
||||
"""Local directory to download the file into."""
|
||||
|
||||
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
|
||||
"""AWS CLI profile name used for authentication."""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None:
|
||||
"""Download *key* from *bucket* to *local_path*, skipping if already present.
|
||||
|
||||
If *local_path* exists and its size matches the S3 object's size, the
|
||||
download is skipped and a message is printed.
|
||||
|
||||
Supports any file whose extension is in :data:`SUPPORTED_EXTENSIONS`.
|
||||
"""
|
||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
||||
s3 = session.client("s3")
|
||||
|
||||
remote_size = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
|
||||
|
||||
if os.path.exists(local_path):
|
||||
local_size = os.path.getsize(local_path)
|
||||
if local_size == remote_size:
|
||||
print(
|
||||
f"Local file {local_path} already matches s3://{bucket}/{key} "
|
||||
f"({local_size} bytes); skipping download."
|
||||
)
|
||||
return
|
||||
print(
|
||||
f"Local file {local_path} size ({local_size} bytes) differs from "
|
||||
f"S3 ({remote_size} bytes); re-downloading."
|
||||
)
|
||||
|
||||
print(f"Downloading s3://{bucket}/{key} -> {local_path}")
|
||||
s3.download_file(bucket, key, local_path)
|
||||
print("Download complete.")
|
||||
|
||||
|
||||
# -- SAS readers -------------------------------------------------------------
|
||||
|
||||
|
||||
def _read_sas_head(path: str, row_count: int = 10) -> pd.DataFrame:
|
||||
"""Read the first *row_count* rows of a SAS file (``.sas7bdat``, ``.xpt``, ``.xport``)."""
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
if ext == ".sas7bdat":
|
||||
df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count)
|
||||
elif ext in {".xpt", ".xport"}:
|
||||
df, _ = pyreadstat.read_xport(path, row_offset=0, row_limit=row_count)
|
||||
else:
|
||||
raise ValueError(f"Unsupported SAS extension: {ext}")
|
||||
return df
|
||||
|
||||
|
||||
# -- Text readers ------------------------------------------------------------
|
||||
|
||||
|
||||
def _read_text_head(
|
||||
path: str,
|
||||
row_count: int = 10,
|
||||
delimiter: str = ",",
|
||||
encoding: str = "utf-8",
|
||||
quotechar: str = '"',
|
||||
) -> pd.DataFrame:
|
||||
"""Read the first *row_count* rows of a delimited text file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : str
|
||||
Path to the ``.csv``, ``.tsv``, or ``.txt`` file.
|
||||
row_count : int, optional
|
||||
Number of data rows to read (default ``10``).
|
||||
delimiter : str, optional
|
||||
Column delimiter (default ``","``). For ``.tsv`` files the caller
|
||||
should pass ``"\\t"``.
|
||||
encoding : str, optional
|
||||
File encoding (default ``"utf-8"``).
|
||||
quotechar : str, optional
|
||||
Character used to quote fields (default ``'"'``).
|
||||
"""
|
||||
return pd.read_csv(
|
||||
path,
|
||||
sep=delimiter,
|
||||
encoding=encoding,
|
||||
quotechar=quotechar,
|
||||
nrows=row_count,
|
||||
)
|
||||
|
||||
|
||||
# -- Unified reader ----------------------------------------------------------
|
||||
|
||||
|
||||
def _read_head(
|
||||
path: str,
|
||||
row_count: int = 10,
|
||||
delimiter: str | None = None,
|
||||
encoding: str = "utf-8",
|
||||
quotechar: str = '"',
|
||||
) -> pd.DataFrame:
|
||||
"""Read the first *row_count* rows of a supported data file.
|
||||
|
||||
Auto-detects the file type from its extension and delegates to the
|
||||
appropriate reader. For ``.tsv`` files the delimiter defaults to tab
|
||||
(``"\\t"``); for other text files it defaults to ``","``.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : str
|
||||
Path to the data file.
|
||||
row_count : int, optional
|
||||
Number of data rows to read (default ``10``).
|
||||
delimiter : str or None, optional
|
||||
Column delimiter for text files. ``None`` means *auto-detect*
|
||||
(tab for ``.tsv``, comma otherwise).
|
||||
encoding : str, optional
|
||||
Encoding for text files (default ``"utf-8"``).
|
||||
quotechar : str, optional
|
||||
Quote character for text files (default ``'"'``).
|
||||
|
||||
Returns
|
||||
-------
|
||||
pandas.DataFrame
|
||||
"""
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
|
||||
if ext not in SUPPORTED_EXTENSIONS:
|
||||
raise ValueError(
|
||||
f"Unsupported file extension '{ext}'. "
|
||||
f"Supported extensions: {sorted(SUPPORTED_EXTENSIONS)}"
|
||||
)
|
||||
|
||||
if ext in SAS_EXTENSIONS:
|
||||
return _read_sas_head(path, row_count=row_count)
|
||||
|
||||
# --- Text file path ---
|
||||
if delimiter is None:
|
||||
delimiter = "\t" if ext == ".tsv" else ","
|
||||
|
||||
return _read_text_head(
|
||||
path,
|
||||
row_count=row_count,
|
||||
delimiter=delimiter,
|
||||
encoding=encoding,
|
||||
quotechar=quotechar,
|
||||
)
|
||||
|
||||
|
||||
# -- Display -----------------------------------------------------------------
|
||||
|
||||
|
||||
def _sample_values(series: pd.Series, n: int = 3) -> str:
|
||||
"""Return up to *n* non-null sample values as a comma-separated string."""
|
||||
non_null = series.dropna()
|
||||
samples = non_null.head(n).tolist()
|
||||
if not samples:
|
||||
return "(all null)"
|
||||
return ", ".join(repr(v) for v in samples)
|
||||
|
||||
|
||||
def _print_summary(df: pd.DataFrame) -> None:
|
||||
"""Print a nicely formatted summary table to stdout."""
|
||||
# Pre-compute column data
|
||||
rows = []
|
||||
for col in df.columns:
|
||||
rows.append((col, str(df[col].dtype), _sample_values(df[col], 3)))
|
||||
|
||||
# Determine column widths
|
||||
hdr_name = "Column Name"
|
||||
hdr_dtype = "Data Type"
|
||||
hdr_samples = "Sample Values (up to 3)"
|
||||
|
||||
w_name = max(len(hdr_name), *(len(r[0]) for r in rows))
|
||||
w_dtype = max(len(hdr_dtype), *(len(r[1]) for r in rows))
|
||||
w_samples = max(len(hdr_samples), *(len(r[2]) for r in rows))
|
||||
|
||||
fmt = f" {{:<{w_name}}} {{:<{w_dtype}}} {{:<{w_samples}}}"
|
||||
sep = f" {'-' * w_name} {'-' * w_dtype} {'-' * w_samples}"
|
||||
|
||||
print()
|
||||
print(f" Summary of first {len(df)} row(s) ({len(df.columns)} columns)")
|
||||
print(sep)
|
||||
print(fmt.format(hdr_name, hdr_dtype, hdr_samples))
|
||||
print(sep)
|
||||
for name, dtype, samples in rows:
|
||||
print(fmt.format(name, dtype, samples))
|
||||
print(sep)
|
||||
print()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_parser() -> argparse.ArgumentParser:
|
||||
"""Build the argument parser for the file-viewer CLI."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description=(
|
||||
"Download a SAS or delimited text file from S3 (or read a local "
|
||||
"file) and print a column-level summary of the first N rows.\n\n"
|
||||
"Supported extensions: "
|
||||
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
|
||||
),
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
|
||||
source = parser.add_mutually_exclusive_group()
|
||||
source.add_argument(
|
||||
"--local",
|
||||
metavar="FILE",
|
||||
default=None,
|
||||
help=(
|
||||
"Path to a local data file to summarise (skips S3 download). "
|
||||
"Supported extensions: "
|
||||
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
|
||||
),
|
||||
)
|
||||
source.add_argument(
|
||||
"--s3-key",
|
||||
metavar="KEY",
|
||||
default=None,
|
||||
help="Override the S3_KEY constant with this object key.",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--rows",
|
||||
type=int,
|
||||
default=10,
|
||||
metavar="N",
|
||||
help="Number of rows to read (default: 10).",
|
||||
)
|
||||
|
||||
# Text-file-specific options
|
||||
text_group = parser.add_argument_group(
|
||||
"text file options",
|
||||
"These options apply only to .csv / .tsv / .txt files.",
|
||||
)
|
||||
text_group.add_argument(
|
||||
"--delimiter",
|
||||
default=None,
|
||||
help=(
|
||||
'Column delimiter for text files (default: "," for .csv/.txt, '
|
||||
'"\\t" for .tsv). Use $\'\\t\' in the shell for a literal tab.'
|
||||
),
|
||||
)
|
||||
text_group.add_argument(
|
||||
"--encoding",
|
||||
default="utf-8",
|
||||
help='File encoding for text files (default: "utf-8").',
|
||||
)
|
||||
text_group.add_argument(
|
||||
"--quotechar",
|
||||
default='"',
|
||||
help='Quote character for text files (default: \'"\').',
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = _build_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.local:
|
||||
# ---- Local file mode -----------------------------------------------
|
||||
local_path = args.local
|
||||
ext = os.path.splitext(local_path)[1].lower()
|
||||
if ext not in SUPPORTED_EXTENSIONS:
|
||||
parser.error(
|
||||
f"Unsupported file extension '{ext}'. "
|
||||
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
|
||||
)
|
||||
if not os.path.isfile(local_path):
|
||||
print(f"File not found: {local_path}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
else:
|
||||
# ---- S3 download mode ----------------------------------------------
|
||||
s3_key = args.s3_key or S3_KEY
|
||||
ext = os.path.splitext(s3_key)[1].lower()
|
||||
if ext not in SUPPORTED_EXTENSIONS:
|
||||
parser.error(
|
||||
f"Unsupported file extension '{ext}' in S3 key. "
|
||||
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
|
||||
)
|
||||
|
||||
os.makedirs(LOCAL_FOLDER, exist_ok=True)
|
||||
local_filename = os.path.basename(s3_key)
|
||||
local_path = os.path.join(LOCAL_FOLDER, local_filename)
|
||||
|
||||
try:
|
||||
_ensure_local_copy(S3_BUCKET, s3_key, local_path)
|
||||
except Exception as exc:
|
||||
print(f"S3 download error: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# ---- Read & summarise --------------------------------------------------
|
||||
try:
|
||||
df = _read_head(
|
||||
local_path,
|
||||
row_count=args.rows,
|
||||
delimiter=args.delimiter,
|
||||
encoding=args.encoding,
|
||||
quotechar=args.quotechar,
|
||||
)
|
||||
except Exception as exc:
|
||||
print(f"File read error: {exc}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
_print_summary(df)
|
||||
@ -1,757 +0,0 @@
|
||||
"""S3-source counterpart to ``generic_loader/load_folder.py``.
|
||||
|
||||
Reads a YAML config that points at an S3 bucket + prefix, lists every object
|
||||
under that prefix recursively, groups objects into *clusters* using the same
|
||||
explicit-pattern + auto-detect rules as ``load_folder.py``, and downloads each
|
||||
cluster's files into its own subfolder under a local destination root.
|
||||
|
||||
Supported file types:
|
||||
* SAS data files: ``.sas7bdat``, ``.xpt``, ``.xport``
|
||||
* Delimited text files: ``.txt``, ``.csv``, ``.tsv``
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
USAGE
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
1. YAML config
|
||||
--------------
|
||||
::
|
||||
|
||||
bucket: my-bucket # required
|
||||
prefix: census/2020/raw/ # required; recursive scan under it
|
||||
local_folder: ./downloads # required; one subfolder per cluster
|
||||
aws_profile: default # optional; default boto3 chain if omitted
|
||||
|
||||
auto_detect: true # optional; default true
|
||||
extensions: # optional; default sas7bdat/xpt/xport/txt/csv/tsv
|
||||
- .sas7bdat
|
||||
- .csv
|
||||
on_exists: skip # optional; skip | overwrite | error
|
||||
concurrency: 4 # optional; default 4
|
||||
|
||||
clusters:
|
||||
- pattern: '^group_a\\d+\\.sas7bdat$'
|
||||
name: group_a
|
||||
- pattern: '^group_b\\d+\\.sas7bdat$'
|
||||
name: group_b
|
||||
|
||||
2. Command-line interface
|
||||
-------------------------
|
||||
::
|
||||
|
||||
python s3_download.py --config download_config.yaml [--dry-run]
|
||||
[--overwrite] [--fail-fast]
|
||||
|
||||
Flags:
|
||||
--config PATH Required. Path to the YAML config above.
|
||||
--dry-run List discovered clusters and the objects each would
|
||||
download (with sizes). No GET requests are issued
|
||||
beyond the initial LIST.
|
||||
--overwrite Force-redownload every matched object regardless of
|
||||
local cache state. Equivalent to ``on_exists: overwrite``
|
||||
for this run.
|
||||
--fail-fast Abort the whole run on the first per-file download
|
||||
failure. Default is to log the failure and keep going.
|
||||
|
||||
Exit codes:
|
||||
0 - every file downloaded (or skipped) successfully (or dry-run completed)
|
||||
1 - at least one file failed (details on stderr)
|
||||
2 - the LIST returned no objects matching the configured extensions
|
||||
|
||||
3. Discovery rules
|
||||
------------------
|
||||
* Listing is recursive (no S3 ``Delimiter``). Regexes are matched against
|
||||
the *basename* of each key (the part after the last ``/``), so a nested
|
||||
object like ``census/2020/raw/nested/group_c1.sas7bdat`` is grouped by
|
||||
``group_c1.sas7bdat`` alone. Text files (e.g. ``data.csv``) are handled
|
||||
identically — the basename is extracted and matched the same way.
|
||||
* Explicit patterns are tried in order. A key matched by one pattern is
|
||||
removed from the pool before the next pattern runs. Overlap between
|
||||
patterns is flagged as an error at discovery time.
|
||||
* Auto-detect groups remaining keys by ``re.sub(r'\\d+$', '', stem)`` with
|
||||
any trailing ``_`` / ``-`` stripped afterward, mirroring
|
||||
``load_folder.py``. Stems without trailing digits become singleton
|
||||
clusters named after the stem.
|
||||
* Within a cluster, files are sorted numerically by the LAST digit group
|
||||
in the stem so ``_9_`` sorts before ``_40_`` regardless of zero-padding.
|
||||
|
||||
4. Library usage
|
||||
----------------
|
||||
::
|
||||
|
||||
from s3_download import load_download_config, list_s3_objects, \
|
||||
discover_clusters, download_cluster, build_s3_client
|
||||
|
||||
cfg = load_download_config("download_config.yaml")
|
||||
s3 = build_s3_client(cfg)
|
||||
objects = list_s3_objects(s3, cfg)
|
||||
for cluster in discover_clusters(cfg, objects):
|
||||
download_cluster(s3, cfg, cluster)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
|
||||
import boto3
|
||||
import yaml
|
||||
|
||||
|
||||
SAS_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport")
|
||||
TEXT_EXTENSIONS: Tuple[str, ...] = (".txt", ".csv", ".tsv")
|
||||
DEFAULT_EXTENSIONS: Tuple[str, ...] = SAS_EXTENSIONS + TEXT_EXTENSIONS
|
||||
VALID_ON_EXISTS: Tuple[str, ...] = ("skip", "overwrite", "error")
|
||||
DEFAULT_CONCURRENCY: int = 4
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dataclasses
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class _ExplicitPattern:
|
||||
"""Parsed form of a single ``clusters[*]`` YAML entry."""
|
||||
|
||||
pattern: re.Pattern
|
||||
raw_pattern: str
|
||||
name: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class DownloadConfig:
|
||||
"""Top-level configuration parsed from YAML."""
|
||||
|
||||
bucket: str
|
||||
prefix: str
|
||||
local_folder: Path
|
||||
aws_profile: Optional[str] = None
|
||||
auto_detect: bool = True
|
||||
extensions: Tuple[str, ...] = DEFAULT_EXTENSIONS
|
||||
on_exists: str = "skip"
|
||||
concurrency: int = DEFAULT_CONCURRENCY
|
||||
explicit: List[_ExplicitPattern] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class S3Object:
|
||||
"""A single S3 object selected from the LIST response."""
|
||||
|
||||
key: str
|
||||
basename: str
|
||||
size: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class ClusterSpec:
|
||||
"""Resolved per-cluster download settings."""
|
||||
|
||||
name: str
|
||||
objects: List[S3Object]
|
||||
source: str # "explicit" or "auto"
|
||||
pattern: Optional[str] = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config loading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _validate_on_exists(value: Any, where: str) -> str:
|
||||
s = str(value).lower()
|
||||
if s not in VALID_ON_EXISTS:
|
||||
raise ValueError(
|
||||
f"{where}: on_exists={value!r} is not one of {list(VALID_ON_EXISTS)}"
|
||||
)
|
||||
return s
|
||||
|
||||
|
||||
def _parse_extensions(raw_value: Any, where: str) -> Tuple[str, ...]:
|
||||
if raw_value is None:
|
||||
return DEFAULT_EXTENSIONS
|
||||
if isinstance(raw_value, str):
|
||||
items = [raw_value]
|
||||
elif isinstance(raw_value, list):
|
||||
items = list(raw_value)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"{where}: 'extensions' must be a string or list of strings."
|
||||
)
|
||||
out: List[str] = []
|
||||
for i, item in enumerate(items):
|
||||
if not isinstance(item, str) or not item.strip():
|
||||
raise ValueError(
|
||||
f"{where}: 'extensions[{i}]' must be a non-empty string."
|
||||
)
|
||||
ext = item.strip().lower()
|
||||
if not ext.startswith("."):
|
||||
ext = "." + ext
|
||||
out.append(ext)
|
||||
if len(out) != len(set(out)):
|
||||
raise ValueError(f"{where}: 'extensions' contains duplicates.")
|
||||
return tuple(out)
|
||||
|
||||
|
||||
def _parse_concurrency(raw_value: Any, where: str) -> int:
|
||||
if raw_value is None:
|
||||
return DEFAULT_CONCURRENCY
|
||||
try:
|
||||
value = int(raw_value)
|
||||
except (TypeError, ValueError):
|
||||
raise ValueError(
|
||||
f"{where}: 'concurrency' must be a positive integer, "
|
||||
f"got {raw_value!r}"
|
||||
)
|
||||
if value <= 0:
|
||||
raise ValueError(
|
||||
f"{where}: 'concurrency' must be a positive integer, got {value}"
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
def load_download_config(path: Path) -> DownloadConfig:
|
||||
"""Parse and validate the YAML download config at ``path``."""
|
||||
path = Path(path)
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
raw = yaml.safe_load(f)
|
||||
|
||||
if not isinstance(raw, dict):
|
||||
raise ValueError(
|
||||
f"Config at {path} must be a YAML mapping at the top level."
|
||||
)
|
||||
|
||||
# 'bucket' can fall back to the S3_BUCKET env var, so only flag it as
|
||||
# missing when neither the YAML key nor the env var is present.
|
||||
required_always = ("prefix", "local_folder")
|
||||
missing = [k for k in required_always if k not in raw]
|
||||
if "bucket" not in raw and not os.environ.get("S3_BUCKET"):
|
||||
missing.insert(0, "bucket")
|
||||
if missing:
|
||||
raise ValueError(
|
||||
f"Config {path} missing required keys: {', '.join(missing)}"
|
||||
)
|
||||
|
||||
bucket = str(raw["bucket"]).strip() if raw.get("bucket") else ""
|
||||
if not bucket:
|
||||
bucket = os.environ.get("S3_BUCKET", "")
|
||||
if not bucket:
|
||||
raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.")
|
||||
|
||||
prefix = str(raw["prefix"])
|
||||
# Normalize: strip leading slash, ensure exactly one trailing slash unless
|
||||
# the user explicitly asked for an empty prefix (whole bucket scan).
|
||||
prefix = prefix.lstrip("/")
|
||||
if prefix and not prefix.endswith("/"):
|
||||
prefix = prefix + "/"
|
||||
|
||||
local_folder = Path(raw["local_folder"])
|
||||
if not local_folder.is_absolute():
|
||||
candidate = (path.parent / local_folder).resolve()
|
||||
# Mirror load_folder's behavior: prefer the config-relative path when
|
||||
# it exists, otherwise keep what the user wrote. Either way, we'll
|
||||
# mkdir(parents=True) before downloading so non-existence is fine.
|
||||
local_folder = candidate if candidate.parent.exists() else candidate
|
||||
|
||||
aws_profile = raw.get("aws_profile")
|
||||
if aws_profile is not None:
|
||||
aws_profile = str(aws_profile).strip() or None
|
||||
if aws_profile is None:
|
||||
aws_profile = os.environ.get("AWS_PROFILE") or None
|
||||
|
||||
auto_detect = bool(raw.get("auto_detect", True))
|
||||
extensions = _parse_extensions(raw.get("extensions"), f"Config {path}")
|
||||
on_exists = _validate_on_exists(
|
||||
raw.get("on_exists", "skip"), f"Config {path}"
|
||||
)
|
||||
concurrency = _parse_concurrency(
|
||||
raw.get("concurrency"), f"Config {path}"
|
||||
)
|
||||
|
||||
explicit: List[_ExplicitPattern] = []
|
||||
seen_names: set = set()
|
||||
clusters_raw = raw.get("clusters") or []
|
||||
if not isinstance(clusters_raw, list):
|
||||
raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
|
||||
for i, entry in enumerate(clusters_raw):
|
||||
where = f"Config {path} clusters[{i}]"
|
||||
if not isinstance(entry, dict):
|
||||
raise ValueError(f"{where} must be a mapping.")
|
||||
if "pattern" not in entry or "name" not in entry:
|
||||
raise ValueError(f"{where} must include 'pattern' and 'name'.")
|
||||
raw_pat = str(entry["pattern"])
|
||||
try:
|
||||
compiled = re.compile(raw_pat)
|
||||
except re.error as e:
|
||||
raise ValueError(
|
||||
f"{where}: invalid regex {raw_pat!r}: {e}"
|
||||
) from e
|
||||
name = str(entry["name"]).strip()
|
||||
if not name:
|
||||
raise ValueError(f"{where}: 'name' must be a non-empty string.")
|
||||
if name in seen_names:
|
||||
raise ValueError(
|
||||
f"{where}: duplicate cluster name {name!r}"
|
||||
)
|
||||
seen_names.add(name)
|
||||
explicit.append(
|
||||
_ExplicitPattern(
|
||||
pattern=compiled, raw_pattern=raw_pat, name=name,
|
||||
)
|
||||
)
|
||||
|
||||
return DownloadConfig(
|
||||
bucket=bucket,
|
||||
prefix=prefix,
|
||||
local_folder=local_folder,
|
||||
aws_profile=aws_profile,
|
||||
auto_detect=auto_detect,
|
||||
extensions=extensions,
|
||||
on_exists=on_exists,
|
||||
concurrency=concurrency,
|
||||
explicit=explicit,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# S3 listing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def build_s3_client(cfg: DownloadConfig):
|
||||
"""Build a boto3 S3 client honoring ``cfg.aws_profile`` if set."""
|
||||
if cfg.aws_profile:
|
||||
session = boto3.Session(profile_name=cfg.aws_profile)
|
||||
else:
|
||||
session = boto3.Session()
|
||||
return session.client("s3")
|
||||
|
||||
|
||||
def list_s3_objects(s3_client, cfg: DownloadConfig) -> List[S3Object]:
|
||||
"""List all objects under ``cfg.prefix`` recursively, filtered by extension.
|
||||
|
||||
Supports SAS extensions (``.sas7bdat``, ``.xpt``, ``.xport``) and text
|
||||
extensions (``.txt``, ``.csv``, ``.tsv``) — whichever are present in
|
||||
``cfg.extensions``.
|
||||
"""
|
||||
paginator = s3_client.get_paginator("list_objects_v2")
|
||||
out: List[S3Object] = []
|
||||
for page in paginator.paginate(Bucket=cfg.bucket, Prefix=cfg.prefix):
|
||||
for entry in page.get("Contents", []):
|
||||
key = entry["Key"]
|
||||
if key.endswith("/"):
|
||||
continue
|
||||
basename = key.rsplit("/", 1)[-1]
|
||||
ext = ("." + basename.rsplit(".", 1)[-1].lower()
|
||||
if "." in basename else "")
|
||||
if ext not in cfg.extensions:
|
||||
continue
|
||||
out.append(
|
||||
S3Object(key=key, basename=basename, size=int(entry["Size"]))
|
||||
)
|
||||
out.sort(key=lambda o: o.key)
|
||||
return out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cluster discovery (mirrors generic_loader/load_folder.py)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
|
||||
_DIGIT_GROUP_RE = re.compile(r"\d+")
|
||||
|
||||
|
||||
def _auto_prefix(stem: str) -> str:
|
||||
"""Cluster key for *stem*: strip trailing digits and any trailing _/-."""
|
||||
stripped = _TRAILING_DIGIT_RE.sub("", stem)
|
||||
stripped = stripped.rstrip("_-")
|
||||
return stripped or stem
|
||||
|
||||
|
||||
def _basename_stem(basename: str) -> str:
|
||||
if "." in basename:
|
||||
return basename.rsplit(".", 1)[0]
|
||||
return basename
|
||||
|
||||
|
||||
def _cluster_sort_key(obj: S3Object) -> Tuple[int, str]:
|
||||
"""Sort key for ordering objects within a cluster by trailing digits."""
|
||||
stem = _basename_stem(obj.basename)
|
||||
digits = _DIGIT_GROUP_RE.findall(stem)
|
||||
n = int(digits[-1]) if digits else -1
|
||||
return (n, stem)
|
||||
|
||||
|
||||
def discover_clusters(
|
||||
cfg: DownloadConfig, objects: List[S3Object]
|
||||
) -> List[ClusterSpec]:
|
||||
"""Bucket *objects* into clusters using explicit patterns then auto-detect."""
|
||||
clusters: List[ClusterSpec] = []
|
||||
|
||||
for i, p_i in enumerate(cfg.explicit):
|
||||
for j in range(i + 1, len(cfg.explicit)):
|
||||
p_j = cfg.explicit[j]
|
||||
for obj in objects:
|
||||
if (p_i.pattern.search(obj.basename)
|
||||
and p_j.pattern.search(obj.basename)):
|
||||
raise ValueError(
|
||||
f"Object {obj.basename!r} matches multiple explicit "
|
||||
f"patterns: {p_i.raw_pattern!r} and "
|
||||
f"{p_j.raw_pattern!r}"
|
||||
)
|
||||
|
||||
remaining = list(objects)
|
||||
for patt in cfg.explicit:
|
||||
matched = [o for o in remaining if patt.pattern.search(o.basename)]
|
||||
if not matched:
|
||||
clusters.append(
|
||||
ClusterSpec(
|
||||
name=patt.name,
|
||||
objects=[],
|
||||
source="explicit",
|
||||
pattern=patt.raw_pattern,
|
||||
)
|
||||
)
|
||||
continue
|
||||
remaining = [o for o in remaining if o not in matched]
|
||||
clusters.append(
|
||||
ClusterSpec(
|
||||
name=patt.name,
|
||||
objects=sorted(matched, key=_cluster_sort_key),
|
||||
source="explicit",
|
||||
pattern=patt.raw_pattern,
|
||||
)
|
||||
)
|
||||
|
||||
if cfg.auto_detect and remaining:
|
||||
buckets: Dict[str, List[S3Object]] = {}
|
||||
for obj in remaining:
|
||||
key = _auto_prefix(_basename_stem(obj.basename))
|
||||
buckets.setdefault(key, []).append(obj)
|
||||
for key in sorted(buckets):
|
||||
clusters.append(
|
||||
ClusterSpec(
|
||||
name=key,
|
||||
objects=sorted(buckets[key], key=_cluster_sort_key),
|
||||
source="auto",
|
||||
)
|
||||
)
|
||||
|
||||
return clusters
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Download
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _local_path(
|
||||
cfg: DownloadConfig,
|
||||
cluster: ClusterSpec,
|
||||
obj: S3Object,
|
||||
basename_collisions: set,
|
||||
) -> Path:
|
||||
"""Resolve the on-disk destination path for *obj*.
|
||||
|
||||
Falls back to a key-derived filename when two objects in the same cluster
|
||||
share a basename (possible under recursive scan).
|
||||
"""
|
||||
cluster_dir = cfg.local_folder / cluster.name
|
||||
if obj.basename in basename_collisions:
|
||||
safe = obj.key.replace("/", "__")
|
||||
return cluster_dir / safe
|
||||
return cluster_dir / obj.basename
|
||||
|
||||
|
||||
def _basename_collisions(cluster: ClusterSpec) -> set:
|
||||
"""Return the set of basenames that appear more than once in *cluster*."""
|
||||
seen: Dict[str, int] = {}
|
||||
for obj in cluster.objects:
|
||||
seen[obj.basename] = seen.get(obj.basename, 0) + 1
|
||||
return {name for name, count in seen.items() if count > 1}
|
||||
|
||||
|
||||
def _decide_action(
|
||||
local_path: Path, obj: S3Object, on_exists: str
|
||||
) -> Tuple[str, Optional[str]]:
|
||||
"""Return ``(action, message)`` where action is 'download' or 'skip'."""
|
||||
if on_exists == "overwrite":
|
||||
return ("download", None)
|
||||
if not local_path.exists():
|
||||
return ("download", None)
|
||||
local_size = local_path.stat().st_size
|
||||
if local_size == obj.size:
|
||||
return (
|
||||
"skip",
|
||||
f" skip {obj.key} -> {local_path} (size {local_size} matches)",
|
||||
)
|
||||
if on_exists == "error":
|
||||
raise RuntimeError(
|
||||
f"Local file {local_path} exists with size {local_size} but S3 "
|
||||
f"object s3://{obj.key} has size {obj.size} (on_exists=error)"
|
||||
)
|
||||
return (
|
||||
"download",
|
||||
f" re-download {obj.key} -> {local_path} "
|
||||
f"(local size {local_size} != S3 {obj.size})",
|
||||
)
|
||||
|
||||
|
||||
def download_cluster(
|
||||
s3_client,
|
||||
cfg: DownloadConfig,
|
||||
cluster: ClusterSpec,
|
||||
*,
|
||||
on_exists_override: Optional[str] = None,
|
||||
fail_fast: bool = False,
|
||||
) -> Tuple[int, int, int, List[Tuple[str, Exception]]]:
|
||||
"""Download every object in *cluster* into ``cfg.local_folder/cluster.name``.
|
||||
|
||||
Returns ``(downloaded, skipped, bytes_downloaded, failures)`` where
|
||||
*failures* is a list of ``(key, exception)`` tuples.
|
||||
"""
|
||||
if not cluster.objects:
|
||||
return (0, 0, 0, [])
|
||||
|
||||
on_exists = on_exists_override or cfg.on_exists
|
||||
cluster_dir = cfg.local_folder / cluster.name
|
||||
cluster_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
collisions = _basename_collisions(cluster)
|
||||
|
||||
plans: List[Tuple[S3Object, Path, str]] = []
|
||||
skipped = 0
|
||||
for obj in cluster.objects:
|
||||
local_path = _local_path(cfg, cluster, obj, collisions)
|
||||
action, message = _decide_action(local_path, obj, on_exists)
|
||||
if message:
|
||||
print(message, file=sys.stderr)
|
||||
if action == "skip":
|
||||
skipped += 1
|
||||
continue
|
||||
plans.append((obj, local_path, action))
|
||||
|
||||
downloaded = 0
|
||||
bytes_downloaded = 0
|
||||
failures: List[Tuple[str, Exception]] = []
|
||||
|
||||
if not plans:
|
||||
return (0, skipped, 0, failures)
|
||||
|
||||
def _do_one(item):
|
||||
obj, local_path, _action = item
|
||||
local_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
s3_client.download_file(cfg.bucket, obj.key, str(local_path))
|
||||
return obj
|
||||
|
||||
if cfg.concurrency <= 1 or len(plans) == 1:
|
||||
for plan in plans:
|
||||
obj = plan[0]
|
||||
try:
|
||||
_do_one(plan)
|
||||
downloaded += 1
|
||||
bytes_downloaded += obj.size
|
||||
print(
|
||||
f" ok {obj.key} -> {plan[1]} ({obj.size:,} bytes)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
except Exception as exc:
|
||||
failures.append((obj.key, exc))
|
||||
print(
|
||||
f" FAIL {obj.key}: {exc}", file=sys.stderr,
|
||||
)
|
||||
if fail_fast:
|
||||
break
|
||||
else:
|
||||
with ThreadPoolExecutor(max_workers=cfg.concurrency) as pool:
|
||||
future_to_plan = {pool.submit(_do_one, p): p for p in plans}
|
||||
for fut in as_completed(future_to_plan):
|
||||
plan = future_to_plan[fut]
|
||||
obj = plan[0]
|
||||
try:
|
||||
fut.result()
|
||||
downloaded += 1
|
||||
bytes_downloaded += obj.size
|
||||
print(
|
||||
f" ok {obj.key} -> {plan[1]} "
|
||||
f"({obj.size:,} bytes)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
except Exception as exc:
|
||||
failures.append((obj.key, exc))
|
||||
print(
|
||||
f" FAIL {obj.key}: {exc}", file=sys.stderr,
|
||||
)
|
||||
if fail_fast:
|
||||
for other in future_to_plan:
|
||||
other.cancel()
|
||||
break
|
||||
|
||||
return (downloaded, skipped, bytes_downloaded, failures)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_argparser() -> argparse.ArgumentParser:
|
||||
p = argparse.ArgumentParser(
|
||||
description=(
|
||||
"Download S3 objects (SAS data files and/or delimited text files) "
|
||||
"under a prefix into a local folder, grouping objects into "
|
||||
"clusters that each become one subfolder. "
|
||||
"Supported extensions: "
|
||||
+ ", ".join(DEFAULT_EXTENSIONS)
|
||||
+ "."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--config", required=True, type=Path, help="Path to YAML config",
|
||||
)
|
||||
p.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help=(
|
||||
"List discovered clusters and the objects each would download. "
|
||||
"No GET requests are issued beyond the initial LIST."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--overwrite",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Force-redownload every matched object regardless of local "
|
||||
"cache state. Equivalent to on_exists=overwrite."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--fail-fast",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Abort on the first per-file download failure. Default is to "
|
||||
"log the failure and keep going."
|
||||
),
|
||||
)
|
||||
return p
|
||||
|
||||
|
||||
def _format_bytes(n: int) -> str:
|
||||
units = ("B", "KiB", "MiB", "GiB", "TiB")
|
||||
size = float(n)
|
||||
for unit in units:
|
||||
if size < 1024 or unit == units[-1]:
|
||||
return f"{size:,.1f} {unit}" if unit != "B" else f"{int(size):,} B"
|
||||
size /= 1024
|
||||
return f"{n} B"
|
||||
|
||||
|
||||
def _describe_cluster(cluster: ClusterSpec) -> str:
|
||||
src = cluster.source
|
||||
if cluster.pattern:
|
||||
src += f" pattern={cluster.pattern!r}"
|
||||
if not cluster.objects:
|
||||
return (
|
||||
f"cluster {cluster.name!r} [{src}]\n objects: (no matching keys)"
|
||||
)
|
||||
total = sum(o.size for o in cluster.objects)
|
||||
lines = [
|
||||
f"cluster {cluster.name!r} [{src}] "
|
||||
f"{len(cluster.objects)} object(s), {_format_bytes(total)}"
|
||||
]
|
||||
for obj in cluster.objects:
|
||||
lines.append(f" - {obj.key} ({_format_bytes(obj.size)})")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main(argv: Optional[List[str]] = None) -> int:
|
||||
args = _build_argparser().parse_args(argv)
|
||||
|
||||
cfg = load_download_config(args.config)
|
||||
|
||||
s3 = build_s3_client(cfg)
|
||||
objects = list_s3_objects(s3, cfg)
|
||||
|
||||
if not objects:
|
||||
print(
|
||||
f"error: no objects matching extensions "
|
||||
f"{list(cfg.extensions)} found under "
|
||||
f"s3://{cfg.bucket}/{cfg.prefix}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 2
|
||||
|
||||
clusters = discover_clusters(cfg, objects)
|
||||
loadable = [c for c in clusters if c.objects]
|
||||
|
||||
print(
|
||||
f"discovered {len(loadable)} cluster(s) "
|
||||
f"({sum(len(c.objects) for c in loadable)} object(s)) under "
|
||||
f"s3://{cfg.bucket}/{cfg.prefix}:"
|
||||
)
|
||||
for c in clusters:
|
||||
print(_describe_cluster(c))
|
||||
|
||||
if args.dry_run:
|
||||
return 0
|
||||
|
||||
cfg.local_folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
on_exists_override = "overwrite" if args.overwrite else None
|
||||
|
||||
totals: List[Tuple[str, int, int, int]] = [] # (name, dl, skip, bytes)
|
||||
failures: List[Tuple[str, str, Exception]] = [] # (cluster, key, exc)
|
||||
aborted = False
|
||||
for cluster in loadable:
|
||||
if aborted:
|
||||
break
|
||||
print(
|
||||
f"\n>>> downloading cluster {cluster.name!r} "
|
||||
f"({len(cluster.objects)} object(s))"
|
||||
)
|
||||
try:
|
||||
dl, sk, by, fails = download_cluster(
|
||||
s3, cfg, cluster,
|
||||
on_exists_override=on_exists_override,
|
||||
fail_fast=args.fail_fast,
|
||||
)
|
||||
except Exception as exc:
|
||||
failures.append((cluster.name, "<cluster>", exc))
|
||||
print(
|
||||
f" !! cluster {cluster.name!r} aborted: {exc}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
if args.fail_fast:
|
||||
aborted = True
|
||||
continue
|
||||
totals.append((cluster.name, dl, sk, by))
|
||||
for key, exc in fails:
|
||||
failures.append((cluster.name, key, exc))
|
||||
if fails and args.fail_fast:
|
||||
aborted = True
|
||||
|
||||
print("\n=== summary ===")
|
||||
for name, dl, sk, by in totals:
|
||||
print(
|
||||
f" {name}: downloaded {dl}, skipped {sk}, "
|
||||
f"{_format_bytes(by)}"
|
||||
)
|
||||
for cname, key, exc in failures:
|
||||
print(f" FAIL {cname} {key}: {exc}", file=sys.stderr)
|
||||
|
||||
return 1 if failures else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@ -1,8 +0,0 @@
|
||||
# S3 Directory Explorer - Input File
|
||||
# One S3 prefix per line (within the bucket configured in data_explorer.py).
|
||||
# Blank lines and comments (#) are ignored.
|
||||
#
|
||||
# Examples:
|
||||
# data/sales/
|
||||
# data/inventory/
|
||||
# data/archive/
|
||||
@ -1,111 +0,0 @@
|
||||
# Example S3 download config for utils/s3_download.py.
|
||||
#
|
||||
# Shape mirrors what `s3_download.py` expects:
|
||||
#
|
||||
# python s3_download.py --config sample_s3_download_config.yaml --dry-run
|
||||
# python s3_download.py --config sample_s3_download_config.yaml
|
||||
#
|
||||
# Relative paths (e.g. local_folder) are resolved against this config file's
|
||||
# directory first, falling back to the current working directory if that
|
||||
# doesn't exist.
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Required: where to read from and write to
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
bucket: my-bucket
|
||||
|
||||
# Listing is recursive under this prefix - no S3 Delimiter is used. A nested
|
||||
# object like `census/2020/raw/nested/group_c1.sas7bdat` will be considered.
|
||||
# Regex patterns below match against the object BASENAME only (the part
|
||||
# after the last `/`), so subfolder location does not affect matching.
|
||||
prefix: census/2020/raw/
|
||||
|
||||
# Root destination on disk. One subfolder per cluster is created beneath it.
|
||||
# If two objects in the same cluster share a basename (possible under the
|
||||
# recursive scan), the second one is renamed to a key-derived filename
|
||||
# (slashes replaced with `__`) so neither file overwrites the other.
|
||||
local_folder: ./downloads
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Optional: AWS credentials
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Named profile from ~/.aws/credentials. Omit to use the default boto3
|
||||
# credential chain (env vars, instance role, SSO, etc.).
|
||||
# aws_profile: default
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Optional: discovery behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# When true (default), any object not matched by an explicit pattern below is
|
||||
# auto-grouped with its peers by stripping trailing digits (and any trailing
|
||||
# _ / -) from the basename stem. Stems without trailing digits become their
|
||||
# own singleton cluster.
|
||||
#
|
||||
# Auto-detect only recognizes *trailing* digit runs. If your basenames put
|
||||
# the varying number in the middle of the stem (e.g. surrounded by year,
|
||||
# region, and detail components), auto-detect will NOT group them - each
|
||||
# object becomes its own singleton cluster. Use an explicit pattern instead;
|
||||
# see the embedded-digit example near the bottom of this file.
|
||||
auto_detect: true
|
||||
|
||||
# Object extensions to consider. Anything else under the prefix is ignored.
|
||||
# Default (when this key is omitted): .sas7bdat, .xpt, .xport, .txt, .csv, .tsv
|
||||
# extensions:
|
||||
# - .sas7bdat
|
||||
# - .xpt
|
||||
# - .txt
|
||||
# - .csv
|
||||
# - .tsv
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Optional: download behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# What to do when the destination file already exists locally.
|
||||
# skip - (default) if the local file's byte size matches the S3
|
||||
# object's Size, reuse it. If sizes differ, re-download with a
|
||||
# warning. Same byte-size cache rule used by
|
||||
# utils/file_viewer.py::_ensure_local_copy.
|
||||
# overwrite - always re-download.
|
||||
# error - abort the run if any local file exists with a different size.
|
||||
# (Equal sizes still skip.)
|
||||
on_exists: skip
|
||||
|
||||
# Parallel download workers. boto3 clients are thread-safe. Default: 4.
|
||||
# concurrency: 4
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Explicit cluster patterns
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# Each pattern is matched against the S3 object BASENAME (last path segment
|
||||
# of the key). Objects matched by a pattern are pulled out of the
|
||||
# auto-detect pool, so explicit and auto clusters compose cleanly.
|
||||
#
|
||||
# `name` becomes both the subfolder name under `local_folder` and the label
|
||||
# used in the discovery / summary output. Names must be unique across
|
||||
# explicit clusters.
|
||||
clusters:
|
||||
- pattern: '^group_a\d+\.sas7bdat$'
|
||||
name: group_a
|
||||
|
||||
# - pattern: '^group_b\d+\.sas7bdat$'
|
||||
# name: group_b
|
||||
|
||||
# Embedded-digit example. When the varying number sits in the MIDDLE of
|
||||
# the basename stem (e.g. year2020_regionA_40_detail.sas7bdat,
|
||||
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
|
||||
# them - each object becomes its own singleton cluster. An explicit
|
||||
# pattern bucketizes them correctly. The \d+ matches any width, and
|
||||
# objects within the cluster are sorted numerically by the last digit
|
||||
# group in the stem, so _9_ sorts before _40_ regardless of zero-padding.
|
||||
#
|
||||
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
|
||||
# name: year2020_regionA_detail
|
||||
|
||||
# Text file cluster example (when file_type: text):
|
||||
# - pattern: '^data_group_a\d+\.txt$'
|
||||
# name: data_group_a
|
||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user