Compare commits

..

45 Commits

Author SHA1 Message Date
David Peterson
c3d1f72556 Add null string sentinel handling in load_sas.py for improved missing value detection
Introduced a frozenset of string literals that represent SQL NULL values, enhancing the inference and nullability detection processes. Implemented helper functions to identify null strings and unify missing value checks for CHAR/TEXT columns. Updated the _null_sentinel_mask function to replace these sentinel values with None, ensuring consistent handling across various data types during data loading. This change improves robustness in managing missing data scenarios.
2026-04-22 19:20:07 -05:00
David Peterson
998a3e282f Revert "Optimize datetime parsing in load_sas.py by implementing a sample-based format detection approach"
This reverts commit 857f696305.
2026-04-22 13:05:11 -05:00
David Peterson
857f696305 Optimize datetime parsing in load_sas.py by implementing a sample-based format detection approach
Introduced a new mechanism to sample non-null values for determining the appropriate datetime parsing strategy, significantly reducing processing time for large datasets. This change replaces the previous full row-walk method with a more efficient sampling technique, enhancing performance while maintaining robust handling of various date formats. Updated comments for clarity on the new approach.
2026-04-22 12:54:19 -05:00
David Peterson
c3fa943e77 Enhance date and datetime parsing in load_sas.py with flexible regex and fallback formats
Introduced a locale-independent month lookup and improved date parsing functions to handle various date formats, including SAS and Oracle styles. The new _parse_flexible_date and _parse_flexible_datetime functions provide robust parsing capabilities, accommodating both date-only and datetime inputs. Updated _try_date_coerce and _try_datetime_coerce to utilize these new functions, ensuring better handling of diverse date formats during data loading.
2026-04-22 12:28:19 -05:00
michael-corey
f63d684d51 moving to env file 2026-04-22 15:37:35 +00:00
David Peterson
0632e110e5 Implement parallel processing for partition discovery in load_folder.py and enhance column filtering in load_sas.py
Added support for parallel processing using ProcessPoolExecutor in the _discover_cluster_partitions function, allowing for efficient partition value discovery across multiple files. This change significantly reduces I/O overhead by reading only necessary columns during scans. Additionally, updated iter_sas_chunks and iter_text_chunks functions to accept a usecols parameter, enabling selective column parsing for improved performance during data loading. These enhancements aim to optimize resource usage and speed up the data processing pipeline.
2026-04-22 15:35:19 +00:00
michael-corey
f4b4d0e928 adding exception counter 2026-04-22 10:09:41 -05:00
michael-corey
1197846d10 adding text file support 2026-04-21 20:05:26 -05:00
David Peterson
64e7ff0b0a Enhance error reporting in load_folder.py and load_sas.py for better debugging
Updated error handling in the _worker_load_append_file function to include full tracebacks in exception messages, improving context for failures during file loading. Additionally, modified the _safe_numeric_to_datetime function to provide detailed warnings when conversion errors occur, ensuring users are informed of potential data issues. These changes aim to facilitate easier debugging and enhance the robustness of the data loading process.
2026-04-21 16:56:27 -05:00
David Peterson
eff82c73ce Add all_nullable configuration option in load_folder.py and load_sas.py for flexible schema management
Introduced an `all_nullable` boolean option in both `load_folder.py` and `load_sas.py`, allowing users to specify whether all columns should be treated as nullable during schema inference. This feature addresses scenarios where the data sampling may incorrectly suggest that columns are non-nullable, preventing potential errors during data loading. Updated YAML configuration files to include examples of this new option, enhancing usability and providing clearer documentation for users.
2026-04-21 16:48:37 -05:00
David Peterson
c283b42876 Add safe numeric to datetime conversion in load_sas.py to handle edge cases
Implemented the _safe_numeric_to_datetime function to convert numeric SAS-epoch series to datetime64[ns] while managing potential overflow and non-finite values. This enhancement improves error handling during data processing by masking invalid entries before conversion, ensuring robust handling of SAS date formats in the _prepare_for_copy function.
2026-04-21 15:55:25 -05:00
David Peterson
a46f0518f6 Suppress PerformanceWarning in load_sas.py to reduce noise during processing of wide SAS files. This change filters out warnings related to DataFrame fragmentation, which are irrelevant for our pipeline as we directly convert DataFrames to pyarrow tables. 2026-04-21 13:40:38 -05:00
David Peterson
969a442775 Refactor numeric column type inference in load_sas.py for improved data handling
Updated the logic for determining column types in the union_column_types function. Changed the default type from BIGINT to DOUBLE PRECISION for numeric columns without explicit format hints, ensuring better handling of both integer and float values. This adjustment prevents loading failures due to format discrepancies and maintains consistent data processing across various SAS formats.
2026-04-21 13:17:01 -05:00
David Peterson
212218fb67 Enhance error handling and abort functionality in load_folder.py for parallel file loading
Implemented an `--abort-on-first-failure` option in the `_load_remaining_files_parallel` function, allowing users to cancel all pending tasks immediately upon the first worker failure. This change improves user experience by providing real-time feedback on errors through stderr, ensuring that users are promptly informed of issues without waiting for all tasks to complete. Additionally, refined error reporting to maintain accurate summaries of successes and failures, even during interruptions.
2026-04-21 12:54:05 -05:00
David Peterson
ae65140390 Add column type overrides in load_folder.py and load_sas.py for enhanced schema control
Implemented a new feature allowing users to specify explicit column type mappings via a `column_types` configuration in both `load_folder.py` and `load_sas.py`. This addition enables users to bypass automatic type inference for specific columns, ensuring correct data types are used when loading datasets. Updated the YAML configuration files to include examples of the new `column_types` option, enhancing usability and flexibility in handling varying data formats across files.
2026-04-21 12:14:44 -05:00
David Peterson
0c5e6e31f0 Enhance memory management in load_folder.py and load_sas.py for improved performance
Added memory management optimizations in the _worker_load_append_file function to release unused memory from pyarrow's pool and trigger Python's garbage collection. Implemented explicit memory trimming using glibc's malloc_trim to ensure efficient memory usage during long-running processes. Updated the copy_dataframes function in load_sas.py to release pyarrow's memory pool between chunks, preventing high memory usage in long-lived workers. These changes aim to reduce memory footprint and improve overall performance during large dataset processing.
2026-04-21 10:46:54 -05:00
David Peterson
9afb52aecb Add --chunk-rows option to load_folder.py for customizable memory management
Introduced a new command-line argument, --chunk-rows, allowing users to specify the number of rows per chunk for pyreadstat streaming and COPY operations. This option overrides the GENERIC_LOADER_CHUNK_ROWS environment variable and auto-scaling behavior when using multiple workers. Enhanced memory management by providing detailed information on peak memory usage based on the specified chunk size, improving performance and usability during large dataset processing.
2026-04-21 10:05:21 -05:00
David Peterson
eac75cbb26 Refactor load_cluster function in load_folder.py for improved parallel file loading
Updated the load_cluster function to enhance parallel processing by committing the table creation before dispatching all files to worker processes. This change allows for more efficient handling of large datasets by reducing the serial workload and ensuring schema compatibility checks can access the committed table. The logic for streaming files has been clarified, maintaining progress tracking throughout the loading process.
2026-04-21 08:31:48 -05:00
David Peterson
1265489276 Enhance date and timestamp handling in _prepare_for_copy function in load_sas.py
Added support for numeric date and datetime conversions from SAS formats. Implemented logic to handle float64 representations of dates (days since 1960-01-01) and datetimes (seconds since 1960-01-01), ensuring proper parsing and preventing errors during data copying to Postgres. This enhancement improves compatibility with various SAS date formats.
2026-04-21 08:16:17 -05:00
David Peterson
2dd247b067 Add --no-prescan option to load_folder.py for skipping metadata scan
Introduced a new command-line argument, --no-prescan, allowing users to bypass the per-file metadata scan during the loading process. This enhancement is particularly useful for large folders where the pre-scan may be time-consuming. The progress bar will still display rows loaded, rate, and elapsed time, but without an estimated time of arrival (ETA) for completion. Updated the main function to handle this new option and adjusted the progress tracking accordingly.
2026-04-21 08:12:39 -05:00
David Peterson
052fb0e087 Refactor pre-scan process in load_folder.py to utilize ThreadPoolExecutor for improved performance
Updated the main function to replace sequential file processing with a threaded approach using ThreadPoolExecutor. This change enhances the efficiency of reading row counts from SAS files, particularly for large datasets, by allowing concurrent I/O operations. Added progress tracking with tqdm for better user feedback during the pre-scan phase.
2026-04-20 22:43:02 -05:00
David Peterson
fe7dc4d5a1 Enhance load_cluster function for parallel processing and progress tracking
Refactored the load_cluster function in load_folder.py to support parallel file loading using ProcessPoolExecutor, improving performance during the append phase. Added workers parameter for controlling parallelism and integrated a progress_queue for real-time progress updates. Introduced read_sas_metadata function in load_sas.py to efficiently read metadata from SAS files, optimizing the pre-scan process for global progress tracking.
2026-04-20 22:02:55 -05:00
David Peterson
96f2d6fe79 Update requirements and enhance SAS file processing with progress tracking
Updated the pyarrow version in requirements.txt to improve compatibility. Enhanced the _infer_cluster_schema and _stream_file functions in load_folder.py and load_sas.py to return total row counts for better progress tracking during data streaming. Integrated tqdm for visual feedback on row processing, improving user experience during large data loads.
2026-04-20 21:44:49 -05:00
David Peterson
7beb44ac4d Add pyarrow dependency and optimize DataFrame serialization in load_sas.py
Included pyarrow as a new dependency in requirements.txt for improved CSV serialization performance. Refactored the _prepare_for_copy function to utilize vectorized operations for date and timestamp conversions, reducing CPU overhead. Introduced a new _serialize_chunk_csv function leveraging pyarrow for faster CSV writing, enhancing efficiency during data copying to Postgres.
2026-04-20 21:32:56 -05:00
David Peterson
5e347f50ef Add widening compatibility checks in load_sas.py for type inference
Introduced a new set of widening compatible type pairs to allow for accepting narrower inferred types when they fit within wider target types during schema compatibility checks. This change enhances the type inference process by preventing unnecessary mismatches and improving handling of varying integer ranges in cluster loads. Updated warning messages to inform users of accepted type adjustments.
2026-04-20 21:08:13 -05:00
David Peterson
f84e127796 Update type inference behavior in load_sas.py to scan entire files by default
Changed the default setting for TYPE_INFERENCE_SAMPLE_ROWS to None, allowing type and nullability inference to consider all rows in a SAS file. This adjustment ensures accurate handling of null values and integer ranges, addressing issues observed in production with large datasets. Updated documentation to reflect the implications of this change and the risks associated with using an integer cap for sampling.
2026-04-20 20:43:27 -05:00
David Peterson
a94ab68f4d Refine partition name patterns in sas_profiler.py
Updated the regular expression for partition name patterns to improve matching accuracy for state-related columns. The new pattern captures variations like `state`, `state_code`, and `statecode` while avoiding false positives from unrelated terms. This change enhances the precision of partition candidate selection.
2026-04-20 19:27:01 -05:00
David Peterson
4fc85081c8 Enhance SAS profiling performance in sas_profiler.py
Added a new constant for profiling chunk size to optimize memory usage during profiling operations. Refactored the update method in the _ColumnStats class to improve efficiency in handling missing values and calculating statistics for numeric and string data types. This update includes vectorized operations for better performance and clarity in the implementation.
2026-04-20 19:03:40 -05:00
David Peterson
5449a25b44 Refactor partition candidate logic in sas_profiler.py
Updated the partition candidate selection process to restrict candidates to columns matching specific name patterns, improving accuracy and reducing noise. Removed outdated distinct value constraints and clarified documentation for partitioning behavior. Enhanced handling of pre-sharded columns and refined the classification logic for better performance.
2026-04-20 18:49:23 -05:00
David Peterson
b3b968edf2 Add openpyxl dependency to requirements.txt for Excel file handling 2026-04-20 18:38:24 -05:00
David Peterson
f1af1136dc Add standalone SAS profiling utility
Introduced a new script `sas_profiler.py` that profiles local SAS files and generates an Excel report with recommendations for drops, partitions, and indexes, along with type-inference warnings. The utility supports command-line overrides for configuration and is compatible with Python 3.10+. This addition enhances the existing tools for SAS file management.
2026-04-20 18:38:01 -05:00
michael-corey
e48038f3c6 updating for sas 2026-04-20 16:30:35 -05:00
michael-corey
2390ce1e0c adding explorer 2026-04-20 16:27:54 -05:00
David Peterson
384103f489 Update pyreadstat version constraint in requirements.txt to allow for version 2.0 2026-04-20 14:10:08 -05:00
David Peterson
03b97999dc Add S3 download utility and example configuration
Introduced a new script `s3_download.py` for downloading files from S3 based on a YAML configuration. The script supports recursive listing, file clustering, and customizable download behavior. Also added a sample configuration file `sample_s3_download_config.yaml` to demonstrate usage.
2026-04-20 13:14:42 -05:00
David Peterson
b78f6d648f Enhance file clustering by implementing numeric sorting for last digit groups in stems and updating documentation for embedded-digit handling in auto-detection. 2026-04-20 11:48:22 -05:00
michael-corey
b3d7a9d440 adding index field 2026-04-20 10:18:09 -05:00
michael-corey
0d955eeab1 adding partition flag 2026-04-20 09:56:00 -05:00
michael-corey
e39eb47a90 altering such that commit is by batch 2026-04-20 08:38:38 -05:00
michael-corey
508cc974ea adding local check 2026-04-20 08:25:27 -05:00
michael-corey
2d95711d9d Updating python reference 2026-04-18 13:43:29 -05:00
michael-corey
f1e99d887d altering invalid arguments 2026-04-18 13:41:54 -05:00
michael-corey
f101eacffd Merging main 2026-04-18 13:39:37 -05:00
michael-corey
edb9146682 moving files 2026-04-18 13:35:32 -05:00
michael-corey
6b12ab969b adding file_viewer 2026-04-18 11:19:38 -05:00
16 changed files with 7264 additions and 206 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
/.venv
/samples
.env
!.env.example
__pycache__/
venv/
*/__pycache__/

View File

@ -3,3 +3,6 @@ PGPORT=5432
PGUSER=
PGPASSWORD=
PGDATABASE=
S3_BUCKET=my-bucket
AWS_PROFILE=default

View File

@ -1,5 +0,0 @@
/.venv
/samples
/.env
/__pycache__
/venv

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -15,3 +15,64 @@ tablename: kitchensink
# What to do if the target table already exists: fail | replace | append
# Defaults to fail.
if_exists: append
# file_type: Type of data file to load. One of: sas | text. Default: sas.
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
# file_type: sas
# delimiter: Column delimiter for text files. Only used when file_type: text.
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
# or any single character.
# delimiter: ","
# text_encoding: Character encoding for text files. Default: utf-8.
# Common alternatives: latin-1, cp1252, iso-8859-1.
# text_encoding: utf-8
# quotechar: Quote character for text files. Default: '"' (double quote).
# quotechar: '"'
# partition_by: Partition the table by unique values of these columns.
# Columns are applied in cascading order (first column = top-level partition).
# Requires if_exists: replace or fail (not append for initial creation).
# Single field:
# partition_by: state
# Multiple fields (cascading):
# partition_by:
# - state
# - zip
#
# max_partitions: Warning threshold for total partition count (default: 10000).
# If the number of partitions exceeds this, a warning is logged but loading continues.
# max_partitions: 10000
# indexes: Create B-tree indexes on these columns after data loading.
# Indexes are created with IF NOT EXISTS for safe use with append mode.
# Single column:
# indexes: state
# Multiple columns (one index per column):
# indexes:
# - state
# - zip
# column_types: Explicit {column_name: postgres_type} overrides that
# bypass automatic type inference for the listed columns. Useful when
# pyreadstat reports a column as NUM but you want it stored as TEXT
# (phone/ID columns that are conceptually strings), or when a column's
# inferred type is off for any other reason. Columns not listed here
# fall through to the normal inference path. Nullability is always
# computed from the data.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# all_nullable: If true, every column is stamped nullable in the generated
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
# rare-null data downstream) and COPY blows up mid-load on the first null
# it hits. Off by default. The CLI flag --all-nullable overrides this to
# true when set.
#
# all_nullable: false

View File

@ -19,8 +19,33 @@ if_exists: replace
# auto-grouped with its peers by stripping trailing digits (and any trailing
# _ / -) from the file stem. Files with no trailing digits become their own
# singleton cluster.
#
# Auto-detect only recognizes *trailing* digit runs. If your file names put
# the varying number in the middle of the stem (e.g. surrounded by year,
# region, and detail components), auto-detect will NOT group them - each
# file becomes its own singleton cluster. Use an explicit pattern instead;
# see the embedded-digit example near the bottom of this file.
auto_detect: true
# file_type: Type of data files in this folder. One of: sas | text. Default: sas.
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
# When set to 'text', the folder scanner looks for .txt/.csv/.tsv files
# instead of .sas7bdat/.xpt/.xport files.
# file_type: sas
# delimiter: Column delimiter for text files. Only used when file_type: text.
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
# or any single character.
# delimiter: ","
# text_encoding: Character encoding for text files. Default: utf-8.
# Common alternatives: latin-1, cp1252, iso-8859-1.
# text_encoding: utf-8
# quotechar: Quote character for text files. Default: '"' (double quote).
# quotechar: '"'
# Folder-level column filter. Every file in every cluster passes through
# this filter. `include` and `exclude` are mutually exclusive. A cluster can
# override these via its own `include` / `exclude` keys.
@ -31,15 +56,76 @@ auto_detect: true
# exclude:
# - ALLNULL
# Folder-level partition_by: Partition every cluster's table by unique values
# of these columns. Inherited by all clusters unless overridden per-cluster.
# Requires if_exists: replace or fail (not append for initial creation).
# Single field:
# partition_by: state
# Multiple fields (cascading):
# partition_by:
# - state
# - zip
#
# Folder-level max_partitions: Warning threshold for total partition count
# (default: 10000). Inherited by all clusters unless overridden per-cluster.
# max_partitions: 10000
# Folder-level indexes: Create B-tree indexes on these columns after data
# loading. Inherited by all clusters unless overridden per-cluster.
# Indexes are created with IF NOT EXISTS for safe use with append mode.
# Single column:
# indexes: state
# Multiple columns (one index per column):
# indexes:
# - state
# - zip
# Folder-level column_types: Explicit {column_name: postgres_type} map that
# bypasses automatic type inference for the listed columns. Applied to
# every cluster unless a cluster supplies its own column_types, which are
# merged on top (cluster entries win on conflict).
#
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
# BIGINT). Entries in column_types here win over that auto-union - use
# them when the auto result is wrong or when --no-prescan disables the
# auto-union and you still need to pin a column.
#
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
# don't exist in a given file are simply ignored for that file.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# RESP_PH_SUFFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# Folder-level all_nullable: If true, every column of every cluster is
# stamped nullable in the generated schema; NOT NULL inference is skipped
# entirely. Use this when the sampler wrongly concludes a column has no
# nulls (sampled rows happened to be dense, but later files in the cluster
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
# unless a cluster supplies its own all_nullable. The CLI flag
# --all-nullable overrides both this and any per-cluster setting when
# passed. Off by default.
#
# all_nullable: false
# Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly.
#
# `tablename` is required. `if_exists`, `include`, and `exclude` are
# optional per-cluster overrides of the folder-level defaults above.
# `tablename` is required. `if_exists`, `include`, `exclude`, and
# `column_types` are optional per-cluster overrides of the folder-level
# defaults above. Cluster-level column_types entries win over folder-
# level entries for the same column.
clusters:
- pattern: '^group_a\d+\.xpt$'
tablename: group_a
# column_types:
# INTCOL: TEXT
# all_nullable: true # per-cluster override of the folder-level default
# Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace":
@ -48,7 +134,44 @@ clusters:
# tablename: group_b
# if_exists: append
# With only the gq pattern explicit, auto_detect: true will still bucket
# group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
# Per-cluster partition_by / max_partitions override. These take precedence
# over the folder-level defaults above.
#
# - pattern: '^group_c\d+\.xpt$'
# tablename: group_c
# partition_by:
# - region
# - year
# max_partitions: 500
# Per-cluster indexes override. Takes precedence over the folder-level
# indexes default above. An explicit empty list disables indexing for
# this cluster even when the folder default has indexes.
#
# - pattern: '^group_d\d+\.xpt$'
# tablename: group_d
# indexes:
# - region
# - year
# Embedded-digit example. When the varying number sits in the MIDDLE of
# the stem (e.g. year2020_regionA_40_detail.sas7bdat,
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
# them - each file becomes its own singleton cluster. An explicit
# pattern bucketizes them correctly. The \d+ matches any width, and
# files within the cluster are sorted numerically by the last digit
# group in the stem, so _9_ sorts before _40_ regardless of zero-
# padding. Gaps in the numeric sequence (missing 3, 7, 14, ...) are
# fine - whatever files are present get loaded in numeric order.
#
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
# tablename: year2020_regionA_detail
# Text file cluster example (when file_type: text):
# - pattern: '^data_group_a\d+\.txt$'
# tablename: data_group_a
# With only the group_a pattern explicit, auto_detect: true will still
# bucket group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
# for the fixture that exercises exactly this layout.

View File

@ -0,0 +1,107 @@
{
"ALLNULL": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "entirely null numeric; loader must pick a default type, typically TEXT",
"nullable": true
},
"ALLNULLC": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "entirely null character",
"nullable": true
},
"BIGINT": {
"note": "values beyond int32 range",
"nullable": true,
"postgres_type": "BIGINT"
},
"BOOLCOL": {
"acceptable_types": [
"BOOLEAN",
"SMALLINT",
"INTEGER"
],
"note": "{0,1,NaN} is genuinely ambiguous; loader's choice is a design decision",
"nullable": true
},
"CONST": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"nullable": false
},
"DATEASTR": {
"note": "stored as char in SAS; loader should coerce ISO-date strings",
"nullable": true,
"postgres_type": "DATE"
},
"DATECOL": {
"note": "positive control",
"nullable": false,
"postgres_type": "DATE"
},
"DTCOL": {
"acceptable_types": [
"TIMESTAMP",
"TIMESTAMP WITHOUT TIME ZONE"
],
"nullable": true
},
"FLOATCOL": {
"acceptable_types": [
"DOUBLE PRECISION",
"NUMERIC"
],
"nullable": true
},
"ID": {
"nullable": false,
"postgres_type": "INTEGER"
},
"INTCOL": {
"note": "positive control",
"nullable": false,
"postgres_type": "INTEGER"
},
"LONGSTR": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"nullable": true
},
"MIXED": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "heterogeneous content; loader should fall back to text",
"nullable": true
},
"NUMASSTR": {
"acceptable_types": [
"NUMERIC",
"DOUBLE PRECISION"
],
"note": "stored as char in SAS; loader should coerce numeric-looking strings",
"nullable": true
},
"STRCOL": {
"acceptable_types": [
"TEXT",
"VARCHAR"
],
"note": "positive control",
"nullable": false
},
"TIMECOL": {
"nullable": true,
"postgres_type": "TIME"
}
}

Binary file not shown.

View File

@ -1,7 +1,10 @@
pandas>=2.0,<3.0
pyreadstat>=1.2,<1.3
pyreadstat>=1.2,<2.0
numpy>=2.1,<3.0
pyarrow>=22.0,<24.0
pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0
python-dotenv>=1.0,<2.0
boto3>=1.28,<2.0
openpyxl>=3.1,<4.0
tqdm>=4.66,<5.0

679
utils/data_explorer.py Normal file
View File

@ -0,0 +1,679 @@
"""Explore S3 directories and categorise them by accessibility.
Reads a text file containing one S3 prefix per line (paths within the bucket
configured by the ``S3_BUCKET`` constant or ``--bucket`` CLI argument), then
for each prefix:
- Lists all objects recursively (via ``list_objects_v2`` paginator)
- **Only considers files matching the configured extensions** (default: all
supported extensions SAS and text). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found
- If the first file is accessible, tests ALL remaining files individually
- Categorises the directory as **Available**, **Blocked**, **Empty**, and
tracks individual file **Exceptions** within available directories
Supported file types
--------------------
* **SAS files**: ``.sas7bdat``, ``.xpt``, ``.xport``
* **Text / delimited files**: ``.txt``, ``.csv``, ``.tsv``
A directory is considered *empty* if it contains no files matching the
extension filter, even when other file types are present.
Configure the constants below (or use CLI arguments), then run::
python3 data_explorer.py [OPTIONS]
Python 3.10+ compatible. Requires ``boto3`` / ``botocore`` and stdlib.
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import dataclass, field
from typing import List, Set, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
# ---------------------------------------------------------------------------
# Dependency check
# ---------------------------------------------------------------------------
try:
import boto3 # noqa: F401
import botocore.exceptions # noqa: F401
except ImportError:
print(
"ERROR: boto3 / botocore is not installed.\n"
"Install with: pip install boto3",
file=sys.stderr,
)
sys.exit(1)
# ---------------------------------------------------------------------------
# Extension constants
# ---------------------------------------------------------------------------
SAS_EXTENSIONS: Set[str] = {".sas7bdat", ".xpt", ".xport"}
"""File extensions recognised as SAS data files."""
TEXT_EXTENSIONS: Set[str] = {".txt", ".csv", ".tsv"}
"""File extensions recognised as delimited text / CSV files."""
SUPPORTED_EXTENSIONS: Set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
"""Union of all file extensions this tool can work with."""
# ---------------------------------------------------------------------------
# Configuration defaults — edit these or override via CLI arguments
# ---------------------------------------------------------------------------
FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
"""Set of extensions to filter on (case-insensitive). Defaults to all supported."""
INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line."""
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
"""AWS CLI profile name used for authentication."""
# Text-file reading defaults (used when downloading / previewing text files)
DEFAULT_DELIMITER: str = ","
DEFAULT_ENCODING: str = "utf-8"
DEFAULT_QUOTECHAR: str = '"'
# ---------------------------------------------------------------------------
# Auto-detection helpers
# ---------------------------------------------------------------------------
def detect_file_type(filename: str) -> str:
"""Return ``'sas'``, ``'text'``, or ``'unknown'`` based on *filename* extension.
The check is case-insensitive. For ``.tsv`` files the caller should
default the delimiter to a tab character (``'\\t'``).
Examples
--------
>>> detect_file_type("data.sas7bdat")
'sas'
>>> detect_file_type("report.CSV")
'text'
>>> detect_file_type("archive.zip")
'unknown'
"""
ext = os.path.splitext(filename)[1].lower()
if ext in SAS_EXTENSIONS:
return "sas"
if ext in TEXT_EXTENSIONS:
return "text"
return "unknown"
def default_delimiter_for(filename: str) -> str:
"""Return a sensible default delimiter for *filename*.
* ``.tsv`` ``'\\t'``
* everything else ``','``
"""
ext = os.path.splitext(filename)[1].lower()
if ext == ".tsv":
return "\t"
return ","
def matches_extensions(key: str, extensions: Set[str]) -> bool:
"""Return ``True`` if *key* ends with any extension in *extensions* (case-insensitive)."""
key_lower = key.lower()
return any(key_lower.endswith(ext) for ext in extensions)
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class AvailableDir:
"""An S3 directory that is readable."""
prefix: str
file_count: int
total_size: int # bytes
accessible_count: int = 0 # files that passed head_object
total_count: int = 0 # total .sas7bdat files found
accessible_size: int = 0 # total size of accessible files only
@dataclass
class BlockedDir:
"""An S3 directory where access was denied or an error occurred."""
prefix: str
file_count: int
error: str
@dataclass
class EmptyDir:
"""An S3 directory with no objects."""
prefix: str
@dataclass
class ExceptionFile:
"""A specific file that failed permission check within an otherwise available directory."""
prefix: str # the directory prefix
key: str # the full S3 key of the failed file
error: str # the error message
@dataclass
class Results:
"""Aggregated exploration results."""
available: List[AvailableDir] = field(default_factory=list)
blocked: List[BlockedDir] = field(default_factory=list)
empty: List[EmptyDir] = field(default_factory=list)
exceptions: List[ExceptionFile] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def read_input_file(path: str) -> List[str]:
"""Return a list of S3 prefixes from *path*, ignoring blanks and comments.
Each line is stripped and normalised so that non-empty prefixes always end
with a trailing ``/``.
"""
prefixes: List[str] = []
with open(path, encoding="utf-8") as fh:
for raw_line in fh:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
# Normalise: strip surrounding whitespace/slashes, then re-add
# a single trailing slash (unless the prefix is empty/root).
line = line.strip("/")
if line:
line += "/"
prefixes.append(line)
return prefixes
def format_size(size_bytes: int) -> str:
"""Return a human-readable size string (KB, MB, GB, TB)."""
if size_bytes < 1024:
return f"{size_bytes} B"
for unit in ("KB", "MB", "GB", "TB"):
size_bytes /= 1024.0
if size_bytes < 1024.0 or unit == "TB":
return f"{size_bytes:,.1f} {unit}"
# Fallback (should not be reached)
return f"{size_bytes:,.1f} TB"
def extensions_label(extensions: Set[str]) -> str:
"""Return a compact, sorted label for a set of extensions (e.g. ``.csv/.tsv/.txt``)."""
return "/".join(sorted(extensions))
def list_objects(
s3_client: "botocore.client.S3",
bucket: str,
prefix: str,
extensions: Set[str] | None = None,
) -> Tuple[List[Tuple[str, int]], int]:
"""Recursively list all objects under *prefix*.
Only objects whose key ends with one of *extensions* (case-insensitive) are
counted. All other files are silently skipped. When *extensions* is
``None`` the module-level ``FILE_EXTENSIONS`` set is used.
Returns ``(files, total_size)`` where *files* is a list of
``(key, size)`` tuples for every matching object and *total_size* is the
sum of their sizes in bytes.
"""
if extensions is None:
extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions}
paginator = s3_client.get_paginator("list_objects_v2")
files: List[Tuple[str, int]] = []
total_size: int = 0
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
continue
files.append((obj["Key"], obj["Size"]))
total_size += obj["Size"]
return files, total_size
def check_read_permission(
s3_client: "botocore.client.S3",
bucket: str,
key: str,
) -> str | None:
"""Try ``head_object`` on *key*. Return ``None`` on success or an error string."""
try:
s3_client.head_object(Bucket=bucket, Key=key)
except botocore.exceptions.ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "Unknown")
message = exc.response.get("Error", {}).get("Message", str(exc))
return f"{message} ({code})"
return None
# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def explore_directories(
prefixes: List[str],
*,
extensions: Set[str] | None = None,
) -> Results:
"""Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.
Parameters
----------
prefixes:
List of S3 key prefixes to explore.
extensions:
Set of file extensions to filter on. Defaults to the module-level
``FILE_EXTENSIONS`` (which itself defaults to ``SUPPORTED_EXTENSIONS``).
"""
if extensions is None:
extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions}
ext_label = extensions_label(extensions)
session = boto3.Session(profile_name=AWS_PROFILE)
s3 = session.client("s3")
results = Results()
total = len(prefixes)
for idx, prefix in enumerate(prefixes, start=1):
print(
f"[{idx}/{total}] Checking {prefix} (filtering for {ext_label}) ...",
file=sys.stderr,
)
# --- Recursive listing ------------------------------------------------
try:
files, total_size = list_objects(
s3, S3_BUCKET, prefix, extensions=extensions,
)
except botocore.exceptions.ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "Unknown")
message = exc.response.get("Error", {}).get("Message", str(exc))
results.blocked.append(
BlockedDir(prefix=prefix, file_count=0, error=f"{message} ({code})")
)
continue
except Exception as exc:
results.blocked.append(
BlockedDir(prefix=prefix, file_count=0, error=str(exc))
)
continue
if not files:
results.empty.append(EmptyDir(prefix=prefix))
continue
file_count = len(files)
# --- Permission check on first file -----------------------------------
# in "/") for the head_object test. The listing is already filtered
# to the requested extensions, so any non-marker key is a valid probe.
first_key, _ = files[0]
test_key = first_key
if first_key.endswith("/") and total_size > 0:
for key, size in files:
if not (key.endswith("/") and size == 0):
test_key = key
break
error = check_read_permission(s3, S3_BUCKET, test_key)
if error is not None:
# First file blocked → entire directory is blocked
results.blocked.append(
BlockedDir(prefix=prefix, file_count=file_count, error=error)
)
continue
# --- First file accessible → check ALL remaining files ----------------
accessible_count = 1 # the first (test_key) already passed
accessible_size = 0
dir_exceptions: List[ExceptionFile] = []
# Find the size of the test_key to count it
for key, size in files:
if key == test_key:
accessible_size = size
break
# Build list of remaining files to check
remaining = [(key, size) for key, size in files if key != test_key]
if remaining:
if len(remaining) > 10:
print(
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
file=sys.stderr,
)
for key, size in remaining:
file_error = check_read_permission(s3, S3_BUCKET, key)
if file_error is None:
accessible_count += 1
accessible_size += size
else:
dir_exceptions.append(
ExceptionFile(prefix=prefix, key=key, error=file_error)
)
else:
# Only one file and it passed
accessible_size = total_size
results.available.append(
AvailableDir(
prefix=prefix,
file_count=file_count,
total_size=total_size,
accessible_count=accessible_count,
total_count=file_count,
accessible_size=accessible_size,
)
)
results.exceptions.extend(dir_exceptions)
return results
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def print_results(results: Results, *, extensions: Set[str] | None = None) -> None:
"""Print a clean, human-readable summary to stdout.
Parameters
----------
results:
The exploration results to display.
extensions:
The set of extensions that were used for filtering. Used only for
labelling in the output. Defaults to ``FILE_EXTENSIONS``.
"""
if extensions is None:
extensions = FILE_EXTENSIONS
ext_label = extensions_label(extensions)
print()
print("=== S3 Directory Explorer Results ===")
print(f"Bucket: {S3_BUCKET}")
print(f"Extensions: {ext_label}")
# --- Available ---
print()
print(f"--- Available ({len(results.available)}) ---")
if results.available:
for d in results.available:
print(f" {d.prefix}")
print(
f" {ext_label} files: {d.accessible_count}/{d.total_count} accessible"
f" | Total Size: {format_size(d.accessible_size)}"
)
else:
print(" (none)")
# --- Blocked ---
print()
print(f"--- Blocked ({len(results.blocked)}) ---")
if results.blocked:
for d in results.blocked:
if d.file_count:
print(f" {d.prefix}")
print(f" Matching files ({ext_label}) found: {d.file_count} | Error: {d.error}")
else:
print(f" {d.prefix}")
print(f" Error: {d.error}")
else:
print(" (none)")
# --- Exceptions ---
print()
print(f"--- Exceptions ({len(results.exceptions)}) ---")
if results.exceptions:
for exc in results.exceptions:
print(f" {exc.key}")
print(f" Directory: {exc.prefix} | Error: {exc.error}")
else:
print(" (none)")
# --- Empty ---
print()
print(f"--- Empty / no matching files ({len(results.empty)}) ---")
if results.empty:
for d in results.empty:
print(f" {d.prefix}")
else:
print(" (none)")
print()
# ---------------------------------------------------------------------------
# CLI argument parsing
# ---------------------------------------------------------------------------
def build_arg_parser() -> argparse.ArgumentParser:
"""Build and return the CLI argument parser.
Supports selecting file-type filters, text-file reading parameters, and
overriding the default bucket / profile / input-file settings.
"""
parser = argparse.ArgumentParser(
description=(
"Explore S3 directories and categorise them by accessibility. "
"Supports SAS files (.sas7bdat, .xpt, .xport) and delimited text "
"files (.txt, .csv, .tsv)."
),
)
# --- File-type / extension selection ---
type_group = parser.add_argument_group("File-type selection")
type_group.add_argument(
"--file-type",
choices=["sas", "text", "all"],
default="all",
help=(
"Restrict the scan to a specific file type. "
"'sas' = .sas7bdat/.xpt/.xport only; "
"'text' = .txt/.csv/.tsv only; "
"'all' = both (default)."
),
)
type_group.add_argument(
"--extensions",
nargs="+",
metavar="EXT",
help=(
"Explicit list of extensions to filter on (e.g. --extensions .csv .tsv). "
"Overrides --file-type when provided."
),
)
# --- Text-file reading parameters ---
text_group = parser.add_argument_group(
"Text-file parameters",
description=(
"Parameters used when reading delimited text files. These are "
"stored for downstream consumers and do not affect the S3 scan "
"itself."
),
)
text_group.add_argument(
"--delimiter",
default=None,
help=(
"Field delimiter for text files (default: ',' for .csv/.txt, "
"'\\t' for .tsv). Use 'tab' or '\\t' for a tab character."
),
)
text_group.add_argument(
"--encoding",
default=DEFAULT_ENCODING,
help=f"Character encoding for text files (default: {DEFAULT_ENCODING}).",
)
text_group.add_argument(
"--quotechar",
default=DEFAULT_QUOTECHAR,
help=f"Quote character for text files (default: {DEFAULT_QUOTECHAR!r}).",
)
# --- S3 / general settings ---
s3_group = parser.add_argument_group("S3 settings")
s3_group.add_argument(
"--bucket",
default=None,
help=f"S3 bucket name (default: {S3_BUCKET}).",
)
s3_group.add_argument(
"--profile",
default=None,
help=f"AWS CLI profile name (default: {AWS_PROFILE}).",
)
s3_group.add_argument(
"--input-file",
default=None,
help=f"Path to the text file with S3 prefixes (default: {INPUT_FILE}).",
)
return parser
def resolve_extensions(args: argparse.Namespace) -> Set[str]:
"""Determine the active extension set from parsed CLI *args*.
If ``--extensions`` is provided it takes precedence. Otherwise
``--file-type`` is used to select a predefined set.
"""
if args.extensions:
# Normalise: ensure each extension starts with a dot and is lowercase
exts: Set[str] = set()
for ext in args.extensions:
ext = ext.strip().lower()
if not ext.startswith("."):
ext = "." + ext
exts.add(ext)
return exts
if args.file_type == "sas":
return SAS_EXTENSIONS
if args.file_type == "text":
return TEXT_EXTENSIONS
return SUPPORTED_EXTENSIONS
def resolve_delimiter(args: argparse.Namespace) -> str:
"""Return the effective delimiter from parsed CLI *args*.
Handles the special values ``'tab'`` and ``'\\t'`` so users can specify a
tab character on the command line without shell-escaping issues.
"""
if args.delimiter is None:
return DEFAULT_DELIMITER
raw = args.delimiter
if raw.lower() in ("tab", "\\t"):
return "\t"
return raw
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = build_arg_parser()
args = parser.parse_args()
# --- Apply CLI overrides to module-level config ---------------------------
if args.bucket:
S3_BUCKET = args.bucket
if args.profile:
AWS_PROFILE = args.profile
input_file = args.input_file if args.input_file else INPUT_FILE
active_extensions = resolve_extensions(args)
FILE_EXTENSIONS = active_extensions
delimiter = resolve_delimiter(args)
encoding = args.encoding
quotechar = args.quotechar
# --- Read input file ------------------------------------------------------
if not os.path.exists(input_file):
print(f"ERROR: Input file not found: {input_file}", file=sys.stderr)
sys.exit(1)
try:
prefixes = read_input_file(input_file)
except Exception as exc:
print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
sys.exit(1)
if not prefixes:
print("No valid S3 prefixes found in the input file. Nothing to do.")
sys.exit(0)
# --- Validate AWS profile -------------------------------------------------
try:
session = boto3.Session(profile_name=AWS_PROFILE)
# Force credential resolution to catch bad profiles early
credentials = session.get_credentials()
if credentials is None:
raise RuntimeError(
f"No credentials found for AWS profile {AWS_PROFILE!r}"
)
except botocore.exceptions.ProfileNotFound as exc:
print(f"ERROR: {exc}", file=sys.stderr)
sys.exit(1)
except Exception as exc:
print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
sys.exit(1)
# --- Print active configuration -------------------------------------------
ext_label = extensions_label(active_extensions)
print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
print(f"Extensions: {ext_label}", file=sys.stderr)
if active_extensions & TEXT_EXTENSIONS:
print(
f"Text opts: delimiter={delimiter!r} encoding={encoding!r} "
f"quotechar={quotechar!r}",
file=sys.stderr,
)
# --- Explore --------------------------------------------------------------
results = explore_directories(prefixes, extensions=active_extensions)
print_results(results, extensions=active_extensions)

374
utils/file_viewer.py Normal file
View File

@ -0,0 +1,374 @@
"""Standalone utility to download a SAS or delimited text file from S3 and
print a column-level summary of the first *N* rows.
Supported formats
-----------------
* **SAS** ``.sas7bdat``, ``.xpt``, ``.xport`` (read via *pyreadstat*)
* **Text** ``.csv``, ``.tsv``, ``.txt`` (read via *pandas.read_csv*)
Configure the four constants below **or** use the CLI arguments, then run::
python3 file_viewer.py
python3 file_viewer.py --local path/to/file.csv
python3 file_viewer.py --local path/to/data.tsv --delimiter $'\\t'
Python 3.14 compatible.
"""
from __future__ import annotations
import argparse
import os
import sys
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3
import pandas as pd
import pyreadstat
# ---------------------------------------------------------------------------
# Supported file extensions
# ---------------------------------------------------------------------------
SAS_EXTENSIONS: set[str] = {".sas7bdat", ".xpt", ".xport"}
"""File extensions recognised as SAS data files."""
TEXT_EXTENSIONS: set[str] = {".txt", ".csv", ".tsv"}
"""File extensions recognised as delimited text files."""
SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
"""Union of all supported file extensions."""
# ---------------------------------------------------------------------------
# Configuration — edit these before running (or use CLI arguments)
# ---------------------------------------------------------------------------
S3_BUCKET: str = os.environ.get("S3_BUCKET", "my-bucket")
"""S3 bucket name."""
S3_KEY: str = "path/to/file.sas7bdat"
"""Object key (path) within the bucket to a supported data file."""
LOCAL_FOLDER: str = "./downloads"
"""Local directory to download the file into."""
AWS_PROFILE: str = os.environ.get("AWS_PROFILE", "default")
"""AWS CLI profile name used for authentication."""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None:
"""Download *key* from *bucket* to *local_path*, skipping if already present.
If *local_path* exists and its size matches the S3 object's size, the
download is skipped and a message is printed.
Supports any file whose extension is in :data:`SUPPORTED_EXTENSIONS`.
"""
session = boto3.Session(profile_name=AWS_PROFILE)
s3 = session.client("s3")
remote_size = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
if os.path.exists(local_path):
local_size = os.path.getsize(local_path)
if local_size == remote_size:
print(
f"Local file {local_path} already matches s3://{bucket}/{key} "
f"({local_size} bytes); skipping download."
)
return
print(
f"Local file {local_path} size ({local_size} bytes) differs from "
f"S3 ({remote_size} bytes); re-downloading."
)
print(f"Downloading s3://{bucket}/{key} -> {local_path}")
s3.download_file(bucket, key, local_path)
print("Download complete.")
# -- SAS readers -------------------------------------------------------------
def _read_sas_head(path: str, row_count: int = 10) -> pd.DataFrame:
"""Read the first *row_count* rows of a SAS file (``.sas7bdat``, ``.xpt``, ``.xport``)."""
ext = os.path.splitext(path)[1].lower()
if ext == ".sas7bdat":
df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count)
elif ext in {".xpt", ".xport"}:
df, _ = pyreadstat.read_xport(path, row_offset=0, row_limit=row_count)
else:
raise ValueError(f"Unsupported SAS extension: {ext}")
return df
# -- Text readers ------------------------------------------------------------
def _read_text_head(
path: str,
row_count: int = 10,
delimiter: str = ",",
encoding: str = "utf-8",
quotechar: str = '"',
) -> pd.DataFrame:
"""Read the first *row_count* rows of a delimited text file.
Parameters
----------
path : str
Path to the ``.csv``, ``.tsv``, or ``.txt`` file.
row_count : int, optional
Number of data rows to read (default ``10``).
delimiter : str, optional
Column delimiter (default ``","``). For ``.tsv`` files the caller
should pass ``"\\t"``.
encoding : str, optional
File encoding (default ``"utf-8"``).
quotechar : str, optional
Character used to quote fields (default ``'"'``).
"""
return pd.read_csv(
path,
sep=delimiter,
encoding=encoding,
quotechar=quotechar,
nrows=row_count,
)
# -- Unified reader ----------------------------------------------------------
def _read_head(
path: str,
row_count: int = 10,
delimiter: str | None = None,
encoding: str = "utf-8",
quotechar: str = '"',
) -> pd.DataFrame:
"""Read the first *row_count* rows of a supported data file.
Auto-detects the file type from its extension and delegates to the
appropriate reader. For ``.tsv`` files the delimiter defaults to tab
(``"\\t"``); for other text files it defaults to ``","``.
Parameters
----------
path : str
Path to the data file.
row_count : int, optional
Number of data rows to read (default ``10``).
delimiter : str or None, optional
Column delimiter for text files. ``None`` means *auto-detect*
(tab for ``.tsv``, comma otherwise).
encoding : str, optional
Encoding for text files (default ``"utf-8"``).
quotechar : str, optional
Quote character for text files (default ``'"'``).
Returns
-------
pandas.DataFrame
"""
ext = os.path.splitext(path)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
raise ValueError(
f"Unsupported file extension '{ext}'. "
f"Supported extensions: {sorted(SUPPORTED_EXTENSIONS)}"
)
if ext in SAS_EXTENSIONS:
return _read_sas_head(path, row_count=row_count)
# --- Text file path ---
if delimiter is None:
delimiter = "\t" if ext == ".tsv" else ","
return _read_text_head(
path,
row_count=row_count,
delimiter=delimiter,
encoding=encoding,
quotechar=quotechar,
)
# -- Display -----------------------------------------------------------------
def _sample_values(series: pd.Series, n: int = 3) -> str:
"""Return up to *n* non-null sample values as a comma-separated string."""
non_null = series.dropna()
samples = non_null.head(n).tolist()
if not samples:
return "(all null)"
return ", ".join(repr(v) for v in samples)
def _print_summary(df: pd.DataFrame) -> None:
"""Print a nicely formatted summary table to stdout."""
# Pre-compute column data
rows = []
for col in df.columns:
rows.append((col, str(df[col].dtype), _sample_values(df[col], 3)))
# Determine column widths
hdr_name = "Column Name"
hdr_dtype = "Data Type"
hdr_samples = "Sample Values (up to 3)"
w_name = max(len(hdr_name), *(len(r[0]) for r in rows))
w_dtype = max(len(hdr_dtype), *(len(r[1]) for r in rows))
w_samples = max(len(hdr_samples), *(len(r[2]) for r in rows))
fmt = f" {{:<{w_name}}} {{:<{w_dtype}}} {{:<{w_samples}}}"
sep = f" {'-' * w_name} {'-' * w_dtype} {'-' * w_samples}"
print()
print(f" Summary of first {len(df)} row(s) ({len(df.columns)} columns)")
print(sep)
print(fmt.format(hdr_name, hdr_dtype, hdr_samples))
print(sep)
for name, dtype, samples in rows:
print(fmt.format(name, dtype, samples))
print(sep)
print()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
"""Build the argument parser for the file-viewer CLI."""
parser = argparse.ArgumentParser(
description=(
"Download a SAS or delimited text file from S3 (or read a local "
"file) and print a column-level summary of the first N rows.\n\n"
"Supported extensions: "
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
source = parser.add_mutually_exclusive_group()
source.add_argument(
"--local",
metavar="FILE",
default=None,
help=(
"Path to a local data file to summarise (skips S3 download). "
"Supported extensions: "
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
),
)
source.add_argument(
"--s3-key",
metavar="KEY",
default=None,
help="Override the S3_KEY constant with this object key.",
)
parser.add_argument(
"--rows",
type=int,
default=10,
metavar="N",
help="Number of rows to read (default: 10).",
)
# Text-file-specific options
text_group = parser.add_argument_group(
"text file options",
"These options apply only to .csv / .tsv / .txt files.",
)
text_group.add_argument(
"--delimiter",
default=None,
help=(
'Column delimiter for text files (default: "," for .csv/.txt, '
'"\\t" for .tsv). Use $\'\\t\' in the shell for a literal tab.'
),
)
text_group.add_argument(
"--encoding",
default="utf-8",
help='File encoding for text files (default: "utf-8").',
)
text_group.add_argument(
"--quotechar",
default='"',
help='Quote character for text files (default: \'"\').',
)
return parser
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = _build_parser()
args = parser.parse_args()
if args.local:
# ---- Local file mode -----------------------------------------------
local_path = args.local
ext = os.path.splitext(local_path)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
parser.error(
f"Unsupported file extension '{ext}'. "
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
)
if not os.path.isfile(local_path):
print(f"File not found: {local_path}", file=sys.stderr)
sys.exit(1)
else:
# ---- S3 download mode ----------------------------------------------
s3_key = args.s3_key or S3_KEY
ext = os.path.splitext(s3_key)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
parser.error(
f"Unsupported file extension '{ext}' in S3 key. "
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
)
os.makedirs(LOCAL_FOLDER, exist_ok=True)
local_filename = os.path.basename(s3_key)
local_path = os.path.join(LOCAL_FOLDER, local_filename)
try:
_ensure_local_copy(S3_BUCKET, s3_key, local_path)
except Exception as exc:
print(f"S3 download error: {exc}", file=sys.stderr)
sys.exit(1)
# ---- Read & summarise --------------------------------------------------
try:
df = _read_head(
local_path,
row_count=args.rows,
delimiter=args.delimiter,
encoding=args.encoding,
quotechar=args.quotechar,
)
except Exception as exc:
print(f"File read error: {exc}", file=sys.stderr)
sys.exit(2)
_print_summary(df)

757
utils/s3_download.py Normal file
View File

@ -0,0 +1,757 @@
"""S3-source counterpart to ``generic_loader/load_folder.py``.
Reads a YAML config that points at an S3 bucket + prefix, lists every object
under that prefix recursively, groups objects into *clusters* using the same
explicit-pattern + auto-detect rules as ``load_folder.py``, and downloads each
cluster's files into its own subfolder under a local destination root.
Supported file types:
* SAS data files: ``.sas7bdat``, ``.xpt``, ``.xport``
* Delimited text files: ``.txt``, ``.csv``, ``.tsv``
-------------------------------------------------------------------------------
USAGE
-------------------------------------------------------------------------------
1. YAML config
--------------
::
bucket: my-bucket # required
prefix: census/2020/raw/ # required; recursive scan under it
local_folder: ./downloads # required; one subfolder per cluster
aws_profile: default # optional; default boto3 chain if omitted
auto_detect: true # optional; default true
extensions: # optional; default sas7bdat/xpt/xport/txt/csv/tsv
- .sas7bdat
- .csv
on_exists: skip # optional; skip | overwrite | error
concurrency: 4 # optional; default 4
clusters:
- pattern: '^group_a\\d+\\.sas7bdat$'
name: group_a
- pattern: '^group_b\\d+\\.sas7bdat$'
name: group_b
2. Command-line interface
-------------------------
::
python s3_download.py --config download_config.yaml [--dry-run]
[--overwrite] [--fail-fast]
Flags:
--config PATH Required. Path to the YAML config above.
--dry-run List discovered clusters and the objects each would
download (with sizes). No GET requests are issued
beyond the initial LIST.
--overwrite Force-redownload every matched object regardless of
local cache state. Equivalent to ``on_exists: overwrite``
for this run.
--fail-fast Abort the whole run on the first per-file download
failure. Default is to log the failure and keep going.
Exit codes:
0 - every file downloaded (or skipped) successfully (or dry-run completed)
1 - at least one file failed (details on stderr)
2 - the LIST returned no objects matching the configured extensions
3. Discovery rules
------------------
* Listing is recursive (no S3 ``Delimiter``). Regexes are matched against
the *basename* of each key (the part after the last ``/``), so a nested
object like ``census/2020/raw/nested/group_c1.sas7bdat`` is grouped by
``group_c1.sas7bdat`` alone. Text files (e.g. ``data.csv``) are handled
identically the basename is extracted and matched the same way.
* Explicit patterns are tried in order. A key matched by one pattern is
removed from the pool before the next pattern runs. Overlap between
patterns is flagged as an error at discovery time.
* Auto-detect groups remaining keys by ``re.sub(r'\\d+$', '', stem)`` with
any trailing ``_`` / ``-`` stripped afterward, mirroring
``load_folder.py``. Stems without trailing digits become singleton
clusters named after the stem.
* Within a cluster, files are sorted numerically by the LAST digit group
in the stem so ``_9_`` sorts before ``_40_`` regardless of zero-padding.
4. Library usage
----------------
::
from s3_download import load_download_config, list_s3_objects, \
discover_clusters, download_cluster, build_s3_client
cfg = load_download_config("download_config.yaml")
s3 = build_s3_client(cfg)
objects = list_s3_objects(s3, cfg)
for cluster in discover_clusters(cfg, objects):
download_cluster(s3, cfg, cluster)
"""
from __future__ import annotations
import argparse
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
import boto3
import yaml
SAS_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport")
TEXT_EXTENSIONS: Tuple[str, ...] = (".txt", ".csv", ".tsv")
DEFAULT_EXTENSIONS: Tuple[str, ...] = SAS_EXTENSIONS + TEXT_EXTENSIONS
VALID_ON_EXISTS: Tuple[str, ...] = ("skip", "overwrite", "error")
DEFAULT_CONCURRENCY: int = 4
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class _ExplicitPattern:
"""Parsed form of a single ``clusters[*]`` YAML entry."""
pattern: re.Pattern
raw_pattern: str
name: str
@dataclass
class DownloadConfig:
"""Top-level configuration parsed from YAML."""
bucket: str
prefix: str
local_folder: Path
aws_profile: Optional[str] = None
auto_detect: bool = True
extensions: Tuple[str, ...] = DEFAULT_EXTENSIONS
on_exists: str = "skip"
concurrency: int = DEFAULT_CONCURRENCY
explicit: List[_ExplicitPattern] = field(default_factory=list)
@dataclass
class S3Object:
"""A single S3 object selected from the LIST response."""
key: str
basename: str
size: int
@dataclass
class ClusterSpec:
"""Resolved per-cluster download settings."""
name: str
objects: List[S3Object]
source: str # "explicit" or "auto"
pattern: Optional[str] = None
# ---------------------------------------------------------------------------
# Config loading
# ---------------------------------------------------------------------------
def _validate_on_exists(value: Any, where: str) -> str:
s = str(value).lower()
if s not in VALID_ON_EXISTS:
raise ValueError(
f"{where}: on_exists={value!r} is not one of {list(VALID_ON_EXISTS)}"
)
return s
def _parse_extensions(raw_value: Any, where: str) -> Tuple[str, ...]:
if raw_value is None:
return DEFAULT_EXTENSIONS
if isinstance(raw_value, str):
items = [raw_value]
elif isinstance(raw_value, list):
items = list(raw_value)
else:
raise ValueError(
f"{where}: 'extensions' must be a string or list of strings."
)
out: List[str] = []
for i, item in enumerate(items):
if not isinstance(item, str) or not item.strip():
raise ValueError(
f"{where}: 'extensions[{i}]' must be a non-empty string."
)
ext = item.strip().lower()
if not ext.startswith("."):
ext = "." + ext
out.append(ext)
if len(out) != len(set(out)):
raise ValueError(f"{where}: 'extensions' contains duplicates.")
return tuple(out)
def _parse_concurrency(raw_value: Any, where: str) -> int:
if raw_value is None:
return DEFAULT_CONCURRENCY
try:
value = int(raw_value)
except (TypeError, ValueError):
raise ValueError(
f"{where}: 'concurrency' must be a positive integer, "
f"got {raw_value!r}"
)
if value <= 0:
raise ValueError(
f"{where}: 'concurrency' must be a positive integer, got {value}"
)
return value
def load_download_config(path: Path) -> DownloadConfig:
"""Parse and validate the YAML download config at ``path``."""
path = Path(path)
with path.open("r", encoding="utf-8") as f:
raw = yaml.safe_load(f)
if not isinstance(raw, dict):
raise ValueError(
f"Config at {path} must be a YAML mapping at the top level."
)
# 'bucket' can fall back to the S3_BUCKET env var, so only flag it as
# missing when neither the YAML key nor the env var is present.
required_always = ("prefix", "local_folder")
missing = [k for k in required_always if k not in raw]
if "bucket" not in raw and not os.environ.get("S3_BUCKET"):
missing.insert(0, "bucket")
if missing:
raise ValueError(
f"Config {path} missing required keys: {', '.join(missing)}"
)
bucket = str(raw["bucket"]).strip() if raw.get("bucket") else ""
if not bucket:
bucket = os.environ.get("S3_BUCKET", "")
if not bucket:
raise ValueError(f"Config {path}: 'bucket' must be a non-empty string.")
prefix = str(raw["prefix"])
# Normalize: strip leading slash, ensure exactly one trailing slash unless
# the user explicitly asked for an empty prefix (whole bucket scan).
prefix = prefix.lstrip("/")
if prefix and not prefix.endswith("/"):
prefix = prefix + "/"
local_folder = Path(raw["local_folder"])
if not local_folder.is_absolute():
candidate = (path.parent / local_folder).resolve()
# Mirror load_folder's behavior: prefer the config-relative path when
# it exists, otherwise keep what the user wrote. Either way, we'll
# mkdir(parents=True) before downloading so non-existence is fine.
local_folder = candidate if candidate.parent.exists() else candidate
aws_profile = raw.get("aws_profile")
if aws_profile is not None:
aws_profile = str(aws_profile).strip() or None
if aws_profile is None:
aws_profile = os.environ.get("AWS_PROFILE") or None
auto_detect = bool(raw.get("auto_detect", True))
extensions = _parse_extensions(raw.get("extensions"), f"Config {path}")
on_exists = _validate_on_exists(
raw.get("on_exists", "skip"), f"Config {path}"
)
concurrency = _parse_concurrency(
raw.get("concurrency"), f"Config {path}"
)
explicit: List[_ExplicitPattern] = []
seen_names: set = set()
clusters_raw = raw.get("clusters") or []
if not isinstance(clusters_raw, list):
raise ValueError(f"Config {path}: 'clusters' must be a list if present.")
for i, entry in enumerate(clusters_raw):
where = f"Config {path} clusters[{i}]"
if not isinstance(entry, dict):
raise ValueError(f"{where} must be a mapping.")
if "pattern" not in entry or "name" not in entry:
raise ValueError(f"{where} must include 'pattern' and 'name'.")
raw_pat = str(entry["pattern"])
try:
compiled = re.compile(raw_pat)
except re.error as e:
raise ValueError(
f"{where}: invalid regex {raw_pat!r}: {e}"
) from e
name = str(entry["name"]).strip()
if not name:
raise ValueError(f"{where}: 'name' must be a non-empty string.")
if name in seen_names:
raise ValueError(
f"{where}: duplicate cluster name {name!r}"
)
seen_names.add(name)
explicit.append(
_ExplicitPattern(
pattern=compiled, raw_pattern=raw_pat, name=name,
)
)
return DownloadConfig(
bucket=bucket,
prefix=prefix,
local_folder=local_folder,
aws_profile=aws_profile,
auto_detect=auto_detect,
extensions=extensions,
on_exists=on_exists,
concurrency=concurrency,
explicit=explicit,
)
# ---------------------------------------------------------------------------
# S3 listing
# ---------------------------------------------------------------------------
def build_s3_client(cfg: DownloadConfig):
"""Build a boto3 S3 client honoring ``cfg.aws_profile`` if set."""
if cfg.aws_profile:
session = boto3.Session(profile_name=cfg.aws_profile)
else:
session = boto3.Session()
return session.client("s3")
def list_s3_objects(s3_client, cfg: DownloadConfig) -> List[S3Object]:
"""List all objects under ``cfg.prefix`` recursively, filtered by extension.
Supports SAS extensions (``.sas7bdat``, ``.xpt``, ``.xport``) and text
extensions (``.txt``, ``.csv``, ``.tsv``) whichever are present in
``cfg.extensions``.
"""
paginator = s3_client.get_paginator("list_objects_v2")
out: List[S3Object] = []
for page in paginator.paginate(Bucket=cfg.bucket, Prefix=cfg.prefix):
for entry in page.get("Contents", []):
key = entry["Key"]
if key.endswith("/"):
continue
basename = key.rsplit("/", 1)[-1]
ext = ("." + basename.rsplit(".", 1)[-1].lower()
if "." in basename else "")
if ext not in cfg.extensions:
continue
out.append(
S3Object(key=key, basename=basename, size=int(entry["Size"]))
)
out.sort(key=lambda o: o.key)
return out
# ---------------------------------------------------------------------------
# Cluster discovery (mirrors generic_loader/load_folder.py)
# ---------------------------------------------------------------------------
_TRAILING_DIGIT_RE = re.compile(r"\d+$")
_DIGIT_GROUP_RE = re.compile(r"\d+")
def _auto_prefix(stem: str) -> str:
"""Cluster key for *stem*: strip trailing digits and any trailing _/-."""
stripped = _TRAILING_DIGIT_RE.sub("", stem)
stripped = stripped.rstrip("_-")
return stripped or stem
def _basename_stem(basename: str) -> str:
if "." in basename:
return basename.rsplit(".", 1)[0]
return basename
def _cluster_sort_key(obj: S3Object) -> Tuple[int, str]:
"""Sort key for ordering objects within a cluster by trailing digits."""
stem = _basename_stem(obj.basename)
digits = _DIGIT_GROUP_RE.findall(stem)
n = int(digits[-1]) if digits else -1
return (n, stem)
def discover_clusters(
cfg: DownloadConfig, objects: List[S3Object]
) -> List[ClusterSpec]:
"""Bucket *objects* into clusters using explicit patterns then auto-detect."""
clusters: List[ClusterSpec] = []
for i, p_i in enumerate(cfg.explicit):
for j in range(i + 1, len(cfg.explicit)):
p_j = cfg.explicit[j]
for obj in objects:
if (p_i.pattern.search(obj.basename)
and p_j.pattern.search(obj.basename)):
raise ValueError(
f"Object {obj.basename!r} matches multiple explicit "
f"patterns: {p_i.raw_pattern!r} and "
f"{p_j.raw_pattern!r}"
)
remaining = list(objects)
for patt in cfg.explicit:
matched = [o for o in remaining if patt.pattern.search(o.basename)]
if not matched:
clusters.append(
ClusterSpec(
name=patt.name,
objects=[],
source="explicit",
pattern=patt.raw_pattern,
)
)
continue
remaining = [o for o in remaining if o not in matched]
clusters.append(
ClusterSpec(
name=patt.name,
objects=sorted(matched, key=_cluster_sort_key),
source="explicit",
pattern=patt.raw_pattern,
)
)
if cfg.auto_detect and remaining:
buckets: Dict[str, List[S3Object]] = {}
for obj in remaining:
key = _auto_prefix(_basename_stem(obj.basename))
buckets.setdefault(key, []).append(obj)
for key in sorted(buckets):
clusters.append(
ClusterSpec(
name=key,
objects=sorted(buckets[key], key=_cluster_sort_key),
source="auto",
)
)
return clusters
# ---------------------------------------------------------------------------
# Download
# ---------------------------------------------------------------------------
def _local_path(
cfg: DownloadConfig,
cluster: ClusterSpec,
obj: S3Object,
basename_collisions: set,
) -> Path:
"""Resolve the on-disk destination path for *obj*.
Falls back to a key-derived filename when two objects in the same cluster
share a basename (possible under recursive scan).
"""
cluster_dir = cfg.local_folder / cluster.name
if obj.basename in basename_collisions:
safe = obj.key.replace("/", "__")
return cluster_dir / safe
return cluster_dir / obj.basename
def _basename_collisions(cluster: ClusterSpec) -> set:
"""Return the set of basenames that appear more than once in *cluster*."""
seen: Dict[str, int] = {}
for obj in cluster.objects:
seen[obj.basename] = seen.get(obj.basename, 0) + 1
return {name for name, count in seen.items() if count > 1}
def _decide_action(
local_path: Path, obj: S3Object, on_exists: str
) -> Tuple[str, Optional[str]]:
"""Return ``(action, message)`` where action is 'download' or 'skip'."""
if on_exists == "overwrite":
return ("download", None)
if not local_path.exists():
return ("download", None)
local_size = local_path.stat().st_size
if local_size == obj.size:
return (
"skip",
f" skip {obj.key} -> {local_path} (size {local_size} matches)",
)
if on_exists == "error":
raise RuntimeError(
f"Local file {local_path} exists with size {local_size} but S3 "
f"object s3://{obj.key} has size {obj.size} (on_exists=error)"
)
return (
"download",
f" re-download {obj.key} -> {local_path} "
f"(local size {local_size} != S3 {obj.size})",
)
def download_cluster(
s3_client,
cfg: DownloadConfig,
cluster: ClusterSpec,
*,
on_exists_override: Optional[str] = None,
fail_fast: bool = False,
) -> Tuple[int, int, int, List[Tuple[str, Exception]]]:
"""Download every object in *cluster* into ``cfg.local_folder/cluster.name``.
Returns ``(downloaded, skipped, bytes_downloaded, failures)`` where
*failures* is a list of ``(key, exception)`` tuples.
"""
if not cluster.objects:
return (0, 0, 0, [])
on_exists = on_exists_override or cfg.on_exists
cluster_dir = cfg.local_folder / cluster.name
cluster_dir.mkdir(parents=True, exist_ok=True)
collisions = _basename_collisions(cluster)
plans: List[Tuple[S3Object, Path, str]] = []
skipped = 0
for obj in cluster.objects:
local_path = _local_path(cfg, cluster, obj, collisions)
action, message = _decide_action(local_path, obj, on_exists)
if message:
print(message, file=sys.stderr)
if action == "skip":
skipped += 1
continue
plans.append((obj, local_path, action))
downloaded = 0
bytes_downloaded = 0
failures: List[Tuple[str, Exception]] = []
if not plans:
return (0, skipped, 0, failures)
def _do_one(item):
obj, local_path, _action = item
local_path.parent.mkdir(parents=True, exist_ok=True)
s3_client.download_file(cfg.bucket, obj.key, str(local_path))
return obj
if cfg.concurrency <= 1 or len(plans) == 1:
for plan in plans:
obj = plan[0]
try:
_do_one(plan)
downloaded += 1
bytes_downloaded += obj.size
print(
f" ok {obj.key} -> {plan[1]} ({obj.size:,} bytes)",
file=sys.stderr,
)
except Exception as exc:
failures.append((obj.key, exc))
print(
f" FAIL {obj.key}: {exc}", file=sys.stderr,
)
if fail_fast:
break
else:
with ThreadPoolExecutor(max_workers=cfg.concurrency) as pool:
future_to_plan = {pool.submit(_do_one, p): p for p in plans}
for fut in as_completed(future_to_plan):
plan = future_to_plan[fut]
obj = plan[0]
try:
fut.result()
downloaded += 1
bytes_downloaded += obj.size
print(
f" ok {obj.key} -> {plan[1]} "
f"({obj.size:,} bytes)",
file=sys.stderr,
)
except Exception as exc:
failures.append((obj.key, exc))
print(
f" FAIL {obj.key}: {exc}", file=sys.stderr,
)
if fail_fast:
for other in future_to_plan:
other.cancel()
break
return (downloaded, skipped, bytes_downloaded, failures)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description=(
"Download S3 objects (SAS data files and/or delimited text files) "
"under a prefix into a local folder, grouping objects into "
"clusters that each become one subfolder. "
"Supported extensions: "
+ ", ".join(DEFAULT_EXTENSIONS)
+ "."
),
)
p.add_argument(
"--config", required=True, type=Path, help="Path to YAML config",
)
p.add_argument(
"--dry-run",
action="store_true",
help=(
"List discovered clusters and the objects each would download. "
"No GET requests are issued beyond the initial LIST."
),
)
p.add_argument(
"--overwrite",
action="store_true",
help=(
"Force-redownload every matched object regardless of local "
"cache state. Equivalent to on_exists=overwrite."
),
)
p.add_argument(
"--fail-fast",
action="store_true",
help=(
"Abort on the first per-file download failure. Default is to "
"log the failure and keep going."
),
)
return p
def _format_bytes(n: int) -> str:
units = ("B", "KiB", "MiB", "GiB", "TiB")
size = float(n)
for unit in units:
if size < 1024 or unit == units[-1]:
return f"{size:,.1f} {unit}" if unit != "B" else f"{int(size):,} B"
size /= 1024
return f"{n} B"
def _describe_cluster(cluster: ClusterSpec) -> str:
src = cluster.source
if cluster.pattern:
src += f" pattern={cluster.pattern!r}"
if not cluster.objects:
return (
f"cluster {cluster.name!r} [{src}]\n objects: (no matching keys)"
)
total = sum(o.size for o in cluster.objects)
lines = [
f"cluster {cluster.name!r} [{src}] "
f"{len(cluster.objects)} object(s), {_format_bytes(total)}"
]
for obj in cluster.objects:
lines.append(f" - {obj.key} ({_format_bytes(obj.size)})")
return "\n".join(lines)
def main(argv: Optional[List[str]] = None) -> int:
args = _build_argparser().parse_args(argv)
cfg = load_download_config(args.config)
s3 = build_s3_client(cfg)
objects = list_s3_objects(s3, cfg)
if not objects:
print(
f"error: no objects matching extensions "
f"{list(cfg.extensions)} found under "
f"s3://{cfg.bucket}/{cfg.prefix}",
file=sys.stderr,
)
return 2
clusters = discover_clusters(cfg, objects)
loadable = [c for c in clusters if c.objects]
print(
f"discovered {len(loadable)} cluster(s) "
f"({sum(len(c.objects) for c in loadable)} object(s)) under "
f"s3://{cfg.bucket}/{cfg.prefix}:"
)
for c in clusters:
print(_describe_cluster(c))
if args.dry_run:
return 0
cfg.local_folder.mkdir(parents=True, exist_ok=True)
on_exists_override = "overwrite" if args.overwrite else None
totals: List[Tuple[str, int, int, int]] = [] # (name, dl, skip, bytes)
failures: List[Tuple[str, str, Exception]] = [] # (cluster, key, exc)
aborted = False
for cluster in loadable:
if aborted:
break
print(
f"\n>>> downloading cluster {cluster.name!r} "
f"({len(cluster.objects)} object(s))"
)
try:
dl, sk, by, fails = download_cluster(
s3, cfg, cluster,
on_exists_override=on_exists_override,
fail_fast=args.fail_fast,
)
except Exception as exc:
failures.append((cluster.name, "<cluster>", exc))
print(
f" !! cluster {cluster.name!r} aborted: {exc}",
file=sys.stderr,
)
if args.fail_fast:
aborted = True
continue
totals.append((cluster.name, dl, sk, by))
for key, exc in fails:
failures.append((cluster.name, key, exc))
if fails and args.fail_fast:
aborted = True
print("\n=== summary ===")
for name, dl, sk, by in totals:
print(
f" {name}: downloaded {dl}, skipped {sk}, "
f"{_format_bytes(by)}"
)
for cname, key, exc in failures:
print(f" FAIL {cname} {key}: {exc}", file=sys.stderr)
return 1 if failures else 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,8 @@
# S3 Directory Explorer - Input File
# One S3 prefix per line (within the bucket configured in data_explorer.py).
# Blank lines and comments (#) are ignored.
#
# Examples:
# data/sales/
# data/inventory/
# data/archive/

View File

@ -0,0 +1,111 @@
# Example S3 download config for utils/s3_download.py.
#
# Shape mirrors what `s3_download.py` expects:
#
# python s3_download.py --config sample_s3_download_config.yaml --dry-run
# python s3_download.py --config sample_s3_download_config.yaml
#
# Relative paths (e.g. local_folder) are resolved against this config file's
# directory first, falling back to the current working directory if that
# doesn't exist.
# ---------------------------------------------------------------------------
# Required: where to read from and write to
# ---------------------------------------------------------------------------
bucket: my-bucket
# Listing is recursive under this prefix - no S3 Delimiter is used. A nested
# object like `census/2020/raw/nested/group_c1.sas7bdat` will be considered.
# Regex patterns below match against the object BASENAME only (the part
# after the last `/`), so subfolder location does not affect matching.
prefix: census/2020/raw/
# Root destination on disk. One subfolder per cluster is created beneath it.
# If two objects in the same cluster share a basename (possible under the
# recursive scan), the second one is renamed to a key-derived filename
# (slashes replaced with `__`) so neither file overwrites the other.
local_folder: ./downloads
# ---------------------------------------------------------------------------
# Optional: AWS credentials
# ---------------------------------------------------------------------------
# Named profile from ~/.aws/credentials. Omit to use the default boto3
# credential chain (env vars, instance role, SSO, etc.).
# aws_profile: default
# ---------------------------------------------------------------------------
# Optional: discovery behavior
# ---------------------------------------------------------------------------
# When true (default), any object not matched by an explicit pattern below is
# auto-grouped with its peers by stripping trailing digits (and any trailing
# _ / -) from the basename stem. Stems without trailing digits become their
# own singleton cluster.
#
# Auto-detect only recognizes *trailing* digit runs. If your basenames put
# the varying number in the middle of the stem (e.g. surrounded by year,
# region, and detail components), auto-detect will NOT group them - each
# object becomes its own singleton cluster. Use an explicit pattern instead;
# see the embedded-digit example near the bottom of this file.
auto_detect: true
# Object extensions to consider. Anything else under the prefix is ignored.
# Default (when this key is omitted): .sas7bdat, .xpt, .xport, .txt, .csv, .tsv
# extensions:
# - .sas7bdat
# - .xpt
# - .txt
# - .csv
# - .tsv
# ---------------------------------------------------------------------------
# Optional: download behavior
# ---------------------------------------------------------------------------
# What to do when the destination file already exists locally.
# skip - (default) if the local file's byte size matches the S3
# object's Size, reuse it. If sizes differ, re-download with a
# warning. Same byte-size cache rule used by
# utils/file_viewer.py::_ensure_local_copy.
# overwrite - always re-download.
# error - abort the run if any local file exists with a different size.
# (Equal sizes still skip.)
on_exists: skip
# Parallel download workers. boto3 clients are thread-safe. Default: 4.
# concurrency: 4
# ---------------------------------------------------------------------------
# Explicit cluster patterns
# ---------------------------------------------------------------------------
#
# Each pattern is matched against the S3 object BASENAME (last path segment
# of the key). Objects matched by a pattern are pulled out of the
# auto-detect pool, so explicit and auto clusters compose cleanly.
#
# `name` becomes both the subfolder name under `local_folder` and the label
# used in the discovery / summary output. Names must be unique across
# explicit clusters.
clusters:
- pattern: '^group_a\d+\.sas7bdat$'
name: group_a
# - pattern: '^group_b\d+\.sas7bdat$'
# name: group_b
# Embedded-digit example. When the varying number sits in the MIDDLE of
# the basename stem (e.g. year2020_regionA_40_detail.sas7bdat,
# year2020_regionA_41_detail.sas7bdat, ...), auto-detect will NOT group
# them - each object becomes its own singleton cluster. An explicit
# pattern bucketizes them correctly. The \d+ matches any width, and
# objects within the cluster are sorted numerically by the last digit
# group in the stem, so _9_ sorts before _40_ regardless of zero-padding.
#
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
# name: year2020_regionA_detail
# Text file cluster example (when file_type: text):
# - pattern: '^data_group_a\d+\.txt$'
# name: data_group_a

1274
utils/sas_profiler.py Normal file

File diff suppressed because it is too large Load Diff