Compare commits
No commits in common. "main" and "batch_folder_processing" have entirely different histories.
main
...
batch_fold
7
.gitignore
vendored
7
.gitignore
vendored
@ -1,7 +0,0 @@
|
|||||||
/.venv
|
|
||||||
/samples
|
|
||||||
.env
|
|
||||||
!.env.example
|
|
||||||
__pycache__/
|
|
||||||
venv/
|
|
||||||
*/__pycache__/
|
|
||||||
@ -3,6 +3,3 @@ PGPORT=5432
|
|||||||
PGUSER=
|
PGUSER=
|
||||||
PGPASSWORD=
|
PGPASSWORD=
|
||||||
PGDATABASE=
|
PGDATABASE=
|
||||||
|
|
||||||
S3_BUCKET=my-bucket
|
|
||||||
AWS_PROFILE=default
|
|
||||||
|
|||||||
5
generic_loader/.gitignore
vendored
Normal file
5
generic_loader/.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
/.venv
|
||||||
|
/samples
|
||||||
|
/.env
|
||||||
|
/__pycache__
|
||||||
|
/venv
|
||||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,10 +1,7 @@
|
|||||||
pandas>=2.0,<3.0
|
pandas>=2.0,<3.0
|
||||||
pyreadstat>=1.2,<2.0
|
pyreadstat>=1.2,<1.3
|
||||||
numpy>=2.1,<3.0
|
numpy>=2.1,<3.0
|
||||||
pyarrow>=22.0,<24.0
|
|
||||||
pyyaml>=6.0,<7.0
|
pyyaml>=6.0,<7.0
|
||||||
psycopg2-binary>=2.9,<3.0
|
psycopg2-binary>=2.9,<3.0
|
||||||
python-dotenv>=1.0,<2.0
|
python-dotenv>=1.0,<2.0
|
||||||
boto3>=1.28,<2.0
|
boto3>=1.28,<2.0
|
||||||
openpyxl>=3.1,<4.0
|
|
||||||
tqdm>=4.66,<5.0
|
|
||||||
@ -15,64 +15,3 @@ tablename: kitchensink
|
|||||||
# What to do if the target table already exists: fail | replace | append
|
# What to do if the target table already exists: fail | replace | append
|
||||||
# Defaults to fail.
|
# Defaults to fail.
|
||||||
if_exists: append
|
if_exists: append
|
||||||
|
|
||||||
# file_type: Type of data file to load. One of: sas | text. Default: sas.
|
|
||||||
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
|
|
||||||
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
|
|
||||||
# file_type: sas
|
|
||||||
|
|
||||||
# delimiter: Column delimiter for text files. Only used when file_type: text.
|
|
||||||
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
|
|
||||||
# or any single character.
|
|
||||||
# delimiter: ","
|
|
||||||
|
|
||||||
# text_encoding: Character encoding for text files. Default: utf-8.
|
|
||||||
# Common alternatives: latin-1, cp1252, iso-8859-1.
|
|
||||||
# text_encoding: utf-8
|
|
||||||
|
|
||||||
# quotechar: Quote character for text files. Default: '"' (double quote).
|
|
||||||
# quotechar: '"'
|
|
||||||
|
|
||||||
# partition_by: Partition the table by unique values of these columns.
|
|
||||||
# Columns are applied in cascading order (first column = top-level partition).
|
|
||||||
# Requires if_exists: replace or fail (not append for initial creation).
|
|
||||||
# Single field:
|
|
||||||
# partition_by: state
|
|
||||||
# Multiple fields (cascading):
|
|
||||||
# partition_by:
|
|
||||||
# - state
|
|
||||||
# - zip
|
|
||||||
#
|
|
||||||
# max_partitions: Warning threshold for total partition count (default: 10000).
|
|
||||||
# If the number of partitions exceeds this, a warning is logged but loading continues.
|
|
||||||
# max_partitions: 10000
|
|
||||||
|
|
||||||
# indexes: Create B-tree indexes on these columns after data loading.
|
|
||||||
# Indexes are created with IF NOT EXISTS for safe use with append mode.
|
|
||||||
# Single column:
|
|
||||||
# indexes: state
|
|
||||||
# Multiple columns (one index per column):
|
|
||||||
# indexes:
|
|
||||||
# - state
|
|
||||||
# - zip
|
|
||||||
|
|
||||||
# column_types: Explicit {column_name: postgres_type} overrides that
|
|
||||||
# bypass automatic type inference for the listed columns. Useful when
|
|
||||||
# pyreadstat reports a column as NUM but you want it stored as TEXT
|
|
||||||
# (phone/ID columns that are conceptually strings), or when a column's
|
|
||||||
# inferred type is off for any other reason. Columns not listed here
|
|
||||||
# fall through to the normal inference path. Nullability is always
|
|
||||||
# computed from the data.
|
|
||||||
#
|
|
||||||
# column_types:
|
|
||||||
# RESP_PH_PREFIX_ID: TEXT
|
|
||||||
# SOMELONG_ID: BIGINT
|
|
||||||
|
|
||||||
# all_nullable: If true, every column is stamped nullable in the generated
|
|
||||||
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
|
|
||||||
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
|
|
||||||
# rare-null data downstream) and COPY blows up mid-load on the first null
|
|
||||||
# it hits. Off by default. The CLI flag --all-nullable overrides this to
|
|
||||||
# true when set.
|
|
||||||
#
|
|
||||||
# all_nullable: false
|
|
||||||
|
|||||||
@ -19,33 +19,8 @@ if_exists: replace
|
|||||||
# auto-grouped with its peers by stripping trailing digits (and any trailing
|
# auto-grouped with its peers by stripping trailing digits (and any trailing
|
||||||
# _ / -) from the file stem. Files with no trailing digits become their own
|
# _ / -) from the file stem. Files with no trailing digits become their own
|
||||||
# singleton cluster.
|
# singleton cluster.
|
||||||
#
|
|
||||||
# Auto-detect only recognizes *trailing* digit runs. If your file names put
|
|
||||||
# the varying number in the middle of the stem (e.g. surrounded by year,
|
|
||||||
# region, and detail components), auto-detect will NOT group them - each
|
|
||||||
# file becomes its own singleton cluster. Use an explicit pattern instead;
|
|
||||||
# see the embedded-digit example near the bottom of this file.
|
|
||||||
auto_detect: true
|
auto_detect: true
|
||||||
|
|
||||||
# file_type: Type of data files in this folder. One of: sas | text. Default: sas.
|
|
||||||
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
|
|
||||||
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
|
|
||||||
# When set to 'text', the folder scanner looks for .txt/.csv/.tsv files
|
|
||||||
# instead of .sas7bdat/.xpt/.xport files.
|
|
||||||
# file_type: sas
|
|
||||||
|
|
||||||
# delimiter: Column delimiter for text files. Only used when file_type: text.
|
|
||||||
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
|
|
||||||
# or any single character.
|
|
||||||
# delimiter: ","
|
|
||||||
|
|
||||||
# text_encoding: Character encoding for text files. Default: utf-8.
|
|
||||||
# Common alternatives: latin-1, cp1252, iso-8859-1.
|
|
||||||
# text_encoding: utf-8
|
|
||||||
|
|
||||||
# quotechar: Quote character for text files. Default: '"' (double quote).
|
|
||||||
# quotechar: '"'
|
|
||||||
|
|
||||||
# Folder-level column filter. Every file in every cluster passes through
|
# Folder-level column filter. Every file in every cluster passes through
|
||||||
# this filter. `include` and `exclude` are mutually exclusive. A cluster can
|
# this filter. `include` and `exclude` are mutually exclusive. A cluster can
|
||||||
# override these via its own `include` / `exclude` keys.
|
# override these via its own `include` / `exclude` keys.
|
||||||
@ -56,76 +31,15 @@ auto_detect: true
|
|||||||
# exclude:
|
# exclude:
|
||||||
# - ALLNULL
|
# - ALLNULL
|
||||||
|
|
||||||
# Folder-level partition_by: Partition every cluster's table by unique values
|
|
||||||
# of these columns. Inherited by all clusters unless overridden per-cluster.
|
|
||||||
# Requires if_exists: replace or fail (not append for initial creation).
|
|
||||||
# Single field:
|
|
||||||
# partition_by: state
|
|
||||||
# Multiple fields (cascading):
|
|
||||||
# partition_by:
|
|
||||||
# - state
|
|
||||||
# - zip
|
|
||||||
#
|
|
||||||
# Folder-level max_partitions: Warning threshold for total partition count
|
|
||||||
# (default: 10000). Inherited by all clusters unless overridden per-cluster.
|
|
||||||
# max_partitions: 10000
|
|
||||||
|
|
||||||
# Folder-level indexes: Create B-tree indexes on these columns after data
|
|
||||||
# loading. Inherited by all clusters unless overridden per-cluster.
|
|
||||||
# Indexes are created with IF NOT EXISTS for safe use with append mode.
|
|
||||||
# Single column:
|
|
||||||
# indexes: state
|
|
||||||
# Multiple columns (one index per column):
|
|
||||||
# indexes:
|
|
||||||
# - state
|
|
||||||
# - zip
|
|
||||||
|
|
||||||
# Folder-level column_types: Explicit {column_name: postgres_type} map that
|
|
||||||
# bypasses automatic type inference for the listed columns. Applied to
|
|
||||||
# every cluster unless a cluster supplies its own column_types, which are
|
|
||||||
# merged on top (cluster entries win on conflict).
|
|
||||||
#
|
|
||||||
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
|
|
||||||
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
|
|
||||||
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
|
|
||||||
# BIGINT). Entries in column_types here win over that auto-union - use
|
|
||||||
# them when the auto result is wrong or when --no-prescan disables the
|
|
||||||
# auto-union and you still need to pin a column.
|
|
||||||
#
|
|
||||||
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
|
|
||||||
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
|
|
||||||
# don't exist in a given file are simply ignored for that file.
|
|
||||||
#
|
|
||||||
# column_types:
|
|
||||||
# RESP_PH_PREFIX_ID: TEXT
|
|
||||||
# RESP_PH_SUFFIX_ID: TEXT
|
|
||||||
# SOMELONG_ID: BIGINT
|
|
||||||
|
|
||||||
# Folder-level all_nullable: If true, every column of every cluster is
|
|
||||||
# stamped nullable in the generated schema; NOT NULL inference is skipped
|
|
||||||
# entirely. Use this when the sampler wrongly concludes a column has no
|
|
||||||
# nulls (sampled rows happened to be dense, but later files in the cluster
|
|
||||||
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
|
|
||||||
# unless a cluster supplies its own all_nullable. The CLI flag
|
|
||||||
# --all-nullable overrides both this and any per-cluster setting when
|
|
||||||
# passed. Off by default.
|
|
||||||
#
|
|
||||||
# all_nullable: false
|
|
||||||
|
|
||||||
# Explicit cluster patterns. Each pattern is matched against the file
|
# Explicit cluster patterns. Each pattern is matched against the file
|
||||||
# *basename*. Files matched by a pattern are pulled out of the auto-detect
|
# *basename*. Files matched by a pattern are pulled out of the auto-detect
|
||||||
# pool, so explicit and auto clusters compose cleanly.
|
# pool, so explicit and auto clusters compose cleanly.
|
||||||
#
|
#
|
||||||
# `tablename` is required. `if_exists`, `include`, `exclude`, and
|
# `tablename` is required. `if_exists`, `include`, and `exclude` are
|
||||||
# `column_types` are optional per-cluster overrides of the folder-level
|
# optional per-cluster overrides of the folder-level defaults above.
|
||||||
# defaults above. Cluster-level column_types entries win over folder-
|
|
||||||
# level entries for the same column.
|
|
||||||
clusters:
|
clusters:
|
||||||
- pattern: '^group_a\d+\.xpt$'
|
- pattern: '^group_a\d+\.xpt$'
|
||||||
tablename: group_a
|
tablename: group_a
|
||||||
# column_types:
|
|
||||||
# INTCOL: TEXT
|
|
||||||
# all_nullable: true # per-cluster override of the folder-level default
|
|
||||||
|
|
||||||
# Example of an explicit override. Uncomment to force the group_b cluster to
|
# Example of an explicit override. Uncomment to force the group_b cluster to
|
||||||
# append instead of replace even though the folder default is "replace":
|
# append instead of replace even though the folder default is "replace":
|
||||||
@ -134,44 +48,7 @@ clusters:
|
|||||||
# tablename: group_b
|
# tablename: group_b
|
||||||
# if_exists: append
|
# if_exists: append
|
||||||
|
|
||||||
# Per-cluster partition_by / max_partitions override. These take precedence
|
# With only the gq pattern explicit, auto_detect: true will still bucket
|
||||||
# over the folder-level defaults above.
|
# group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
|
||||||
#
|
|
||||||
# - pattern: '^group_c\d+\.xpt$'
|
|
||||||
# tablename: group_c
|
|
||||||
# partition_by:
|
|
||||||
# - region
|
|
||||||
# - year
|
|
||||||
# max_partitions: 500
|
|
||||||
|
|
||||||
# Per-cluster indexes override. Takes precedence over the folder-level
|
|
||||||
# indexes default above. An explicit empty list disables indexing for
|
|
||||||
# this cluster even when the folder default has indexes.
|
|
||||||
#
|
|
||||||
# - pattern: '^group_d\d+\.xpt$'
|
|
||||||
# tablename: group_d
|
|
||||||
# indexes:
|
|
||||||
# - region
|
|
||||||
# - year
|
|
||||||
|
|
||||||
# Embedded-digit example. When the varying number sits in the MIDDLE of
|
|
||||||
# the stem (e.g. year2020_regionA_40_detail.sas7bdat,
|
|
||||||
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
|
|
||||||
# them - each file becomes its own singleton cluster. An explicit
|
|
||||||
# pattern bucketizes them correctly. The \d+ matches any width, and
|
|
||||||
# files within the cluster are sorted numerically by the last digit
|
|
||||||
# group in the stem, so _9_ sorts before _40_ regardless of zero-
|
|
||||||
# padding. Gaps in the numeric sequence (missing 3, 7, 14, ...) are
|
|
||||||
# fine - whatever files are present get loaded in numeric order.
|
|
||||||
#
|
|
||||||
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
|
|
||||||
# tablename: year2020_regionA_detail
|
|
||||||
|
|
||||||
# Text file cluster example (when file_type: text):
|
|
||||||
# - pattern: '^data_group_a\d+\.txt$'
|
|
||||||
# tablename: data_group_a
|
|
||||||
|
|
||||||
# With only the group_a pattern explicit, auto_detect: true will still
|
|
||||||
# bucket group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
|
|
||||||
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
|
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
|
||||||
# for the fixture that exercises exactly this layout.
|
# for the fixture that exercises exactly this layout.
|
||||||
|
|||||||
@ -1,107 +0,0 @@
|
|||||||
{
|
|
||||||
"ALLNULL": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"TEXT",
|
|
||||||
"VARCHAR"
|
|
||||||
],
|
|
||||||
"note": "entirely null numeric; loader must pick a default type, typically TEXT",
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"ALLNULLC": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"TEXT",
|
|
||||||
"VARCHAR"
|
|
||||||
],
|
|
||||||
"note": "entirely null character",
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"BIGINT": {
|
|
||||||
"note": "values beyond int32 range",
|
|
||||||
"nullable": true,
|
|
||||||
"postgres_type": "BIGINT"
|
|
||||||
},
|
|
||||||
"BOOLCOL": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"BOOLEAN",
|
|
||||||
"SMALLINT",
|
|
||||||
"INTEGER"
|
|
||||||
],
|
|
||||||
"note": "{0,1,NaN} is genuinely ambiguous; loader's choice is a design decision",
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"CONST": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"TEXT",
|
|
||||||
"VARCHAR"
|
|
||||||
],
|
|
||||||
"nullable": false
|
|
||||||
},
|
|
||||||
"DATEASTR": {
|
|
||||||
"note": "stored as char in SAS; loader should coerce ISO-date strings",
|
|
||||||
"nullable": true,
|
|
||||||
"postgres_type": "DATE"
|
|
||||||
},
|
|
||||||
"DATECOL": {
|
|
||||||
"note": "positive control",
|
|
||||||
"nullable": false,
|
|
||||||
"postgres_type": "DATE"
|
|
||||||
},
|
|
||||||
"DTCOL": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"TIMESTAMP",
|
|
||||||
"TIMESTAMP WITHOUT TIME ZONE"
|
|
||||||
],
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"FLOATCOL": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"DOUBLE PRECISION",
|
|
||||||
"NUMERIC"
|
|
||||||
],
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"ID": {
|
|
||||||
"nullable": false,
|
|
||||||
"postgres_type": "INTEGER"
|
|
||||||
},
|
|
||||||
"INTCOL": {
|
|
||||||
"note": "positive control",
|
|
||||||
"nullable": false,
|
|
||||||
"postgres_type": "INTEGER"
|
|
||||||
},
|
|
||||||
"LONGSTR": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"TEXT",
|
|
||||||
"VARCHAR"
|
|
||||||
],
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"MIXED": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"TEXT",
|
|
||||||
"VARCHAR"
|
|
||||||
],
|
|
||||||
"note": "heterogeneous content; loader should fall back to text",
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"NUMASSTR": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"NUMERIC",
|
|
||||||
"DOUBLE PRECISION"
|
|
||||||
],
|
|
||||||
"note": "stored as char in SAS; loader should coerce numeric-looking strings",
|
|
||||||
"nullable": true
|
|
||||||
},
|
|
||||||
"STRCOL": {
|
|
||||||
"acceptable_types": [
|
|
||||||
"TEXT",
|
|
||||||
"VARCHAR"
|
|
||||||
],
|
|
||||||
"note": "positive control",
|
|
||||||
"nullable": false
|
|
||||||
},
|
|
||||||
"TIMECOL": {
|
|
||||||
"nullable": true,
|
|
||||||
"postgres_type": "TIME"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Binary file not shown.
@ -1,679 +0,0 @@
|
|||||||
"""Explore S3 directories and categorise them by accessibility.
|
|
||||||
|
|
||||||
Reads a text file containing one S3 prefix per line (paths within the bucket
|
|
||||||
configured by the ``S3_BUCKET`` constant or ``--bucket`` CLI argument), then
|
|
||||||
for each prefix:
|
|
||||||
|
|
||||||
- Lists all objects recursively (via ``list_objects_v2`` paginator)
|
|
||||||
- **Only considers files matching the configured extensions** (default: all
|
|
||||||
supported extensions — SAS and text). All other file types are ignored.
|
|
||||||
- Tests read permission with ``head_object`` on the first matching file found
|
|
||||||
- If the first file is accessible, tests ALL remaining files individually
|
|
||||||
- Categorises the directory as **Available**, **Blocked**, **Empty**, and
|
|
||||||
tracks individual file **Exceptions** within available directories
|
|
||||||
|
|
||||||
Supported file types
|
|
||||||
--------------------
|
|
||||||
* **SAS files**: ``.sas7bdat``, ``.xpt``, ``.xport``
|
|
||||||
* **Text / delimited files**: ``.txt``, ``.csv``, ``.tsv``
|
|
||||||
|
|
||||||
A directory is considered *empty* if it contains no files matching the
|
|
||||||
extension filter, even when other file types are present.
|
|
||||||
|
|
||||||
Configure the constants below (or use CLI arguments), then run::
|
|
||||||
|
|
||||||
python3 data_explorer.py [OPTIONS]
|
|
||||||
|
|
||||||
Python 3.10+ compatible. Requires ``boto3`` / ``botocore`` and stdlib.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from typing import List, Set, Tuple
|
|
||||||
|
|
||||||
from dotenv import find_dotenv, load_dotenv
|
|
||||||
|
|
||||||
load_dotenv(find_dotenv())
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Dependency check
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
try:
|
|
||||||
import boto3 # noqa: F401
|
|
||||||
import botocore.exceptions # noqa: F401
|
|
||||||
except ImportError:
|
|
||||||
print(
|
|
||||||
"ERROR: boto3 / botocore is not installed.\n"
|
|
||||||
"Install with: pip install boto3",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Extension constants
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
SAS_EXTENSIONS: Set[str] = {".sas7bdat", ".xpt", ".xport"}
|
|
||||||
"""File extensions recognised as SAS data files."""
|
|
||||||
|
|
||||||
TEXT_EXTENSIONS: Set[str] = {".txt", ".csv", ".tsv"}
|
|
||||||
"""File extensions recognised as delimited text / CSV files."""
|
|
||||||
|
|
||||||
SUPPORTED_EXTENSIONS: Set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
|
|
||||||
"""Union of all file extensions this tool can work with."""
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Configuration defaults — edit these or override via CLI arguments
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
|
|
||||||
"""Set of extensions to filter on (case-insensitive). Defaults to all supported."""
|
|
||||||
|
|
||||||
INPUT_FILE: str = "s3_directories.txt"
|
|
||||||
"""Path to the text file containing one S3 prefix per line."""
|
|
||||||
|
|
||||||
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
|
|
||||||
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""
|
|
||||||
|
|
||||||
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
|
|
||||||
"""AWS CLI profile name used for authentication."""
|
|
||||||
|
|
||||||
# Text-file reading defaults (used when downloading / previewing text files)
|
|
||||||
DEFAULT_DELIMITER: str = ","
|
|
||||||
DEFAULT_ENCODING: str = "utf-8"
|
|
||||||
DEFAULT_QUOTECHAR: str = '"'
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Auto-detection helpers
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def detect_file_type(filename: str) -> str:
|
|
||||||
"""Return ``'sas'``, ``'text'``, or ``'unknown'`` based on *filename* extension.
|
|
||||||
|
|
||||||
The check is case-insensitive. For ``.tsv`` files the caller should
|
|
||||||
default the delimiter to a tab character (``'\\t'``).
|
|
||||||
|
|
||||||
Examples
|
|
||||||
--------
|
|
||||||
>>> detect_file_type("data.sas7bdat")
|
|
||||||
'sas'
|
|
||||||
>>> detect_file_type("report.CSV")
|
|
||||||
'text'
|
|
||||||
>>> detect_file_type("archive.zip")
|
|
||||||
'unknown'
|
|
||||||
"""
|
|
||||||
ext = os.path.splitext(filename)[1].lower()
|
|
||||||
if ext in SAS_EXTENSIONS:
|
|
||||||
return "sas"
|
|
||||||
if ext in TEXT_EXTENSIONS:
|
|
||||||
return "text"
|
|
||||||
return "unknown"
|
|
||||||
|
|
||||||
|
|
||||||
def default_delimiter_for(filename: str) -> str:
|
|
||||||
"""Return a sensible default delimiter for *filename*.
|
|
||||||
|
|
||||||
* ``.tsv`` → ``'\\t'``
|
|
||||||
* everything else → ``','``
|
|
||||||
"""
|
|
||||||
ext = os.path.splitext(filename)[1].lower()
|
|
||||||
if ext == ".tsv":
|
|
||||||
return "\t"
|
|
||||||
return ","
|
|
||||||
|
|
||||||
|
|
||||||
def matches_extensions(key: str, extensions: Set[str]) -> bool:
|
|
||||||
"""Return ``True`` if *key* ends with any extension in *extensions* (case-insensitive)."""
|
|
||||||
key_lower = key.lower()
|
|
||||||
return any(key_lower.endswith(ext) for ext in extensions)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Data structures
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class AvailableDir:
|
|
||||||
"""An S3 directory that is readable."""
|
|
||||||
|
|
||||||
prefix: str
|
|
||||||
file_count: int
|
|
||||||
total_size: int # bytes
|
|
||||||
accessible_count: int = 0 # files that passed head_object
|
|
||||||
total_count: int = 0 # total .sas7bdat files found
|
|
||||||
accessible_size: int = 0 # total size of accessible files only
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class BlockedDir:
|
|
||||||
"""An S3 directory where access was denied or an error occurred."""
|
|
||||||
|
|
||||||
prefix: str
|
|
||||||
file_count: int
|
|
||||||
error: str
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class EmptyDir:
|
|
||||||
"""An S3 directory with no objects."""
|
|
||||||
|
|
||||||
prefix: str
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ExceptionFile:
|
|
||||||
"""A specific file that failed permission check within an otherwise available directory."""
|
|
||||||
|
|
||||||
prefix: str # the directory prefix
|
|
||||||
key: str # the full S3 key of the failed file
|
|
||||||
error: str # the error message
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Results:
|
|
||||||
"""Aggregated exploration results."""
|
|
||||||
|
|
||||||
available: List[AvailableDir] = field(default_factory=list)
|
|
||||||
blocked: List[BlockedDir] = field(default_factory=list)
|
|
||||||
empty: List[EmptyDir] = field(default_factory=list)
|
|
||||||
exceptions: List[ExceptionFile] = field(default_factory=list)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Helpers
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def read_input_file(path: str) -> List[str]:
|
|
||||||
"""Return a list of S3 prefixes from *path*, ignoring blanks and comments.
|
|
||||||
|
|
||||||
Each line is stripped and normalised so that non-empty prefixes always end
|
|
||||||
with a trailing ``/``.
|
|
||||||
"""
|
|
||||||
prefixes: List[str] = []
|
|
||||||
with open(path, encoding="utf-8") as fh:
|
|
||||||
for raw_line in fh:
|
|
||||||
line = raw_line.strip()
|
|
||||||
if not line or line.startswith("#"):
|
|
||||||
continue
|
|
||||||
# Normalise: strip surrounding whitespace/slashes, then re-add
|
|
||||||
# a single trailing slash (unless the prefix is empty/root).
|
|
||||||
line = line.strip("/")
|
|
||||||
if line:
|
|
||||||
line += "/"
|
|
||||||
prefixes.append(line)
|
|
||||||
return prefixes
|
|
||||||
|
|
||||||
|
|
||||||
def format_size(size_bytes: int) -> str:
|
|
||||||
"""Return a human-readable size string (KB, MB, GB, TB)."""
|
|
||||||
if size_bytes < 1024:
|
|
||||||
return f"{size_bytes} B"
|
|
||||||
for unit in ("KB", "MB", "GB", "TB"):
|
|
||||||
size_bytes /= 1024.0
|
|
||||||
if size_bytes < 1024.0 or unit == "TB":
|
|
||||||
return f"{size_bytes:,.1f} {unit}"
|
|
||||||
# Fallback (should not be reached)
|
|
||||||
return f"{size_bytes:,.1f} TB"
|
|
||||||
|
|
||||||
|
|
||||||
def extensions_label(extensions: Set[str]) -> str:
|
|
||||||
"""Return a compact, sorted label for a set of extensions (e.g. ``.csv/.tsv/.txt``)."""
|
|
||||||
return "/".join(sorted(extensions))
|
|
||||||
|
|
||||||
|
|
||||||
def list_objects(
|
|
||||||
s3_client: "botocore.client.S3",
|
|
||||||
bucket: str,
|
|
||||||
prefix: str,
|
|
||||||
extensions: Set[str] | None = None,
|
|
||||||
) -> Tuple[List[Tuple[str, int]], int]:
|
|
||||||
"""Recursively list all objects under *prefix*.
|
|
||||||
|
|
||||||
Only objects whose key ends with one of *extensions* (case-insensitive) are
|
|
||||||
counted. All other files are silently skipped. When *extensions* is
|
|
||||||
``None`` the module-level ``FILE_EXTENSIONS`` set is used.
|
|
||||||
|
|
||||||
Returns ``(files, total_size)`` where *files* is a list of
|
|
||||||
``(key, size)`` tuples for every matching object and *total_size* is the
|
|
||||||
sum of their sizes in bytes.
|
|
||||||
"""
|
|
||||||
if extensions is None:
|
|
||||||
extensions = FILE_EXTENSIONS
|
|
||||||
exts_lower = {e.lower() for e in extensions}
|
|
||||||
paginator = s3_client.get_paginator("list_objects_v2")
|
|
||||||
files: List[Tuple[str, int]] = []
|
|
||||||
total_size: int = 0
|
|
||||||
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
|
|
||||||
for obj in page.get("Contents", []):
|
|
||||||
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
|
|
||||||
continue
|
|
||||||
files.append((obj["Key"], obj["Size"]))
|
|
||||||
total_size += obj["Size"]
|
|
||||||
return files, total_size
|
|
||||||
|
|
||||||
|
|
||||||
def check_read_permission(
|
|
||||||
s3_client: "botocore.client.S3",
|
|
||||||
bucket: str,
|
|
||||||
key: str,
|
|
||||||
) -> str | None:
|
|
||||||
"""Try ``head_object`` on *key*. Return ``None`` on success or an error string."""
|
|
||||||
try:
|
|
||||||
s3_client.head_object(Bucket=bucket, Key=key)
|
|
||||||
except botocore.exceptions.ClientError as exc:
|
|
||||||
code = exc.response.get("Error", {}).get("Code", "Unknown")
|
|
||||||
message = exc.response.get("Error", {}).get("Message", str(exc))
|
|
||||||
return f"{message} ({code})"
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Core logic
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def explore_directories(
|
|
||||||
prefixes: List[str],
|
|
||||||
*,
|
|
||||||
extensions: Set[str] | None = None,
|
|
||||||
) -> Results:
|
|
||||||
"""Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
prefixes:
|
|
||||||
List of S3 key prefixes to explore.
|
|
||||||
extensions:
|
|
||||||
Set of file extensions to filter on. Defaults to the module-level
|
|
||||||
``FILE_EXTENSIONS`` (which itself defaults to ``SUPPORTED_EXTENSIONS``).
|
|
||||||
"""
|
|
||||||
if extensions is None:
|
|
||||||
extensions = FILE_EXTENSIONS
|
|
||||||
exts_lower = {e.lower() for e in extensions}
|
|
||||||
ext_label = extensions_label(extensions)
|
|
||||||
|
|
||||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
|
||||||
s3 = session.client("s3")
|
|
||||||
|
|
||||||
results = Results()
|
|
||||||
total = len(prefixes)
|
|
||||||
|
|
||||||
for idx, prefix in enumerate(prefixes, start=1):
|
|
||||||
print(
|
|
||||||
f"[{idx}/{total}] Checking {prefix} (filtering for {ext_label}) ...",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- Recursive listing ------------------------------------------------
|
|
||||||
try:
|
|
||||||
files, total_size = list_objects(
|
|
||||||
s3, S3_BUCKET, prefix, extensions=extensions,
|
|
||||||
)
|
|
||||||
except botocore.exceptions.ClientError as exc:
|
|
||||||
code = exc.response.get("Error", {}).get("Code", "Unknown")
|
|
||||||
message = exc.response.get("Error", {}).get("Message", str(exc))
|
|
||||||
results.blocked.append(
|
|
||||||
BlockedDir(prefix=prefix, file_count=0, error=f"{message} ({code})")
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
except Exception as exc:
|
|
||||||
results.blocked.append(
|
|
||||||
BlockedDir(prefix=prefix, file_count=0, error=str(exc))
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not files:
|
|
||||||
results.empty.append(EmptyDir(prefix=prefix))
|
|
||||||
continue
|
|
||||||
|
|
||||||
file_count = len(files)
|
|
||||||
|
|
||||||
# --- Permission check on first file -----------------------------------
|
|
||||||
# in "/") for the head_object test. The listing is already filtered
|
|
||||||
# to the requested extensions, so any non-marker key is a valid probe.
|
|
||||||
first_key, _ = files[0]
|
|
||||||
test_key = first_key
|
|
||||||
if first_key.endswith("/") and total_size > 0:
|
|
||||||
for key, size in files:
|
|
||||||
if not (key.endswith("/") and size == 0):
|
|
||||||
test_key = key
|
|
||||||
break
|
|
||||||
|
|
||||||
error = check_read_permission(s3, S3_BUCKET, test_key)
|
|
||||||
if error is not None:
|
|
||||||
# First file blocked → entire directory is blocked
|
|
||||||
results.blocked.append(
|
|
||||||
BlockedDir(prefix=prefix, file_count=file_count, error=error)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# --- First file accessible → check ALL remaining files ----------------
|
|
||||||
accessible_count = 1 # the first (test_key) already passed
|
|
||||||
accessible_size = 0
|
|
||||||
dir_exceptions: List[ExceptionFile] = []
|
|
||||||
|
|
||||||
# Find the size of the test_key to count it
|
|
||||||
for key, size in files:
|
|
||||||
if key == test_key:
|
|
||||||
accessible_size = size
|
|
||||||
break
|
|
||||||
|
|
||||||
# Build list of remaining files to check
|
|
||||||
remaining = [(key, size) for key, size in files if key != test_key]
|
|
||||||
|
|
||||||
if remaining:
|
|
||||||
if len(remaining) > 10:
|
|
||||||
print(
|
|
||||||
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
|
|
||||||
for key, size in remaining:
|
|
||||||
file_error = check_read_permission(s3, S3_BUCKET, key)
|
|
||||||
if file_error is None:
|
|
||||||
accessible_count += 1
|
|
||||||
accessible_size += size
|
|
||||||
else:
|
|
||||||
dir_exceptions.append(
|
|
||||||
ExceptionFile(prefix=prefix, key=key, error=file_error)
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Only one file and it passed
|
|
||||||
accessible_size = total_size
|
|
||||||
|
|
||||||
results.available.append(
|
|
||||||
AvailableDir(
|
|
||||||
prefix=prefix,
|
|
||||||
file_count=file_count,
|
|
||||||
total_size=total_size,
|
|
||||||
accessible_count=accessible_count,
|
|
||||||
total_count=file_count,
|
|
||||||
accessible_size=accessible_size,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
results.exceptions.extend(dir_exceptions)
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Output
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def print_results(results: Results, *, extensions: Set[str] | None = None) -> None:
|
|
||||||
"""Print a clean, human-readable summary to stdout.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
results:
|
|
||||||
The exploration results to display.
|
|
||||||
extensions:
|
|
||||||
The set of extensions that were used for filtering. Used only for
|
|
||||||
labelling in the output. Defaults to ``FILE_EXTENSIONS``.
|
|
||||||
"""
|
|
||||||
if extensions is None:
|
|
||||||
extensions = FILE_EXTENSIONS
|
|
||||||
ext_label = extensions_label(extensions)
|
|
||||||
|
|
||||||
print()
|
|
||||||
print("=== S3 Directory Explorer Results ===")
|
|
||||||
print(f"Bucket: {S3_BUCKET}")
|
|
||||||
print(f"Extensions: {ext_label}")
|
|
||||||
|
|
||||||
# --- Available ---
|
|
||||||
print()
|
|
||||||
print(f"--- Available ({len(results.available)}) ---")
|
|
||||||
if results.available:
|
|
||||||
for d in results.available:
|
|
||||||
print(f" {d.prefix}")
|
|
||||||
print(
|
|
||||||
f" {ext_label} files: {d.accessible_count}/{d.total_count} accessible"
|
|
||||||
f" | Total Size: {format_size(d.accessible_size)}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
print(" (none)")
|
|
||||||
|
|
||||||
# --- Blocked ---
|
|
||||||
print()
|
|
||||||
print(f"--- Blocked ({len(results.blocked)}) ---")
|
|
||||||
if results.blocked:
|
|
||||||
for d in results.blocked:
|
|
||||||
if d.file_count:
|
|
||||||
print(f" {d.prefix}")
|
|
||||||
print(f" Matching files ({ext_label}) found: {d.file_count} | Error: {d.error}")
|
|
||||||
else:
|
|
||||||
print(f" {d.prefix}")
|
|
||||||
print(f" Error: {d.error}")
|
|
||||||
else:
|
|
||||||
print(" (none)")
|
|
||||||
|
|
||||||
# --- Exceptions ---
|
|
||||||
print()
|
|
||||||
print(f"--- Exceptions ({len(results.exceptions)}) ---")
|
|
||||||
if results.exceptions:
|
|
||||||
for exc in results.exceptions:
|
|
||||||
print(f" {exc.key}")
|
|
||||||
print(f" Directory: {exc.prefix} | Error: {exc.error}")
|
|
||||||
else:
|
|
||||||
print(" (none)")
|
|
||||||
|
|
||||||
# --- Empty ---
|
|
||||||
print()
|
|
||||||
print(f"--- Empty / no matching files ({len(results.empty)}) ---")
|
|
||||||
if results.empty:
|
|
||||||
for d in results.empty:
|
|
||||||
print(f" {d.prefix}")
|
|
||||||
else:
|
|
||||||
print(" (none)")
|
|
||||||
|
|
||||||
print()
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# CLI argument parsing
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def build_arg_parser() -> argparse.ArgumentParser:
|
|
||||||
"""Build and return the CLI argument parser.
|
|
||||||
|
|
||||||
Supports selecting file-type filters, text-file reading parameters, and
|
|
||||||
overriding the default bucket / profile / input-file settings.
|
|
||||||
"""
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
description=(
|
|
||||||
"Explore S3 directories and categorise them by accessibility. "
|
|
||||||
"Supports SAS files (.sas7bdat, .xpt, .xport) and delimited text "
|
|
||||||
"files (.txt, .csv, .tsv)."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- File-type / extension selection ---
|
|
||||||
type_group = parser.add_argument_group("File-type selection")
|
|
||||||
type_group.add_argument(
|
|
||||||
"--file-type",
|
|
||||||
choices=["sas", "text", "all"],
|
|
||||||
default="all",
|
|
||||||
help=(
|
|
||||||
"Restrict the scan to a specific file type. "
|
|
||||||
"'sas' = .sas7bdat/.xpt/.xport only; "
|
|
||||||
"'text' = .txt/.csv/.tsv only; "
|
|
||||||
"'all' = both (default)."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
type_group.add_argument(
|
|
||||||
"--extensions",
|
|
||||||
nargs="+",
|
|
||||||
metavar="EXT",
|
|
||||||
help=(
|
|
||||||
"Explicit list of extensions to filter on (e.g. --extensions .csv .tsv). "
|
|
||||||
"Overrides --file-type when provided."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- Text-file reading parameters ---
|
|
||||||
text_group = parser.add_argument_group(
|
|
||||||
"Text-file parameters",
|
|
||||||
description=(
|
|
||||||
"Parameters used when reading delimited text files. These are "
|
|
||||||
"stored for downstream consumers and do not affect the S3 scan "
|
|
||||||
"itself."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
text_group.add_argument(
|
|
||||||
"--delimiter",
|
|
||||||
default=None,
|
|
||||||
help=(
|
|
||||||
"Field delimiter for text files (default: ',' for .csv/.txt, "
|
|
||||||
"'\\t' for .tsv). Use 'tab' or '\\t' for a tab character."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
text_group.add_argument(
|
|
||||||
"--encoding",
|
|
||||||
default=DEFAULT_ENCODING,
|
|
||||||
help=f"Character encoding for text files (default: {DEFAULT_ENCODING}).",
|
|
||||||
)
|
|
||||||
text_group.add_argument(
|
|
||||||
"--quotechar",
|
|
||||||
default=DEFAULT_QUOTECHAR,
|
|
||||||
help=f"Quote character for text files (default: {DEFAULT_QUOTECHAR!r}).",
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- S3 / general settings ---
|
|
||||||
s3_group = parser.add_argument_group("S3 settings")
|
|
||||||
s3_group.add_argument(
|
|
||||||
"--bucket",
|
|
||||||
default=None,
|
|
||||||
help=f"S3 bucket name (default: {S3_BUCKET}).",
|
|
||||||
)
|
|
||||||
s3_group.add_argument(
|
|
||||||
"--profile",
|
|
||||||
default=None,
|
|
||||||
help=f"AWS CLI profile name (default: {AWS_PROFILE}).",
|
|
||||||
)
|
|
||||||
s3_group.add_argument(
|
|
||||||
"--input-file",
|
|
||||||
default=None,
|
|
||||||
help=f"Path to the text file with S3 prefixes (default: {INPUT_FILE}).",
|
|
||||||
)
|
|
||||||
|
|
||||||
return parser
|
|
||||||
|
|
||||||
|
|
||||||
def resolve_extensions(args: argparse.Namespace) -> Set[str]:
|
|
||||||
"""Determine the active extension set from parsed CLI *args*.
|
|
||||||
|
|
||||||
If ``--extensions`` is provided it takes precedence. Otherwise
|
|
||||||
``--file-type`` is used to select a predefined set.
|
|
||||||
"""
|
|
||||||
if args.extensions:
|
|
||||||
# Normalise: ensure each extension starts with a dot and is lowercase
|
|
||||||
exts: Set[str] = set()
|
|
||||||
for ext in args.extensions:
|
|
||||||
ext = ext.strip().lower()
|
|
||||||
if not ext.startswith("."):
|
|
||||||
ext = "." + ext
|
|
||||||
exts.add(ext)
|
|
||||||
return exts
|
|
||||||
|
|
||||||
if args.file_type == "sas":
|
|
||||||
return SAS_EXTENSIONS
|
|
||||||
if args.file_type == "text":
|
|
||||||
return TEXT_EXTENSIONS
|
|
||||||
return SUPPORTED_EXTENSIONS
|
|
||||||
|
|
||||||
|
|
||||||
def resolve_delimiter(args: argparse.Namespace) -> str:
|
|
||||||
"""Return the effective delimiter from parsed CLI *args*.
|
|
||||||
|
|
||||||
Handles the special values ``'tab'`` and ``'\\t'`` so users can specify a
|
|
||||||
tab character on the command line without shell-escaping issues.
|
|
||||||
"""
|
|
||||||
if args.delimiter is None:
|
|
||||||
return DEFAULT_DELIMITER
|
|
||||||
raw = args.delimiter
|
|
||||||
if raw.lower() in ("tab", "\\t"):
|
|
||||||
return "\t"
|
|
||||||
return raw
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Main
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
parser = build_arg_parser()
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
# --- Apply CLI overrides to module-level config ---------------------------
|
|
||||||
if args.bucket:
|
|
||||||
S3_BUCKET = args.bucket
|
|
||||||
if args.profile:
|
|
||||||
AWS_PROFILE = args.profile
|
|
||||||
input_file = args.input_file if args.input_file else INPUT_FILE
|
|
||||||
|
|
||||||
active_extensions = resolve_extensions(args)
|
|
||||||
FILE_EXTENSIONS = active_extensions
|
|
||||||
|
|
||||||
delimiter = resolve_delimiter(args)
|
|
||||||
encoding = args.encoding
|
|
||||||
quotechar = args.quotechar
|
|
||||||
|
|
||||||
# --- Read input file ------------------------------------------------------
|
|
||||||
if not os.path.exists(input_file):
|
|
||||||
print(f"ERROR: Input file not found: {input_file}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
try:
|
|
||||||
prefixes = read_input_file(input_file)
|
|
||||||
except Exception as exc:
|
|
||||||
print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if not prefixes:
|
|
||||||
print("No valid S3 prefixes found in the input file. Nothing to do.")
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
# --- Validate AWS profile -------------------------------------------------
|
|
||||||
try:
|
|
||||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
|
||||||
# Force credential resolution to catch bad profiles early
|
|
||||||
credentials = session.get_credentials()
|
|
||||||
if credentials is None:
|
|
||||||
raise RuntimeError(
|
|
||||||
f"No credentials found for AWS profile {AWS_PROFILE!r}"
|
|
||||||
)
|
|
||||||
except botocore.exceptions.ProfileNotFound as exc:
|
|
||||||
print(f"ERROR: {exc}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
except Exception as exc:
|
|
||||||
print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# --- Print active configuration -------------------------------------------
|
|
||||||
ext_label = extensions_label(active_extensions)
|
|
||||||
print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
|
|
||||||
print(f"Extensions: {ext_label}", file=sys.stderr)
|
|
||||||
if active_extensions & TEXT_EXTENSIONS:
|
|
||||||
print(
|
|
||||||
f"Text opts: delimiter={delimiter!r} encoding={encoding!r} "
|
|
||||||
f"quotechar={quotechar!r}",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- Explore --------------------------------------------------------------
|
|
||||||
results = explore_directories(prefixes, extensions=active_extensions)
|
|
||||||
print_results(results, extensions=active_extensions)
|
|
||||||
@ -1,374 +0,0 @@
|
|||||||
"""Standalone utility to download a SAS or delimited text file from S3 and
|
|
||||||
print a column-level summary of the first *N* rows.
|
|
||||||
|
|
||||||
Supported formats
|
|
||||||
-----------------
|
|
||||||
* **SAS** – ``.sas7bdat``, ``.xpt``, ``.xport`` (read via *pyreadstat*)
|
|
||||||
* **Text** – ``.csv``, ``.tsv``, ``.txt`` (read via *pandas.read_csv*)
|
|
||||||
|
|
||||||
Configure the four constants below **or** use the CLI arguments, then run::
|
|
||||||
|
|
||||||
python3 file_viewer.py
|
|
||||||
python3 file_viewer.py --local path/to/file.csv
|
|
||||||
python3 file_viewer.py --local path/to/data.tsv --delimiter $'\\t'
|
|
||||||
|
|
||||||
Python 3.14 compatible.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from dotenv import find_dotenv, load_dotenv
|
|
||||||
|
|
||||||
load_dotenv(find_dotenv())
|
|
||||||
|
|
||||||
import boto3
|
|
||||||
import pandas as pd
|
|
||||||
import pyreadstat
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Supported file extensions
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
SAS_EXTENSIONS: set[str] = {".sas7bdat", ".xpt", ".xport"}
|
|
||||||
"""File extensions recognised as SAS data files."""
|
|
||||||
|
|
||||||
TEXT_EXTENSIONS: set[str] = {".txt", ".csv", ".tsv"}
|
|
||||||
"""File extensions recognised as delimited text files."""
|
|
||||||
|
|
||||||
SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
|
|
||||||
"""Union of all supported file extensions."""
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Configuration — edit these before running (or use CLI arguments)
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
|
|
||||||
"""S3 bucket name."""
|
|
||||||
|
|
||||||
S3_KEY: str = "path/to/file.sas7bdat"
|
|
||||||
"""Object key (path) within the bucket to a supported data file."""
|
|
||||||
|
|
||||||
LOCAL_FOLDER: str = "./downloads"
|
|
||||||
"""Local directory to download the file into."""
|
|
||||||
|
|
||||||
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
|
|
||||||
"""AWS CLI profile name used for authentication."""
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Helpers
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None:
|
|
||||||
"""Download *key* from *bucket* to *local_path*, skipping if already present.
|
|
||||||
|
|
||||||
If *local_path* exists and its size matches the S3 object's size, the
|
|
||||||
download is skipped and a message is printed.
|
|
||||||
|
|
||||||
Supports any file whose extension is in :data:`SUPPORTED_EXTENSIONS`.
|
|
||||||
"""
|
|
||||||
session = boto3.Session(profile_name=AWS_PROFILE)
|
|
||||||
s3 = session.client("s3")
|
|
||||||
|
|
||||||
remote_size = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
|
|
||||||
|
|
||||||
if os.path.exists(local_path):
|
|
||||||
local_size = os.path.getsize(local_path)
|
|
||||||
if local_size == remote_size:
|
|
||||||
print(
|
|
||||||
f"Local file {local_path} already matches s3://{bucket}/{key} "
|
|
||||||
f"({local_size} bytes); skipping download."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
print(
|
|
||||||
f"Local file {local_path} size ({local_size} bytes) differs from "
|
|
||||||
f"S3 ({remote_size} bytes); re-downloading."
|
|
||||||
)
|
|
||||||
|
|
||||||
print(f"Downloading s3://{bucket}/{key} -> {local_path}")
|
|
||||||
s3.download_file(bucket, key, local_path)
|
|
||||||
print("Download complete.")
|
|
||||||
|
|
||||||
|
|
||||||
# -- SAS readers -------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _read_sas_head(path: str, row_count: int = 10) -> pd.DataFrame:
|
|
||||||
"""Read the first *row_count* rows of a SAS file (``.sas7bdat``, ``.xpt``, ``.xport``)."""
|
|
||||||
ext = os.path.splitext(path)[1].lower()
|
|
||||||
if ext == ".sas7bdat":
|
|
||||||
df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count)
|
|
||||||
elif ext in {".xpt", ".xport"}:
|
|
||||||
df, _ = pyreadstat.read_xport(path, row_offset=0, row_limit=row_count)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unsupported SAS extension: {ext}")
|
|
||||||
return df
|
|
||||||
|
|
||||||
|
|
||||||
# -- Text readers ------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _read_text_head(
|
|
||||||
path: str,
|
|
||||||
row_count: int = 10,
|
|
||||||
delimiter: str = ",",
|
|
||||||
encoding: str = "utf-8",
|
|
||||||
quotechar: str = '"',
|
|
||||||
) -> pd.DataFrame:
|
|
||||||
"""Read the first *row_count* rows of a delimited text file.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
path : str
|
|
||||||
Path to the ``.csv``, ``.tsv``, or ``.txt`` file.
|
|
||||||
row_count : int, optional
|
|
||||||
Number of data rows to read (default ``10``).
|
|
||||||
delimiter : str, optional
|
|
||||||
Column delimiter (default ``","``). For ``.tsv`` files the caller
|
|
||||||
should pass ``"\\t"``.
|
|
||||||
encoding : str, optional
|
|
||||||
File encoding (default ``"utf-8"``).
|
|
||||||
quotechar : str, optional
|
|
||||||
Character used to quote fields (default ``'"'``).
|
|
||||||
"""
|
|
||||||
return pd.read_csv(
|
|
||||||
path,
|
|
||||||
sep=delimiter,
|
|
||||||
encoding=encoding,
|
|
||||||
quotechar=quotechar,
|
|
||||||
nrows=row_count,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# -- Unified reader ----------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _read_head(
|
|
||||||
path: str,
|
|
||||||
row_count: int = 10,
|
|
||||||
delimiter: str | None = None,
|
|
||||||
encoding: str = "utf-8",
|
|
||||||
quotechar: str = '"',
|
|
||||||
) -> pd.DataFrame:
|
|
||||||
"""Read the first *row_count* rows of a supported data file.
|
|
||||||
|
|
||||||
Auto-detects the file type from its extension and delegates to the
|
|
||||||
appropriate reader. For ``.tsv`` files the delimiter defaults to tab
|
|
||||||
(``"\\t"``); for other text files it defaults to ``","``.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
path : str
|
|
||||||
Path to the data file.
|
|
||||||
row_count : int, optional
|
|
||||||
Number of data rows to read (default ``10``).
|
|
||||||
delimiter : str or None, optional
|
|
||||||
Column delimiter for text files. ``None`` means *auto-detect*
|
|
||||||
(tab for ``.tsv``, comma otherwise).
|
|
||||||
encoding : str, optional
|
|
||||||
Encoding for text files (default ``"utf-8"``).
|
|
||||||
quotechar : str, optional
|
|
||||||
Quote character for text files (default ``'"'``).
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
pandas.DataFrame
|
|
||||||
"""
|
|
||||||
ext = os.path.splitext(path)[1].lower()
|
|
||||||
|
|
||||||
if ext not in SUPPORTED_EXTENSIONS:
|
|
||||||
raise ValueError(
|
|
||||||
f"Unsupported file extension '{ext}'. "
|
|
||||||
f"Supported extensions: {sorted(SUPPORTED_EXTENSIONS)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
if ext in SAS_EXTENSIONS:
|
|
||||||
return _read_sas_head(path, row_count=row_count)
|
|
||||||
|
|
||||||
# --- Text file path ---
|
|
||||||
if delimiter is None:
|
|
||||||
delimiter = "\t" if ext == ".tsv" else ","
|
|
||||||
|
|
||||||
return _read_text_head(
|
|
||||||
path,
|
|
||||||
row_count=row_count,
|
|
||||||
delimiter=delimiter,
|
|
||||||
encoding=encoding,
|
|
||||||
quotechar=quotechar,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# -- Display -----------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _sample_values(series: pd.Series, n: int = 3) -> str:
|
|
||||||
"""Return up to *n* non-null sample values as a comma-separated string."""
|
|
||||||
non_null = series.dropna()
|
|
||||||
samples = non_null.head(n).tolist()
|
|
||||||
if not samples:
|
|
||||||
return "(all null)"
|
|
||||||
return ", ".join(repr(v) for v in samples)
|
|
||||||
|
|
||||||
|
|
||||||
def _print_summary(df: pd.DataFrame) -> None:
|
|
||||||
"""Print a nicely formatted summary table to stdout."""
|
|
||||||
# Pre-compute column data
|
|
||||||
rows = []
|
|
||||||
for col in df.columns:
|
|
||||||
rows.append((col, str(df[col].dtype), _sample_values(df[col], 3)))
|
|
||||||
|
|
||||||
# Determine column widths
|
|
||||||
hdr_name = "Column Name"
|
|
||||||
hdr_dtype = "Data Type"
|
|
||||||
hdr_samples = "Sample Values (up to 3)"
|
|
||||||
|
|
||||||
w_name = max(len(hdr_name), *(len(r[0]) for r in rows))
|
|
||||||
w_dtype = max(len(hdr_dtype), *(len(r[1]) for r in rows))
|
|
||||||
w_samples = max(len(hdr_samples), *(len(r[2]) for r in rows))
|
|
||||||
|
|
||||||
fmt = f" {{:<{w_name}}} {{:<{w_dtype}}} {{:<{w_samples}}}"
|
|
||||||
sep = f" {'-' * w_name} {'-' * w_dtype} {'-' * w_samples}"
|
|
||||||
|
|
||||||
print()
|
|
||||||
print(f" Summary of first {len(df)} row(s) ({len(df.columns)} columns)")
|
|
||||||
print(sep)
|
|
||||||
print(fmt.format(hdr_name, hdr_dtype, hdr_samples))
|
|
||||||
print(sep)
|
|
||||||
for name, dtype, samples in rows:
|
|
||||||
print(fmt.format(name, dtype, samples))
|
|
||||||
print(sep)
|
|
||||||
print()
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# CLI
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _build_parser() -> argparse.ArgumentParser:
|
|
||||||
"""Build the argument parser for the file-viewer CLI."""
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
description=(
|
|
||||||
"Download a SAS or delimited text file from S3 (or read a local "
|
|
||||||
"file) and print a column-level summary of the first N rows.\n\n"
|
|
||||||
"Supported extensions: "
|
|
||||||
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
|
|
||||||
),
|
|
||||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
||||||
)
|
|
||||||
|
|
||||||
source = parser.add_mutually_exclusive_group()
|
|
||||||
source.add_argument(
|
|
||||||
"--local",
|
|
||||||
metavar="FILE",
|
|
||||||
default=None,
|
|
||||||
help=(
|
|
||||||
"Path to a local data file to summarise (skips S3 download). "
|
|
||||||
"Supported extensions: "
|
|
||||||
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
|
|
||||||
),
|
|
||||||
)
|
|
||||||
source.add_argument(
|
|
||||||
"--s3-key",
|
|
||||||
metavar="KEY",
|
|
||||||
default=None,
|
|
||||||
help="Override the S3_KEY constant with this object key.",
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument(
|
|
||||||
"--rows",
|
|
||||||
type=int,
|
|
||||||
default=10,
|
|
||||||
metavar="N",
|
|
||||||
help="Number of rows to read (default: 10).",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Text-file-specific options
|
|
||||||
text_group = parser.add_argument_group(
|
|
||||||
"text file options",
|
|
||||||
"These options apply only to .csv / .tsv / .txt files.",
|
|
||||||
)
|
|
||||||
text_group.add_argument(
|
|
||||||
"--delimiter",
|
|
||||||
default=None,
|
|
||||||
help=(
|
|
||||||
'Column delimiter for text files (default: "," for .csv/.txt, '
|
|
||||||
'"\\t" for .tsv). Use $\'\\t\' in the shell for a literal tab.'
|
|
||||||
),
|
|
||||||
)
|
|
||||||
text_group.add_argument(
|
|
||||||
"--encoding",
|
|
||||||
default="utf-8",
|
|
||||||
help='File encoding for text files (default: "utf-8").',
|
|
||||||
)
|
|
||||||
text_group.add_argument(
|
|
||||||
"--quotechar",
|
|
||||||
default='"',
|
|
||||||
help='Quote character for text files (default: \'"\').',
|
|
||||||
)
|
|
||||||
|
|
||||||
return parser
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Main
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
parser = _build_parser()
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
if args.local:
|
|
||||||
# ---- Local file mode -----------------------------------------------
|
|
||||||
local_path = args.local
|
|
||||||
ext = os.path.splitext(local_path)[1].lower()
|
|
||||||
if ext not in SUPPORTED_EXTENSIONS:
|
|
||||||
parser.error(
|
|
||||||
f"Unsupported file extension '{ext}'. "
|
|
||||||
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
|
|
||||||
)
|
|
||||||
if not os.path.isfile(local_path):
|
|
||||||
print(f"File not found: {local_path}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
else:
|
|
||||||
# ---- S3 download mode ----------------------------------------------
|
|
||||||
s3_key = args.s3_key or S3_KEY
|
|
||||||
ext = os.path.splitext(s3_key)[1].lower()
|
|
||||||
if ext not in SUPPORTED_EXTENSIONS:
|
|
||||||
parser.error(
|
|
||||||
f"Unsupported file extension '{ext}' in S3 key. "
|
|
||||||
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
os.makedirs(LOCAL_FOLDER, exist_ok=True)
|
|
||||||
local_filename = os.path.basename(s3_key)
|
|
||||||
local_path = os.path.join(LOCAL_FOLDER, local_filename)
|
|
||||||
|
|
||||||
try:
|
|
||||||
_ensure_local_copy(S3_BUCKET, s3_key, local_path)
|
|
||||||
except Exception as exc:
|
|
||||||
print(f"S3 download error: {exc}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# ---- Read & summarise --------------------------------------------------
|
|
||||||
try:
|
|
||||||
df = _read_head(
|
|
||||||
local_path,
|
|
||||||
row_count=args.rows,
|
|
||||||
delimiter=args.delimiter,
|
|
||||||
encoding=args.encoding,
|
|
||||||
quotechar=args.quotechar,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
print(f"File read error: {exc}", file=sys.stderr)
|
|
||||||
sys.exit(2)
|
|
||||||
|
|
||||||
_print_summary(df)
|
|
||||||
@ -1,757 +0,0 @@
|
|||||||
"""S3-source counterpart to ``generic_loader/load_folder.py``.
|
|
||||||
|
|
||||||
Reads a YAML config that points at an S3 bucket + prefix, lists every object
|
|
||||||
under that prefix recursively, groups objects into *clusters* using the same
|
|
||||||
explicit-pattern + auto-detect rules as ``load_folder.py``, and downloads each
|
|
||||||
cluster's files into its own subfolder under a local destination root.
|
|
||||||
|
|
||||||
Supported file types:
|
|
||||||
* SAS data files: ``.sas7bdat``, ``.xpt``, ``.xport``
|
|
||||||
* Delimited text files: ``.txt``, ``.csv``, ``.tsv``
|
|
||||||
|
|
||||||
-------------------------------------------------------------------------------
|
|
||||||
USAGE
|
|
||||||
-------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
1. YAML config
|
|
||||||
--------------
|
|
||||||
::
|
|
||||||
|
|
||||||
bucket: my-bucket # required
|
|
||||||
prefix: census/2020/raw/ # required; recursive scan under it
|
|
||||||
local_folder: ./downloads # required; one subfolder per cluster
|
|
||||||
aws_profile: default # optional; default boto3 chain if omitted
|
|
||||||
|
|
||||||
auto_detect: true # optional; default true
|
|
||||||
extensions: # optional; default sas7bdat/xpt/xport/txt/csv/tsv
|
|
||||||
- .sas7bdat
|
|
||||||
- .csv
|
|
||||||
on_exists: skip # optional; skip | overwrite | error
|
|
||||||
concurrency: 4 # optional; default 4
|
|
||||||
|
|
||||||
clusters:
|
|
||||||
- pattern: '^group_a\\d+\\.sas7bdat$'
|
|
||||||
name: group_a
|
|
||||||
- pattern: '^group_b\\d+\\.sas7bdat$'
|
|
||||||
name: group_b
|
|
||||||
|
|
||||||
2. Command-line interface
|
|
||||||
-------------------------
|
|
||||||
::
|
|
||||||
|
|
||||||
python s3_download.py --config download_config.yaml [--dry-run]
|
|
||||||
[--overwrite] [--fail-fast]
|
|
||||||
|
|
||||||
Flags:
|
|
||||||
--config PATH Required. Path to the YAML config above.
|
|
||||||
--dry-run List discovered clusters and the objects each would
|
|
||||||
download (with sizes). No GET requests are issued
|
|
||||||
beyond the initial LIST.
|
|
||||||
--overwrite Force-redownload every matched object regardless of
|
|
||||||
local cache state. Equivalent to ``on_exists: overwrite``
|
|
||||||
for this run.
|
|
||||||
--fail-fast Abort the whole run on the first per-file download
|
|
||||||
failure. Default is to log the failure and keep going.
|
|
||||||
|
|
||||||
Exit codes:
|
|
||||||
0 - every file downloaded (or skipped) successfully (or dry-run completed)
|
|
||||||
1 - at least one file failed (details on stderr)
|
|
||||||
2 - the LIST returned no objects matching the configured extensions
|
|
||||||
|
|
||||||
3. Discovery rules
|
|
||||||
------------------
|
|
||||||
* Listing is recursive (no S3 ``Delimiter``). Regexes are matched against
|
|
||||||
the *basename* of each key (the part after the last ``/``), so a nested
|
|
||||||
object like ``census/2020/raw/nested/group_c1.sas7bdat`` is grouped by
|
|
||||||
``group_c1.sas7bdat`` alone. Text files (e.g. ``data.csv``) are handled
|
|
||||||
identically — the basename is extracted and matched the same way.
|
|
||||||
* Explicit patterns are tried in order. A key matched by one pattern is
|
|
||||||
removed from the pool before the next pattern runs. Overlap between
|
|
||||||
patterns is flagged as an error at discovery time.
|
|
||||||
* Auto-detect groups remaining keys by ``re.sub(r'\\d+$', '', stem)`` with
|
|
||||||
any trailing ``_`` / ``-`` stripped afterward, mirroring
|
|
||||||
``load_folder.py``. Stems without trailing digits become singleton
|
|
||||||
clusters named after the stem.
|
|
||||||
* Within a cluster, files are sorted numerically by the LAST digit group
|
|
||||||
in the stem so ``_9_`` sorts before ``_40_`` regardless of zero-padding.
|
|
||||||
|
|
||||||
4. Library usage
|
|
||||||
----------------
|
|
||||||
::
|
|
||||||
|
|
||||||
from s3_download import load_download_config, list_s3_objects, \
|
|
||||||
discover_clusters, download_cluster, build_s3_client
|
|
||||||
|
|
||||||
cfg = load_download_config("download_config.yaml")
|
|
||||||
s3 = build_s3_client(cfg)
|
|
||||||
objects = list_s3_objects(s3, cfg)
|
|
||||||
for cluster in discover_clusters(cfg, objects):
|
|
||||||
download_cluster(s3, cfg, cluster)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any, Dict, List, Optional, Tuple
|
|
||||||
|
|
||||||
from dotenv import find_dotenv, load_dotenv
|
|
||||||
|
|
||||||
load_dotenv(find_dotenv())
|
|
||||||
|
|
||||||
import boto3
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
|
|
||||||
SAS_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport")
|
|
||||||
TEXT_EXTENSIONS: Tuple[str, ...] = (".txt", ".csv", ".tsv")
|
|
||||||
DEFAULT_EXTENSIONS: Tuple[str, ...] = SAS_EXTENSIONS + TEXT_EXTENSIONS
|
|
||||||
VALID_ON_EXISTS: Tuple[str, ...] = ("skip", "overwrite", "error")
|
|
||||||
DEFAULT_CONCURRENCY: int = 4
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Dataclasses
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class _ExplicitPattern:
|
|
||||||
"""Parsed form of a single ``clusters[*]`` YAML entry."""
|
|
||||||
|
|
||||||
pattern: re.Pattern
|
|
||||||
raw_pattern: str
|
|
||||||
name: str
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DownloadConfig:
|
|
||||||
"""Top-level configuration parsed from YAML."""
|
|
||||||
|
|
||||||
bucket: str
|
|
||||||
prefix: str
|
|
||||||
local_folder: Path
|
|
||||||
aws_profile: Optional[str] = None
|
|
||||||
auto_detect: bool = True
|
|
||||||
extensions: Tuple[str, ...] = DEFAULT_EXTENSIONS
|
|
||||||
on_exists: str = "skip"
|
|
||||||
concurrency: int = DEFAULT_CONCURRENCY
|
|
||||||
explicit: List[_ExplicitPattern] = field(default_factory=list)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class S3Object:
|
|
||||||
"""A single S3 object selected from the LIST response."""
|
|
||||||
|
|
||||||
key: str
|
|
||||||
basename: str
|
|
||||||
size: int
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ClusterSpec:
|
|
||||||
"""Resolved per-cluster download settings."""
|
|
||||||
|
|
||||||
name: str
|
|
||||||
objects: List[S3Object]
|
|
||||||
source: str # "explicit" or "auto"
|
|
||||||
pattern: Optional[str] = None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Config loading
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_on_exists(value: Any, where: str) -> str:
|
|
||||||
s = str(value).lower()
|
|
||||||
if s not in VALID_ON_EXISTS:
|
|
||||||
raise ValueError(
|
|
||||||
f"{where}: on_exists={value!r} is not one of {list(VALID_ON_EXISTS)}"
|
|
||||||
)
|
|
||||||
return s
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_extensions(raw_value: Any, where: str) -> Tuple[str, ...]:
|
|
||||||
if raw_value is None:
|
|
||||||
return DEFAULT_EXTENSIONS
|
|
||||||
if isinstance(raw_value, str):
|
|
||||||
items = [raw_value]
|
|
||||||
elif isinstance(raw_value, list):
|
|
||||||
items = list(raw_value)
|
|
||||||
else:
|
|
||||||
raise ValueError(
|
|
||||||
f"{where}: 'extensions' must be a string or list of strings."
|
|
||||||
)
|
|
||||||
out: List[str] = []
|
|
||||||
for i, item in enumerate(items):
|
|
||||||
if not isinstance(item, str) or not item.strip():
|
|
||||||
raise ValueError(
|
|
||||||
f"{where}: 'extensions[{i}]' must be a non-empty string."
|
|
||||||
)
|
|
||||||
ext = item.strip().lower()
|
|
||||||
if not ext.startswith("."):
|
|
||||||
ext = "." + ext
|
|
||||||
out.append(ext)
|
|
||||||
if len(out) != len(set(out)):
|
|
||||||
raise ValueError(f"{where}: 'extensions' contains duplicates.")
|
|
||||||
return tuple(out)
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_concurrency(raw_value: Any, where: str) -> int:
|
|
||||||
if raw_value is None:
|
|
||||||
return DEFAULT_CONCURRENCY
|
|
||||||
try:
|
|
||||||
value = int(raw_value)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
raise ValueError(
|
|
||||||
f"{where}: 'concurrency' must be a positive integer, "
|
|
||||||
f"got {raw_value!r}"
|
|
||||||
)
|
|
||||||
if value <= 0:
|
|
||||||
raise ValueError(
|
|
||||||
f"{where}: 'concurrency' must be a positive integer, got {value}"
|
|
||||||
)
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
def load_download_config(path: Path) -> DownloadConfig:
|
|
||||||
"""Parse and validate the YAML download config at ``path``."""
|
|
||||||
path = Path(path)
|
|
||||||
with path.open("r", encoding="utf-8") as f:
|
|
||||||
raw = yaml.safe_load(f)
|
|
||||||
|
|
||||||
if not isinstance(raw, dict):
|
|
||||||
raise ValueError(
|
|
||||||
f"Config at {path} must be a YAML mapping at the top level."
|
|
||||||
)
|
|
||||||
|
|
||||||
# 'bucket' can fall back to the S3_BUCKET env var, so only flag it as
|
|
||||||
# missing when neither the YAML key nor the env var is present.
|
|
||||||
required_always = ("prefix", "local_folder")
|
|
||||||
missing = [k for k in required_always if k not in raw]
|
|
||||||
if "bucket" not in raw and not os.environ.get("S3_BUCKET"):
|
|
||||||
missing.insert(0, "bucket")
|
|
||||||
if missing:
|
|
||||||
raise ValueError(
|
|
||||||
f"Config {path} missing required keys: {', '.join(missing)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
bucket = str(raw["bucket"]).strip() if raw.get("bucket") else ""
|
|
||||||
if not bucket:
|
|
||||||
bucket = os.environ.get("S3_BUCKET", "")
|
|
||||||
if not bucket:
|
|
||||||
raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.")
|
|
||||||
|
|
||||||
prefix = str(raw["prefix"])
|
|
||||||
# Normalize: strip leading slash, ensure exactly one trailing slash unless
|
|
||||||
# the user explicitly asked for an empty prefix (whole bucket scan).
|
|
||||||
prefix = prefix.lstrip("/")
|
|
||||||
if prefix and not prefix.endswith("/"):
|
|
||||||
prefix = prefix + "/"
|
|
||||||
|
|
||||||
local_folder = Path(raw["local_folder"])
|
|
||||||
if not local_folder.is_absolute():
|
|
||||||
candidate = (path.parent / local_folder).resolve()
|
|
||||||
# Mirror load_folder's behavior: prefer the config-relative path when
|
|
||||||
# it exists, otherwise keep what the user wrote. Either way, we'll
|
|
||||||
# mkdir(parents=True) before downloading so non-existence is fine.
|
|
||||||
local_folder = candidate if candidate.parent.exists() else candidate
|
|
||||||
|
|
||||||
aws_profile = raw.get("aws_profile")
|
|
||||||
if aws_profile is not None:
|
|
||||||
aws_profile = str(aws_profile).strip() or None
|
|
||||||
if aws_profile is None:
|
|
||||||
aws_profile = os.environ.get("AWS_PROFILE") or None
|
|
||||||
|
|
||||||
auto_detect = bool(raw.get("auto_detect", True))
|
|
||||||
extensions = _parse_extensions(raw.get("extensions"), f"Config {path}")
|
|
||||||
on_exists = _validate_on_exists(
|
|
||||||
raw.get("on_exists", "skip"), f"Config {path}"
|
|
||||||
)
|
|
||||||
concurrency = _parse_concurrency(
|
|
||||||
raw.get("concurrency"), f"Config {path}"
|
|
||||||
)
|
|
||||||
|
|
||||||
explicit: List[_ExplicitPattern] = []
|
|
||||||
seen_names: set = set()
|
|
||||||
clusters_raw = raw.get("clusters") or []
|
|
||||||
if not isinstance(clusters_raw, list):
|
|
||||||
raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
|
|
||||||
for i, entry in enumerate(clusters_raw):
|
|
||||||
where = f"Config {path} clusters[{i}]"
|
|
||||||
if not isinstance(entry, dict):
|
|
||||||
raise ValueError(f"{where} must be a mapping.")
|
|
||||||
if "pattern" not in entry or "name" not in entry:
|
|
||||||
raise ValueError(f"{where} must include 'pattern' and 'name'.")
|
|
||||||
raw_pat = str(entry["pattern"])
|
|
||||||
try:
|
|
||||||
compiled = re.compile(raw_pat)
|
|
||||||
except re.error as e:
|
|
||||||
raise ValueError(
|
|
||||||
f"{where}: invalid regex {raw_pat!r}: {e}"
|
|
||||||
) from e
|
|
||||||
name = str(entry["name"]).strip()
|
|
||||||
if not name:
|
|
||||||
raise ValueError(f"{where}: 'name' must be a non-empty string.")
|
|
||||||
if name in seen_names:
|
|
||||||
raise ValueError(
|
|
||||||
f"{where}: duplicate cluster name {name!r}"
|
|
||||||
)
|
|
||||||
seen_names.add(name)
|
|
||||||
explicit.append(
|
|
||||||
_ExplicitPattern(
|
|
||||||
pattern=compiled, raw_pattern=raw_pat, name=name,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return DownloadConfig(
|
|
||||||
bucket=bucket,
|
|
||||||
prefix=prefix,
|
|
||||||
local_folder=local_folder,
|
|
||||||
aws_profile=aws_profile,
|
|
||||||
auto_detect=auto_detect,
|
|
||||||
extensions=extensions,
|
|
||||||
on_exists=on_exists,
|
|
||||||
concurrency=concurrency,
|
|
||||||
explicit=explicit,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# S3 listing
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def build_s3_client(cfg: DownloadConfig):
|
|
||||||
"""Build a boto3 S3 client honoring ``cfg.aws_profile`` if set."""
|
|
||||||
if cfg.aws_profile:
|
|
||||||
session = boto3.Session(profile_name=cfg.aws_profile)
|
|
||||||
else:
|
|
||||||
session = boto3.Session()
|
|
||||||
return session.client("s3")
|
|
||||||
|
|
||||||
|
|
||||||
def list_s3_objects(s3_client, cfg: DownloadConfig) -> List[S3Object]:
|
|
||||||
"""List all objects under ``cfg.prefix`` recursively, filtered by extension.
|
|
||||||
|
|
||||||
Supports SAS extensions (``.sas7bdat``, ``.xpt``, ``.xport``) and text
|
|
||||||
extensions (``.txt``, ``.csv``, ``.tsv``) — whichever are present in
|
|
||||||
``cfg.extensions``.
|
|
||||||
"""
|
|
||||||
paginator = s3_client.get_paginator("list_objects_v2")
|
|
||||||
out: List[S3Object] = []
|
|
||||||
for page in paginator.paginate(Bucket=cfg.bucket, Prefix=cfg.prefix):
|
|
||||||
for entry in page.get("Contents", []):
|
|
||||||
key = entry["Key"]
|
|
||||||
if key.endswith("/"):
|
|
||||||
continue
|
|
||||||
basename = key.rsplit("/", 1)[-1]
|
|
||||||
ext = ("." + basename.rsplit(".", 1)[-1].lower()
|
|
||||||
if "." in basename else "")
|
|
||||||
if ext not in cfg.extensions:
|
|
||||||
continue
|
|
||||||
out.append(
|
|
||||||
S3Object(key=key, basename=basename, size=int(entry["Size"]))
|
|
||||||
)
|
|
||||||
out.sort(key=lambda o: o.key)
|
|
||||||
return out
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Cluster discovery (mirrors generic_loader/load_folder.py)
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
|
|
||||||
_DIGIT_GROUP_RE = re.compile(r"\d+")
|
|
||||||
|
|
||||||
|
|
||||||
def _auto_prefix(stem: str) -> str:
|
|
||||||
"""Cluster key for *stem*: strip trailing digits and any trailing _/-."""
|
|
||||||
stripped = _TRAILING_DIGIT_RE.sub("", stem)
|
|
||||||
stripped = stripped.rstrip("_-")
|
|
||||||
return stripped or stem
|
|
||||||
|
|
||||||
|
|
||||||
def _basename_stem(basename: str) -> str:
|
|
||||||
if "." in basename:
|
|
||||||
return basename.rsplit(".", 1)[0]
|
|
||||||
return basename
|
|
||||||
|
|
||||||
|
|
||||||
def _cluster_sort_key(obj: S3Object) -> Tuple[int, str]:
|
|
||||||
"""Sort key for ordering objects within a cluster by trailing digits."""
|
|
||||||
stem = _basename_stem(obj.basename)
|
|
||||||
digits = _DIGIT_GROUP_RE.findall(stem)
|
|
||||||
n = int(digits[-1]) if digits else -1
|
|
||||||
return (n, stem)
|
|
||||||
|
|
||||||
|
|
||||||
def discover_clusters(
|
|
||||||
cfg: DownloadConfig, objects: List[S3Object]
|
|
||||||
) -> List[ClusterSpec]:
|
|
||||||
"""Bucket *objects* into clusters using explicit patterns then auto-detect."""
|
|
||||||
clusters: List[ClusterSpec] = []
|
|
||||||
|
|
||||||
for i, p_i in enumerate(cfg.explicit):
|
|
||||||
for j in range(i + 1, len(cfg.explicit)):
|
|
||||||
p_j = cfg.explicit[j]
|
|
||||||
for obj in objects:
|
|
||||||
if (p_i.pattern.search(obj.basename)
|
|
||||||
and p_j.pattern.search(obj.basename)):
|
|
||||||
raise ValueError(
|
|
||||||
f"Object {obj.basename!r} matches multiple explicit "
|
|
||||||
f"patterns: {p_i.raw_pattern!r} and "
|
|
||||||
f"{p_j.raw_pattern!r}"
|
|
||||||
)
|
|
||||||
|
|
||||||
remaining = list(objects)
|
|
||||||
for patt in cfg.explicit:
|
|
||||||
matched = [o for o in remaining if patt.pattern.search(o.basename)]
|
|
||||||
if not matched:
|
|
||||||
clusters.append(
|
|
||||||
ClusterSpec(
|
|
||||||
name=patt.name,
|
|
||||||
objects=[],
|
|
||||||
source="explicit",
|
|
||||||
pattern=patt.raw_pattern,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
remaining = [o for o in remaining if o not in matched]
|
|
||||||
clusters.append(
|
|
||||||
ClusterSpec(
|
|
||||||
name=patt.name,
|
|
||||||
objects=sorted(matched, key=_cluster_sort_key),
|
|
||||||
source="explicit",
|
|
||||||
pattern=patt.raw_pattern,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
if cfg.auto_detect and remaining:
|
|
||||||
buckets: Dict[str, List[S3Object]] = {}
|
|
||||||
for obj in remaining:
|
|
||||||
key = _auto_prefix(_basename_stem(obj.basename))
|
|
||||||
buckets.setdefault(key, []).append(obj)
|
|
||||||
for key in sorted(buckets):
|
|
||||||
clusters.append(
|
|
||||||
ClusterSpec(
|
|
||||||
name=key,
|
|
||||||
objects=sorted(buckets[key], key=_cluster_sort_key),
|
|
||||||
source="auto",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return clusters
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Download
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _local_path(
|
|
||||||
cfg: DownloadConfig,
|
|
||||||
cluster: ClusterSpec,
|
|
||||||
obj: S3Object,
|
|
||||||
basename_collisions: set,
|
|
||||||
) -> Path:
|
|
||||||
"""Resolve the on-disk destination path for *obj*.
|
|
||||||
|
|
||||||
Falls back to a key-derived filename when two objects in the same cluster
|
|
||||||
share a basename (possible under recursive scan).
|
|
||||||
"""
|
|
||||||
cluster_dir = cfg.local_folder / cluster.name
|
|
||||||
if obj.basename in basename_collisions:
|
|
||||||
safe = obj.key.replace("/", "__")
|
|
||||||
return cluster_dir / safe
|
|
||||||
return cluster_dir / obj.basename
|
|
||||||
|
|
||||||
|
|
||||||
def _basename_collisions(cluster: ClusterSpec) -> set:
|
|
||||||
"""Return the set of basenames that appear more than once in *cluster*."""
|
|
||||||
seen: Dict[str, int] = {}
|
|
||||||
for obj in cluster.objects:
|
|
||||||
seen[obj.basename] = seen.get(obj.basename, 0) + 1
|
|
||||||
return {name for name, count in seen.items() if count > 1}
|
|
||||||
|
|
||||||
|
|
||||||
def _decide_action(
|
|
||||||
local_path: Path, obj: S3Object, on_exists: str
|
|
||||||
) -> Tuple[str, Optional[str]]:
|
|
||||||
"""Return ``(action, message)`` where action is 'download' or 'skip'."""
|
|
||||||
if on_exists == "overwrite":
|
|
||||||
return ("download", None)
|
|
||||||
if not local_path.exists():
|
|
||||||
return ("download", None)
|
|
||||||
local_size = local_path.stat().st_size
|
|
||||||
if local_size == obj.size:
|
|
||||||
return (
|
|
||||||
"skip",
|
|
||||||
f" skip {obj.key} -> {local_path} (size {local_size} matches)",
|
|
||||||
)
|
|
||||||
if on_exists == "error":
|
|
||||||
raise RuntimeError(
|
|
||||||
f"Local file {local_path} exists with size {local_size} but S3 "
|
|
||||||
f"object s3://{obj.key} has size {obj.size} (on_exists=error)"
|
|
||||||
)
|
|
||||||
return (
|
|
||||||
"download",
|
|
||||||
f" re-download {obj.key} -> {local_path} "
|
|
||||||
f"(local size {local_size} != S3 {obj.size})",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def download_cluster(
|
|
||||||
s3_client,
|
|
||||||
cfg: DownloadConfig,
|
|
||||||
cluster: ClusterSpec,
|
|
||||||
*,
|
|
||||||
on_exists_override: Optional[str] = None,
|
|
||||||
fail_fast: bool = False,
|
|
||||||
) -> Tuple[int, int, int, List[Tuple[str, Exception]]]:
|
|
||||||
"""Download every object in *cluster* into ``cfg.local_folder/cluster.name``.
|
|
||||||
|
|
||||||
Returns ``(downloaded, skipped, bytes_downloaded, failures)`` where
|
|
||||||
*failures* is a list of ``(key, exception)`` tuples.
|
|
||||||
"""
|
|
||||||
if not cluster.objects:
|
|
||||||
return (0, 0, 0, [])
|
|
||||||
|
|
||||||
on_exists = on_exists_override or cfg.on_exists
|
|
||||||
cluster_dir = cfg.local_folder / cluster.name
|
|
||||||
cluster_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
collisions = _basename_collisions(cluster)
|
|
||||||
|
|
||||||
plans: List[Tuple[S3Object, Path, str]] = []
|
|
||||||
skipped = 0
|
|
||||||
for obj in cluster.objects:
|
|
||||||
local_path = _local_path(cfg, cluster, obj, collisions)
|
|
||||||
action, message = _decide_action(local_path, obj, on_exists)
|
|
||||||
if message:
|
|
||||||
print(message, file=sys.stderr)
|
|
||||||
if action == "skip":
|
|
||||||
skipped += 1
|
|
||||||
continue
|
|
||||||
plans.append((obj, local_path, action))
|
|
||||||
|
|
||||||
downloaded = 0
|
|
||||||
bytes_downloaded = 0
|
|
||||||
failures: List[Tuple[str, Exception]] = []
|
|
||||||
|
|
||||||
if not plans:
|
|
||||||
return (0, skipped, 0, failures)
|
|
||||||
|
|
||||||
def _do_one(item):
|
|
||||||
obj, local_path, _action = item
|
|
||||||
local_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
s3_client.download_file(cfg.bucket, obj.key, str(local_path))
|
|
||||||
return obj
|
|
||||||
|
|
||||||
if cfg.concurrency <= 1 or len(plans) == 1:
|
|
||||||
for plan in plans:
|
|
||||||
obj = plan[0]
|
|
||||||
try:
|
|
||||||
_do_one(plan)
|
|
||||||
downloaded += 1
|
|
||||||
bytes_downloaded += obj.size
|
|
||||||
print(
|
|
||||||
f" ok {obj.key} -> {plan[1]} ({obj.size:,} bytes)",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
failures.append((obj.key, exc))
|
|
||||||
print(
|
|
||||||
f" FAIL {obj.key}: {exc}", file=sys.stderr,
|
|
||||||
)
|
|
||||||
if fail_fast:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
with ThreadPoolExecutor(max_workers=cfg.concurrency) as pool:
|
|
||||||
future_to_plan = {pool.submit(_do_one, p): p for p in plans}
|
|
||||||
for fut in as_completed(future_to_plan):
|
|
||||||
plan = future_to_plan[fut]
|
|
||||||
obj = plan[0]
|
|
||||||
try:
|
|
||||||
fut.result()
|
|
||||||
downloaded += 1
|
|
||||||
bytes_downloaded += obj.size
|
|
||||||
print(
|
|
||||||
f" ok {obj.key} -> {plan[1]} "
|
|
||||||
f"({obj.size:,} bytes)",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
failures.append((obj.key, exc))
|
|
||||||
print(
|
|
||||||
f" FAIL {obj.key}: {exc}", file=sys.stderr,
|
|
||||||
)
|
|
||||||
if fail_fast:
|
|
||||||
for other in future_to_plan:
|
|
||||||
other.cancel()
|
|
||||||
break
|
|
||||||
|
|
||||||
return (downloaded, skipped, bytes_downloaded, failures)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# CLI
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _build_argparser() -> argparse.ArgumentParser:
|
|
||||||
p = argparse.ArgumentParser(
|
|
||||||
description=(
|
|
||||||
"Download S3 objects (SAS data files and/or delimited text files) "
|
|
||||||
"under a prefix into a local folder, grouping objects into "
|
|
||||||
"clusters that each become one subfolder. "
|
|
||||||
"Supported extensions: "
|
|
||||||
+ ", ".join(DEFAULT_EXTENSIONS)
|
|
||||||
+ "."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
p.add_argument(
|
|
||||||
"--config", required=True, type=Path, help="Path to YAML config",
|
|
||||||
)
|
|
||||||
p.add_argument(
|
|
||||||
"--dry-run",
|
|
||||||
action="store_true",
|
|
||||||
help=(
|
|
||||||
"List discovered clusters and the objects each would download. "
|
|
||||||
"No GET requests are issued beyond the initial LIST."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
p.add_argument(
|
|
||||||
"--overwrite",
|
|
||||||
action="store_true",
|
|
||||||
help=(
|
|
||||||
"Force-redownload every matched object regardless of local "
|
|
||||||
"cache state. Equivalent to on_exists=overwrite."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
p.add_argument(
|
|
||||||
"--fail-fast",
|
|
||||||
action="store_true",
|
|
||||||
help=(
|
|
||||||
"Abort on the first per-file download failure. Default is to "
|
|
||||||
"log the failure and keep going."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
return p
|
|
||||||
|
|
||||||
|
|
||||||
def _format_bytes(n: int) -> str:
|
|
||||||
units = ("B", "KiB", "MiB", "GiB", "TiB")
|
|
||||||
size = float(n)
|
|
||||||
for unit in units:
|
|
||||||
if size < 1024 or unit == units[-1]:
|
|
||||||
return f"{size:,.1f} {unit}" if unit != "B" else f"{int(size):,} B"
|
|
||||||
size /= 1024
|
|
||||||
return f"{n} B"
|
|
||||||
|
|
||||||
|
|
||||||
def _describe_cluster(cluster: ClusterSpec) -> str:
|
|
||||||
src = cluster.source
|
|
||||||
if cluster.pattern:
|
|
||||||
src += f" pattern={cluster.pattern!r}"
|
|
||||||
if not cluster.objects:
|
|
||||||
return (
|
|
||||||
f"cluster {cluster.name!r} [{src}]\n objects: (no matching keys)"
|
|
||||||
)
|
|
||||||
total = sum(o.size for o in cluster.objects)
|
|
||||||
lines = [
|
|
||||||
f"cluster {cluster.name!r} [{src}] "
|
|
||||||
f"{len(cluster.objects)} object(s), {_format_bytes(total)}"
|
|
||||||
]
|
|
||||||
for obj in cluster.objects:
|
|
||||||
lines.append(f" - {obj.key} ({_format_bytes(obj.size)})")
|
|
||||||
return "\n".join(lines)
|
|
||||||
|
|
||||||
|
|
||||||
def main(argv: Optional[List[str]] = None) -> int:
|
|
||||||
args = _build_argparser().parse_args(argv)
|
|
||||||
|
|
||||||
cfg = load_download_config(args.config)
|
|
||||||
|
|
||||||
s3 = build_s3_client(cfg)
|
|
||||||
objects = list_s3_objects(s3, cfg)
|
|
||||||
|
|
||||||
if not objects:
|
|
||||||
print(
|
|
||||||
f"error: no objects matching extensions "
|
|
||||||
f"{list(cfg.extensions)} found under "
|
|
||||||
f"s3://{cfg.bucket}/{cfg.prefix}",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
return 2
|
|
||||||
|
|
||||||
clusters = discover_clusters(cfg, objects)
|
|
||||||
loadable = [c for c in clusters if c.objects]
|
|
||||||
|
|
||||||
print(
|
|
||||||
f"discovered {len(loadable)} cluster(s) "
|
|
||||||
f"({sum(len(c.objects) for c in loadable)} object(s)) under "
|
|
||||||
f"s3://{cfg.bucket}/{cfg.prefix}:"
|
|
||||||
)
|
|
||||||
for c in clusters:
|
|
||||||
print(_describe_cluster(c))
|
|
||||||
|
|
||||||
if args.dry_run:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
cfg.local_folder.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
on_exists_override = "overwrite" if args.overwrite else None
|
|
||||||
|
|
||||||
totals: List[Tuple[str, int, int, int]] = [] # (name, dl, skip, bytes)
|
|
||||||
failures: List[Tuple[str, str, Exception]] = [] # (cluster, key, exc)
|
|
||||||
aborted = False
|
|
||||||
for cluster in loadable:
|
|
||||||
if aborted:
|
|
||||||
break
|
|
||||||
print(
|
|
||||||
f"\n>>> downloading cluster {cluster.name!r} "
|
|
||||||
f"({len(cluster.objects)} object(s))"
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
dl, sk, by, fails = download_cluster(
|
|
||||||
s3, cfg, cluster,
|
|
||||||
on_exists_override=on_exists_override,
|
|
||||||
fail_fast=args.fail_fast,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
failures.append((cluster.name, "<cluster>", exc))
|
|
||||||
print(
|
|
||||||
f" !! cluster {cluster.name!r} aborted: {exc}",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
if args.fail_fast:
|
|
||||||
aborted = True
|
|
||||||
continue
|
|
||||||
totals.append((cluster.name, dl, sk, by))
|
|
||||||
for key, exc in fails:
|
|
||||||
failures.append((cluster.name, key, exc))
|
|
||||||
if fails and args.fail_fast:
|
|
||||||
aborted = True
|
|
||||||
|
|
||||||
print("\n=== summary ===")
|
|
||||||
for name, dl, sk, by in totals:
|
|
||||||
print(
|
|
||||||
f" {name}: downloaded {dl}, skipped {sk}, "
|
|
||||||
f"{_format_bytes(by)}"
|
|
||||||
)
|
|
||||||
for cname, key, exc in failures:
|
|
||||||
print(f" FAIL {cname} {key}: {exc}", file=sys.stderr)
|
|
||||||
|
|
||||||
return 1 if failures else 0
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
sys.exit(main())
|
|
||||||
@ -1,8 +0,0 @@
|
|||||||
# S3 Directory Explorer - Input File
|
|
||||||
# One S3 prefix per line (within the bucket configured in data_explorer.py).
|
|
||||||
# Blank lines and comments (#) are ignored.
|
|
||||||
#
|
|
||||||
# Examples:
|
|
||||||
# data/sales/
|
|
||||||
# data/inventory/
|
|
||||||
# data/archive/
|
|
||||||
@ -1,111 +0,0 @@
|
|||||||
# Example S3 download config for utils/s3_download.py.
|
|
||||||
#
|
|
||||||
# Shape mirrors what `s3_download.py` expects:
|
|
||||||
#
|
|
||||||
# python s3_download.py --config sample_s3_download_config.yaml --dry-run
|
|
||||||
# python s3_download.py --config sample_s3_download_config.yaml
|
|
||||||
#
|
|
||||||
# Relative paths (e.g. local_folder) are resolved against this config file's
|
|
||||||
# directory first, falling back to the current working directory if that
|
|
||||||
# doesn't exist.
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Required: where to read from and write to
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
bucket: my-bucket
|
|
||||||
|
|
||||||
# Listing is recursive under this prefix - no S3 Delimiter is used. A nested
|
|
||||||
# object like `census/2020/raw/nested/group_c1.sas7bdat` will be considered.
|
|
||||||
# Regex patterns below match against the object BASENAME only (the part
|
|
||||||
# after the last `/`), so subfolder location does not affect matching.
|
|
||||||
prefix: census/2020/raw/
|
|
||||||
|
|
||||||
# Root destination on disk. One subfolder per cluster is created beneath it.
|
|
||||||
# If two objects in the same cluster share a basename (possible under the
|
|
||||||
# recursive scan), the second one is renamed to a key-derived filename
|
|
||||||
# (slashes replaced with `__`) so neither file overwrites the other.
|
|
||||||
local_folder: ./downloads
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Optional: AWS credentials
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
# Named profile from ~/.aws/credentials. Omit to use the default boto3
|
|
||||||
# credential chain (env vars, instance role, SSO, etc.).
|
|
||||||
# aws_profile: default
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Optional: discovery behavior
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
# When true (default), any object not matched by an explicit pattern below is
|
|
||||||
# auto-grouped with its peers by stripping trailing digits (and any trailing
|
|
||||||
# _ / -) from the basename stem. Stems without trailing digits become their
|
|
||||||
# own singleton cluster.
|
|
||||||
#
|
|
||||||
# Auto-detect only recognizes *trailing* digit runs. If your basenames put
|
|
||||||
# the varying number in the middle of the stem (e.g. surrounded by year,
|
|
||||||
# region, and detail components), auto-detect will NOT group them - each
|
|
||||||
# object becomes its own singleton cluster. Use an explicit pattern instead;
|
|
||||||
# see the embedded-digit example near the bottom of this file.
|
|
||||||
auto_detect: true
|
|
||||||
|
|
||||||
# Object extensions to consider. Anything else under the prefix is ignored.
|
|
||||||
# Default (when this key is omitted): .sas7bdat, .xpt, .xport, .txt, .csv, .tsv
|
|
||||||
# extensions:
|
|
||||||
# - .sas7bdat
|
|
||||||
# - .xpt
|
|
||||||
# - .txt
|
|
||||||
# - .csv
|
|
||||||
# - .tsv
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Optional: download behavior
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
# What to do when the destination file already exists locally.
|
|
||||||
# skip - (default) if the local file's byte size matches the S3
|
|
||||||
# object's Size, reuse it. If sizes differ, re-download with a
|
|
||||||
# warning. Same byte-size cache rule used by
|
|
||||||
# utils/file_viewer.py::_ensure_local_copy.
|
|
||||||
# overwrite - always re-download.
|
|
||||||
# error - abort the run if any local file exists with a different size.
|
|
||||||
# (Equal sizes still skip.)
|
|
||||||
on_exists: skip
|
|
||||||
|
|
||||||
# Parallel download workers. boto3 clients are thread-safe. Default: 4.
|
|
||||||
# concurrency: 4
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Explicit cluster patterns
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
#
|
|
||||||
# Each pattern is matched against the S3 object BASENAME (last path segment
|
|
||||||
# of the key). Objects matched by a pattern are pulled out of the
|
|
||||||
# auto-detect pool, so explicit and auto clusters compose cleanly.
|
|
||||||
#
|
|
||||||
# `name` becomes both the subfolder name under `local_folder` and the label
|
|
||||||
# used in the discovery / summary output. Names must be unique across
|
|
||||||
# explicit clusters.
|
|
||||||
clusters:
|
|
||||||
- pattern: '^group_a\d+\.sas7bdat$'
|
|
||||||
name: group_a
|
|
||||||
|
|
||||||
# - pattern: '^group_b\d+\.sas7bdat$'
|
|
||||||
# name: group_b
|
|
||||||
|
|
||||||
# Embedded-digit example. When the varying number sits in the MIDDLE of
|
|
||||||
# the basename stem (e.g. year2020_regionA_40_detail.sas7bdat,
|
|
||||||
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
|
|
||||||
# them - each object becomes its own singleton cluster. An explicit
|
|
||||||
# pattern bucketizes them correctly. The \d+ matches any width, and
|
|
||||||
# objects within the cluster are sorted numerically by the last digit
|
|
||||||
# group in the stem, so _9_ sorts before _40_ regardless of zero-padding.
|
|
||||||
#
|
|
||||||
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
|
|
||||||
# name: year2020_regionA_detail
|
|
||||||
|
|
||||||
# Text file cluster example (when file_type: text):
|
|
||||||
# - pattern: '^data_group_a\d+\.txt$'
|
|
||||||
# name: data_group_a
|
|
||||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user