Compare commits

25 Commits

Author SHA1 Message Date
michael-corey
f3bd5f02aa Merge main into directory_explorer: combine text file support with exception tracking 2026-04-22 09:12:16 -05:00
michael-corey
1197846d10 adding text file support 2026-04-21 20:05:26 -05:00
David Peterson
64e7ff0b0a Enhance error reporting in load_folder.py and load_sas.py for better debugging
Updated error handling in the _worker_load_append_file function to include full tracebacks in exception messages, improving context for failures during file loading. Additionally, modified the _safe_numeric_to_datetime function to provide detailed warnings when conversion errors occur, ensuring users are informed of potential data issues. These changes aim to facilitate easier debugging and enhance the robustness of the data loading process.
2026-04-21 16:56:27 -05:00
David Peterson
eff82c73ce Add all_nullable configuration option in load_folder.py and load_sas.py for flexible schema management
Introduced an `all_nullable` boolean option in both `load_folder.py` and `load_sas.py`, allowing users to specify whether all columns should be treated as nullable during schema inference. This feature addresses scenarios where the data sampling may incorrectly suggest that columns are non-nullable, preventing potential errors during data loading. Updated YAML configuration files to include examples of this new option, enhancing usability and providing clearer documentation for users.
2026-04-21 16:48:37 -05:00
David Peterson
c283b42876 Add safe numeric to datetime conversion in load_sas.py to handle edge cases
Implemented the _safe_numeric_to_datetime function to convert numeric SAS-epoch series to datetime64[ns] while managing potential overflow and non-finite values. This enhancement improves error handling during data processing by masking invalid entries before conversion, ensuring robust handling of SAS date formats in the _prepare_for_copy function.
2026-04-21 15:55:25 -05:00
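
The masking approach described in this commit could be sketched roughly as follows. This is an illustration, not the code in load_sas.py: the function name `safe_sas_numeric_to_datetime`, the `unit` parameter, and the `_MAX_ABS_NS` bound are assumptions. It masks non-finite and out-of-range values to NaT before the integer conversion, so the conversion itself cannot overflow.

```python
import numpy as np
import pandas as pd

# Nanoseconds per SAS unit: SAS dates count days, SAS datetimes count seconds.
_NS_PER_UNIT = {"D": 86_400_000_000_000, "s": 1_000_000_000}
# 1960-01-01 lies 3653 days before the Unix epoch (1970-01-01).
_SAS_EPOCH_NS = -3653 * 86_400_000_000_000
# Conservative bound just inside the int64 nanosecond range of datetime64[ns].
_MAX_ABS_NS = 9.2e18


def safe_sas_numeric_to_datetime(values: pd.Series, unit: str = "D") -> pd.Series:
    """Convert SAS-epoch numbers to datetime64[ns]; invalid entries become NaT."""
    numeric = pd.to_numeric(values, errors="coerce").to_numpy(dtype="float64")
    with np.errstate(over="ignore", invalid="ignore"):
        ns = numeric * _NS_PER_UNIT[unit] + _SAS_EPOCH_NS
        # Mask NaN, +/-inf, and anything outside the representable range.
        valid = np.isfinite(ns) & (np.abs(ns) < _MAX_ABS_NS)
    # The int64 minimum is the bit pattern numpy uses for NaT.
    out = np.full(ns.shape, np.iinfo(np.int64).min, dtype=np.int64)
    out[valid] = ns[valid].astype(np.int64)
    return pd.Series(out.view("datetime64[ns]"), index=values.index)
```

Passing `unit="s"` treats the input as SAS datetimes (seconds since 1960-01-01). Fractional seconds go through float64 here, so sub-microsecond precision is not preserved in this sketch.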
David Peterson
a46f0518f6 Suppress PerformanceWarning in load_sas.py to reduce noise during processing of wide SAS files. This change filters out warnings related to DataFrame fragmentation, which are irrelevant for our pipeline as we directly convert DataFrames to pyarrow tables. 2026-04-21 13:40:38 -05:00
David Peterson
969a442775 Refactor numeric column type inference in load_sas.py for improved data handling
Updated the logic for determining column types in the union_column_types function. Changed the default type from BIGINT to DOUBLE PRECISION for numeric columns without explicit format hints, ensuring better handling of both integer and float values. This adjustment prevents loading failures due to format discrepancies and maintains consistent data processing across various SAS formats.
2026-04-21 13:17:01 -05:00
David Peterson
212218fb67 Enhance error handling and abort functionality in load_folder.py for parallel file loading
Implemented an `--abort-on-first-failure` option in the `_load_remaining_files_parallel` function, allowing users to cancel all pending tasks immediately upon the first worker failure. This change improves user experience by providing real-time feedback on errors through stderr, ensuring that users are promptly informed of issues without waiting for all tasks to complete. Additionally, refined error reporting to maintain accurate summaries of successes and failures, even during interruptions.
2026-04-21 12:54:05 -05:00
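
The abort-on-first-failure behaviour described above could look roughly like the sketch below. It is not the code in load_folder.py: `load_all`, `load_one`, and the summary keys are hypothetical names, and a `ThreadPoolExecutor` stands in for the process pool so the example stays self-contained. The cancellation logic is the same for both executor types.

```python
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable, List


def load_all(
    paths: Iterable[str],
    load_one: Callable[[str], object],
    workers: int = 2,
    abort_on_first_failure: bool = False,
) -> Dict[str, List[str]]:
    """Run load_one over paths; optionally cancel pending work on first failure."""
    summary: Dict[str, List[str]] = {"ok": [], "failed": [], "cancelled": []}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(load_one, p): p for p in paths}
        for fut in as_completed(futures):
            path = futures[fut]
            if fut.cancelled():
                summary["cancelled"].append(path)
                continue
            exc = fut.exception()
            if exc is None:
                summary["ok"].append(path)
                continue
            summary["failed"].append(path)
            # Report immediately instead of waiting for the final summary.
            print(f"FAILED {path}: {exc!r}", file=sys.stderr)
            if abort_on_first_failure:
                # cancel() only affects tasks that have not started yet;
                # tasks already running are allowed to finish.
                for pending in futures:
                    pending.cancel()
    return summary
```

Every path ends up in exactly one of the three lists, so the summary still adds up when a run is cut short.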
David Peterson
ae65140390 Add column type overrides in load_folder.py and load_sas.py for enhanced schema control
Implemented a new feature allowing users to specify explicit column type mappings via a `column_types` configuration in both `load_folder.py` and `load_sas.py`. This addition enables users to bypass automatic type inference for specific columns, ensuring correct data types are used when loading datasets. Updated the YAML configuration files to include examples of the new `column_types` option, enhancing usability and flexibility in handling varying data formats across files.
2026-04-21 12:14:44 -05:00
David Peterson
0c5e6e31f0 Enhance memory management in load_folder.py and load_sas.py for improved performance
Added memory management optimizations in the _worker_load_append_file function to release unused memory from pyarrow's pool and trigger Python's garbage collection. Implemented explicit memory trimming using glibc's malloc_trim to ensure efficient memory usage during long-running processes. Updated the copy_dataframes function in load_sas.py to release pyarrow's memory pool between chunks, preventing high memory usage in long-lived workers. These changes aim to reduce memory footprint and improve overall performance during large dataset processing.
2026-04-21 10:46:54 -05:00
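
A rough sketch of the three release steps mentioned in this commit (pyarrow pool, Python garbage collector, glibc `malloc_trim`). The helper name `release_memory` is hypothetical, and the fallbacks for non-Linux or non-glibc systems are assumptions about how one might guard the call rather than a description of the committed code.

```python
import ctypes
import gc
import sys


def release_memory() -> bool:
    """Best-effort memory release; returns True if glibc handed memory back."""
    try:
        import pyarrow as pa

        # Ask pyarrow's default pool to return unused memory to the allocator.
        pa.default_memory_pool().release_unused()
    except ImportError:
        pass  # pyarrow is optional in this sketch

    gc.collect()  # drop unreachable Python objects first

    if not sys.platform.startswith("linux"):
        return False
    try:
        libc = ctypes.CDLL("libc.so.6")
        # malloc_trim(0) returns 1 if memory was released to the OS, else 0.
        return bool(libc.malloc_trim(0))
    except (OSError, AttributeError):
        # Not glibc (e.g. musl) or the symbol is unavailable.
        return False
```

A long-lived worker would call this between files or chunks. The boolean result is informational only.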
David Peterson
9afb52aecb Add --chunk-rows option to load_folder.py for customizable memory management
Introduced a new command-line argument, --chunk-rows, allowing users to specify the number of rows per chunk for pyreadstat streaming and COPY operations. This option overrides the GENERIC_LOADER_CHUNK_ROWS environment variable and auto-scaling behavior when using multiple workers. Enhanced memory management by providing detailed information on peak memory usage based on the specified chunk size, improving performance and usability during large dataset processing.
2026-04-21 10:05:21 -05:00
David Peterson
eac75cbb26 Refactor load_cluster function in load_folder.py for improved parallel file loading
Updated the load_cluster function to enhance parallel processing by committing the table creation before dispatching all files to worker processes. This change allows for more efficient handling of large datasets by reducing the serial workload and ensuring schema compatibility checks can access the committed table. The logic for streaming files has been clarified, maintaining progress tracking throughout the loading process.
2026-04-21 08:31:48 -05:00
David Peterson
1265489276 Enhance date and timestamp handling in _prepare_for_copy function in load_sas.py
Added support for numeric date and datetime conversions from SAS formats. Implemented logic to handle float64 representations of dates (days since 1960-01-01) and datetimes (seconds since 1960-01-01), ensuring proper parsing and preventing errors during data copying to Postgres. This enhancement improves compatibility with various SAS date formats.
2026-04-21 08:16:17 -05:00
David Peterson
2dd247b067 Add --no-prescan option to load_folder.py for skipping metadata scan
Introduced a new command-line argument, --no-prescan, allowing users to bypass the per-file metadata scan during the loading process. This enhancement is particularly useful for large folders where the pre-scan may be time-consuming. The progress bar will still display rows loaded, rate, and elapsed time, but without an estimated time of arrival (ETA) for completion. Updated the main function to handle this new option and adjusted the progress tracking accordingly.
2026-04-21 08:12:39 -05:00
David Peterson
052fb0e087 Refactor pre-scan process in load_folder.py to utilize ThreadPoolExecutor for improved performance
Updated the main function to replace sequential file processing with a threaded approach using ThreadPoolExecutor. This change enhances the efficiency of reading row counts from SAS files, particularly for large datasets, by allowing concurrent I/O operations. Added progress tracking with tqdm for better user feedback during the pre-scan phase.
2026-04-20 22:43:02 -05:00
David Peterson
fe7dc4d5a1 Enhance load_cluster function for parallel processing and progress tracking
Refactored the load_cluster function in load_folder.py to support parallel file loading using ProcessPoolExecutor, improving performance during the append phase. Added workers parameter for controlling parallelism and integrated a progress_queue for real-time progress updates. Introduced read_sas_metadata function in load_sas.py to efficiently read metadata from SAS files, optimizing the pre-scan process for global progress tracking.
2026-04-20 22:02:55 -05:00
David Peterson
96f2d6fe79 Update requirements and enhance SAS file processing with progress tracking
Updated the pyarrow version in requirements.txt to improve compatibility. Enhanced the _infer_cluster_schema and _stream_file functions in load_folder.py and load_sas.py to return total row counts for better progress tracking during data streaming. Integrated tqdm for visual feedback on row processing, improving user experience during large data loads.
2026-04-20 21:44:49 -05:00
David Peterson
7beb44ac4d Add pyarrow dependency and optimize DataFrame serialization in load_sas.py
Included pyarrow as a new dependency in requirements.txt for improved CSV serialization performance. Refactored the _prepare_for_copy function to utilize vectorized operations for date and timestamp conversions, reducing CPU overhead. Introduced a new _serialize_chunk_csv function leveraging pyarrow for faster CSV writing, enhancing efficiency during data copying to Postgres.
2026-04-20 21:32:56 -05:00
David Peterson
5e347f50ef Add widening compatibility checks in load_sas.py for type inference
Introduced a new set of widening compatible type pairs to allow for accepting narrower inferred types when they fit within wider target types during schema compatibility checks. This change enhances the type inference process by preventing unnecessary mismatches and improving handling of varying integer ranges in cluster loads. Updated warning messages to inform users of accepted type adjustments.
2026-04-20 21:08:13 -05:00
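
As an illustration of the widening check described in this commit, the sketch below accepts a narrower inferred type when the target column is wider. The specific pairs in `WIDENING_COMPATIBLE` and the `check_column` helper are assumptions made for the example; the actual set in load_sas.py is not shown in this diff.

```python
from typing import Tuple

# (inferred_type, target_type) pairs where the inferred type fits in the target.
# Hypothetical contents, chosen to illustrate varying integer ranges.
WIDENING_COMPATIBLE = frozenset(
    {
        ("SMALLINT", "INTEGER"),
        ("SMALLINT", "BIGINT"),
        ("INTEGER", "BIGINT"),
        ("SMALLINT", "DOUBLE PRECISION"),
        ("INTEGER", "DOUBLE PRECISION"),
    }
)


def check_column(inferred: str, target: str) -> Tuple[bool, str]:
    """Return (compatible, message); message is a warning or error text."""
    if inferred == target:
        return True, ""
    if (inferred, target) in WIDENING_COMPATIBLE:
        return True, f"accepting inferred {inferred} into wider target {target}"
    return False, f"type mismatch: inferred {inferred}, target {target}"
```

The check is deliberately one-directional: `("INTEGER", "BIGINT")` passes, while `("BIGINT", "INTEGER")` is still reported as a mismatch.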
David Peterson
f84e127796 Update type inference behavior in load_sas.py to scan entire files by default
Changed the default setting for TYPE_INFERENCE_SAMPLE_ROWS to None, allowing type and nullability inference to consider all rows in a SAS file. This adjustment ensures accurate handling of null values and integer ranges, addressing issues observed in production with large datasets. Updated documentation to reflect the implications of this change and the risks associated with using an integer cap for sampling.
2026-04-20 20:43:27 -05:00
David Peterson
a94ab68f4d Refine partition name patterns in sas_profiler.py
Updated the regular expression for partition name patterns to improve matching accuracy for state-related columns. The new pattern captures variations like `state`, `state_code`, and `statecode` while avoiding false positives from unrelated terms. This change enhances the precision of partition candidate selection.
2026-04-20 19:27:01 -05:00
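
One way such a pattern might look is sketched below. The regular expression is an assumption written for illustration, not the pattern committed to sas_profiler.py. It accepts `state`, `state_code`, and `statecode` (optionally behind an underscore-delimited prefix) and rejects names that merely contain the letters.

```python
import re

# Hypothetical pattern: optional "<prefix>_" + "state" + optional "code"/"_code".
STATE_COLUMN_PATTERN = re.compile(r"^(?:.*_)?state(?:_?code)?$", re.IGNORECASE)


def is_state_column(name: str) -> bool:
    """Return True if the column name looks like a state partition candidate."""
    return STATE_COLUMN_PATTERN.match(name) is not None
```

Anchoring at both ends is what keeps `statement` and `estate` from matching.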
David Peterson
4fc85081c8 Enhance SAS profiling performance in sas_profiler.py
Added a new constant for profiling chunk size to optimize memory usage during profiling operations. Refactored the update method in the _ColumnStats class to improve efficiency in handling missing values and calculating statistics for numeric and string data types. This update includes vectorized operations for better performance and clarity in the implementation.
2026-04-20 19:03:40 -05:00
David Peterson
5449a25b44 Refactor partition candidate logic in sas_profiler.py
Updated the partition candidate selection process to restrict candidates to columns matching specific name patterns, improving accuracy and reducing noise. Removed outdated distinct value constraints and clarified documentation for partitioning behavior. Enhanced handling of pre-sharded columns and refined the classification logic for better performance.
2026-04-20 18:49:23 -05:00
David Peterson
b3b968edf2 Add openpyxl dependency to requirements.txt for Excel file handling 2026-04-20 18:38:24 -05:00
David Peterson
f1af1136dc Add standalone SAS profiling utility
Introduced a new script `sas_profiler.py` that profiles local SAS files and generates an Excel report with recommendations for drops, partitions, and indexes, along with type-inference warnings. The utility supports command-line overrides for configuration and is compatible with Python 3.10+. This addition enhances the existing tools for SAS file management.
2026-04-20 18:38:01 -05:00
10 changed files with 3962 additions and 215 deletions

File diff suppressed because it is too large.

File diff suppressed because it is too large.

View File

@ -16,6 +16,23 @@ tablename: kitchensink
# Defaults to fail.
if_exists: append
# file_type: Type of data file to load. One of: sas | text. Default: sas.
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
# file_type: sas
# delimiter: Column delimiter for text files. Only used when file_type: text.
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
# or any single character.
# delimiter: ","
# text_encoding: Character encoding for text files. Default: utf-8.
# Common alternatives: latin-1, cp1252, iso-8859-1.
# text_encoding: utf-8
# quotechar: Quote character for text files. Default: '"' (double quote).
# quotechar: '"'
# partition_by: Partition the table by unique values of these columns.
# Columns are applied in cascading order (first column = top-level partition).
# Requires if_exists: replace or fail (not append for initial creation).
@ -38,3 +55,24 @@ if_exists: append
# indexes:
# - state
# - zip
# column_types: Explicit {column_name: postgres_type} overrides that
# bypass automatic type inference for the listed columns. Useful when
# pyreadstat reports a column as NUM but you want it stored as TEXT
# (phone/ID columns that are conceptually strings), or when a column's
# inferred type is off for any other reason. Columns not listed here
# fall through to the normal inference path. Nullability is always
# computed from the data.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# all_nullable: If true, every column is stamped nullable in the generated
# schema; NOT NULL inference is skipped entirely. Use this when the sampler
# wrongly concludes a column has no nulls (e.g. a dense sample followed by
# rare-null data downstream) and COPY blows up mid-load on the first null
# it hits. Off by default. The CLI flag --all-nullable overrides this to
# true when set.
#
# all_nullable: false
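
The `delimiter` aliases listed in the config comments above ("tab", "\t", "pipe", or any single character) could be normalised along these lines. `normalize_delimiter` and `_DELIMITER_ALIASES` are hypothetical names used for this sketch; the loader's real parsing code is not part of the visible diff.

```python
from typing import Optional

# Spelled-out aliases accepted in the YAML config (assumed to be case-insensitive).
_DELIMITER_ALIASES = {"tab": "\t", "\\t": "\t", "pipe": "|"}


def normalize_delimiter(raw: Optional[str] = None) -> str:
    """Map a config value to the single delimiter character it stands for."""
    if raw is None:
        return ","  # documented default
    value = _DELIMITER_ALIASES.get(raw.lower(), raw)
    if len(value) != 1:
        raise ValueError(f"delimiter must be a single character or alias: {raw!r}")
    return value
```

The literal two-character string `\t` is handled because YAML passes a backslash through unchanged in plain and single-quoted scalars.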

View File

@ -27,6 +27,25 @@ if_exists: replace
# see the embedded-digit example near the bottom of this file.
auto_detect: true
# file_type: Type of data files in this folder. One of: sas | text. Default: sas.
# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat
# text - Delimited text files (.txt, .csv, .tsv) read via pandas
# When set to 'text', the folder scanner looks for .txt/.csv/.tsv files
# instead of .sas7bdat/.xpt/.xport files.
# file_type: sas
# delimiter: Column delimiter for text files. Only used when file_type: text.
# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe),
# or any single character.
# delimiter: ","
# text_encoding: Character encoding for text files. Default: utf-8.
# Common alternatives: latin-1, cp1252, iso-8859-1.
# text_encoding: utf-8
# quotechar: Quote character for text files. Default: '"' (double quote).
# quotechar: '"'
# Folder-level column filter. Every file in every cluster passes through
# this filter. `include` and `exclude` are mutually exclusive. A cluster can
# override these via its own `include` / `exclude` keys.
@ -61,15 +80,52 @@ auto_detect: true
# - state
# - zip
# Folder-level column_types: Explicit {column_name: postgres_type} map that
# bypasses automatic type inference for the listed columns. Applied to
# every cluster unless a cluster supplies its own column_types, which are
# merged on top (cluster entries win on conflict).
#
# During --workers>1 runs the pre-scan derives a cluster-wide "auto-union"
# type per column (e.g. any file stores the column as CHAR -> TEXT; all
# NUM with any format hinting decimals -> DOUBLE PRECISION; otherwise
# BIGINT). Entries in column_types here win over that auto-union - use
# them when the auto result is wrong or when --no-prescan disables the
# auto-union and you still need to pin a column.
#
# Valid type strings are anything the CREATE TABLE DDL accepts (TEXT,
# INTEGER, BIGINT, DOUBLE PRECISION, DATE, TIMESTAMP, ...). Columns that
# don't exist in a given file are simply ignored for that file.
#
# column_types:
# RESP_PH_PREFIX_ID: TEXT
# RESP_PH_SUFFIX_ID: TEXT
# SOMELONG_ID: BIGINT
# Folder-level all_nullable: If true, every column of every cluster is
# stamped nullable in the generated schema; NOT NULL inference is skipped
# entirely. Use this when the sampler wrongly concludes a column has no
# nulls (sampled rows happened to be dense, but later files in the cluster
# carry nulls) and COPY blows up mid-load. Inherited by all clusters
# unless a cluster supplies its own all_nullable. The CLI flag
# --all-nullable overrides both this and any per-cluster setting when
# passed. Off by default.
#
# all_nullable: false
# Explicit cluster patterns. Each pattern is matched against the file
# *basename*. Files matched by a pattern are pulled out of the auto-detect
# pool, so explicit and auto clusters compose cleanly.
#
# `tablename` is required. `if_exists`, `include`, and `exclude` are
# optional per-cluster overrides of the folder-level defaults above.
# `tablename` is required. `if_exists`, `include`, `exclude`, and
# `column_types` are optional per-cluster overrides of the folder-level
# defaults above. Cluster-level column_types entries win over folder-
# level entries for the same column.
clusters:
- pattern: '^group_a\d+\.xpt$'
tablename: group_a
# column_types:
# INTCOL: TEXT
# all_nullable: true # per-cluster override of the folder-level default
# Example of an explicit override. Uncomment to force the group_b cluster to
# append instead of replace even though the folder default is "replace":
@ -111,6 +167,10 @@ clusters:
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
# tablename: year2020_regionA_detail
# Text file cluster example (when file_type: text):
# - pattern: '^data_group_a\d+\.txt$'
# tablename: data_group_a
# With only the group_a pattern explicit, auto_detect: true will still
# bucket group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone
# standalone.xpt into a "standalone" cluster. See generate_sample_folder.py
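
The precedence rules spelled out in the comments of this config (cluster `column_types` entries win over folder-level ones; `all_nullable` is inherited unless a cluster sets it; the `--all-nullable` flag overrides both when passed) can be sketched as below. `resolve_cluster_options` and its argument names are assumptions for illustration and do not come from load_folder.py.

```python
from typing import Any, Dict


def resolve_cluster_options(
    folder_cfg: Dict[str, Any],
    cluster_cfg: Dict[str, Any],
    cli_all_nullable: bool = False,
) -> Dict[str, Any]:
    """Combine folder-level defaults with one cluster's overrides."""
    # Cluster entries are merged on top, so they win on conflict.
    column_types = {
        **(folder_cfg.get("column_types") or {}),
        **(cluster_cfg.get("column_types") or {}),
    }
    if cli_all_nullable:
        all_nullable = True  # CLI flag beats both config levels
    elif "all_nullable" in cluster_cfg:
        all_nullable = bool(cluster_cfg["all_nullable"])
    else:
        all_nullable = bool(folder_cfg.get("all_nullable", False))
    return {"column_types": column_types, "all_nullable": all_nullable}
```

With a folder-level `SOMELONG_ID: BIGINT` and a cluster-level `SOMELONG_ID: TEXT`, the cluster ends up with `TEXT`, while folder-level columns the cluster does not mention are kept.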

View File

@ -1,7 +1,10 @@
pandas>=2.0,<3.0
pyreadstat>=1.2,<2.0
numpy>=2.1,<3.0
pyarrow>=22.0,<24.0
pyyaml>=6.0,<7.0
psycopg2-binary>=2.9,<3.0
python-dotenv>=1.0,<2.0
boto3>=1.28,<2.0
openpyxl>=3.1,<4.0
tqdm>=4.66,<5.0

View File

@ -1,30 +1,39 @@
"""Explore S3 directories and categorise them by accessibility.
Reads a text file containing one S3 prefix per line (paths within the bucket
configured by the ``S3_BUCKET`` constant), then for each prefix:
configured by the ``S3_BUCKET`` constant or ``--bucket`` CLI argument), then
for each prefix:
- Lists all objects recursively (via ``list_objects_v2`` paginator)
- **Only considers files matching the ``FILE_EXTENSION`` filter** (default
``.sas7bdat``). All other file types are ignored.
- **Only considers files matching the configured extensions** (default: all
supported extensions, both SAS and text). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found
- If the first file is accessible, tests ALL remaining files individually
- Categorises the directory as **Available**, **Blocked**, or **Empty**, and
tracks individual file **Exceptions** within available directories
Supported file types
--------------------
* **SAS files**: ``.sas7bdat``, ``.xpt``, ``.xport``
* **Text / delimited files**: ``.txt``, ``.csv``, ``.tsv``
A directory is considered *empty* if it contains no files matching the
extension filter, even when other file types are present.
Configure the constants below, then run::
Configure the constants below (or use CLI arguments), then run::
python3 data_explorer.py
python3 data_explorer.py [OPTIONS]
Python 3.10+ compatible. Requires only ``boto3`` / ``botocore`` and stdlib.
Python 3.10+ compatible. Requires ``boto3`` / ``botocore`` and stdlib.
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import dataclass, field
from typing import List, Tuple
from typing import List, Set, Tuple
# ---------------------------------------------------------------------------
# Dependency check
@ -43,11 +52,25 @@ except ImportError:
# ---------------------------------------------------------------------------
# Configuration — edit these before running
# Extension constants
# ---------------------------------------------------------------------------
FILE_EXTENSION: str = ".sas7bdat"
"""Only files whose key ends with this extension (case-insensitive) are considered."""
SAS_EXTENSIONS: Set[str] = {".sas7bdat", ".xpt", ".xport"}
"""File extensions recognised as SAS data files."""
TEXT_EXTENSIONS: Set[str] = {".txt", ".csv", ".tsv"}
"""File extensions recognised as delimited text / CSV files."""
SUPPORTED_EXTENSIONS: Set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
"""Union of all file extensions this tool can work with."""
# ---------------------------------------------------------------------------
# Configuration defaults — edit these or override via CLI arguments
# ---------------------------------------------------------------------------
FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
"""Set of extensions to filter on (case-insensitive). Defaults to all supported."""
INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line."""
@ -58,6 +81,57 @@ S3_BUCKET: str = "my-bucket"
AWS_PROFILE: str = "default"
"""AWS CLI profile name used for authentication."""
# Text-file reading defaults (used when downloading / previewing text files)
DEFAULT_DELIMITER: str = ","
DEFAULT_ENCODING: str = "utf-8"
DEFAULT_QUOTECHAR: str = '"'
# ---------------------------------------------------------------------------
# Auto-detection helpers
# ---------------------------------------------------------------------------
def detect_file_type(filename: str) -> str:
"""Return ``'sas'``, ``'text'``, or ``'unknown'`` based on *filename* extension.
The check is case-insensitive. For ``.tsv`` files the caller should
default the delimiter to a tab character (``'\\t'``).
Examples
--------
>>> detect_file_type("data.sas7bdat")
'sas'
>>> detect_file_type("report.CSV")
'text'
>>> detect_file_type("archive.zip")
'unknown'
"""
ext = os.path.splitext(filename)[1].lower()
if ext in SAS_EXTENSIONS:
return "sas"
if ext in TEXT_EXTENSIONS:
return "text"
return "unknown"
def default_delimiter_for(filename: str) -> str:
"""Return a sensible default delimiter for *filename*.
* ``.tsv`` -> ``'\\t'``
* everything else -> ``','``
"""
ext = os.path.splitext(filename)[1].lower()
if ext == ".tsv":
return "\t"
return ","
def matches_extensions(key: str, extensions: Set[str]) -> bool:
"""Return ``True`` if *key* ends with any extension in *extensions* (case-insensitive)."""
key_lower = key.lower()
return any(key_lower.endswith(ext) for ext in extensions)
# ---------------------------------------------------------------------------
# Data structures
@ -149,27 +223,36 @@ def format_size(size_bytes: int) -> str:
return f"{size_bytes:,.1f} TB"
def extensions_label(extensions: Set[str]) -> str:
"""Return a compact, sorted label for a set of extensions (e.g. ``.csv/.tsv/.txt``)."""
return "/".join(sorted(extensions))
def list_objects(
s3_client: "botocore.client.S3",
bucket: str,
prefix: str,
extensions: Set[str] | None = None,
) -> Tuple[List[Tuple[str, int]], int]:
"""Recursively list all objects under *prefix*.
Only objects whose key ends with ``FILE_EXTENSION`` (case-insensitive) are
counted. All other files are silently skipped.
Only objects whose key ends with one of *extensions* (case-insensitive) are
counted. All other files are silently skipped. When *extensions* is
``None`` the module-level ``FILE_EXTENSIONS`` set is used.
Returns ``(files, total_size)`` where *files* is a list of
``(key, size)`` tuples for every matching object and *total_size* is the
sum of their sizes in bytes.
"""
ext_lower = FILE_EXTENSION.lower()
if extensions is None:
extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions}
paginator = s3_client.get_paginator("list_objects_v2")
files: List[Tuple[str, int]] = []
total_size: int = 0
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if not obj["Key"].lower().endswith(ext_lower):
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
continue
files.append((obj["Key"], obj["Size"]))
total_size += obj["Size"]
@ -196,8 +279,26 @@ def check_read_permission(
# ---------------------------------------------------------------------------
def explore_directories(prefixes: List[str]) -> Results:
"""Explore every prefix in ``S3_BUCKET`` and return categorised *Results*."""
def explore_directories(
prefixes: List[str],
*,
extensions: Set[str] | None = None,
) -> Results:
"""Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.
Parameters
----------
prefixes:
List of S3 key prefixes to explore.
extensions:
Set of file extensions to filter on. Defaults to the module-level
``FILE_EXTENSIONS`` (which itself defaults to ``SUPPORTED_EXTENSIONS``).
"""
if extensions is None:
extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions}
ext_label = extensions_label(extensions)
session = boto3.Session(profile_name=AWS_PROFILE)
s3 = session.client("s3")
@ -206,13 +307,13 @@ def explore_directories(prefixes: List[str]) -> Results:
for idx, prefix in enumerate(prefixes, start=1):
print(
f"[{idx}/{total}] Checking {prefix} (filtering for {FILE_EXTENSION}) ...",
f"[{idx}/{total}] Checking {prefix} (filtering for {ext_label}) ...",
file=sys.stderr,
)
# --- Recursive listing ------------------------------------------------
try:
files, total_size = list_objects(s3, S3_BUCKET, prefix)
files, total_size = list_objects(s3, S3_BUCKET, prefix, extensions=extensions)
except botocore.exceptions.ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "Unknown")
message = exc.response.get("Error", {}).get("Message", str(exc))
@ -234,12 +335,13 @@ def explore_directories(prefixes: List[str]) -> Results:
# --- Permission check on first file -----------------------------------
# Prefer a real object over a zero-byte directory marker (key ending
# in "/") for the head_object test.
# in "/") for the head_object test. The selected key must also match
# the extension filter.
first_key, _ = files[0]
test_key = first_key
if first_key.endswith("/") and total_size > 0:
for key, size in files:
if not (key.endswith("/") and size == 0):
if not (key.endswith("/") and size == 0) and matches_extensions(key, exts_lower):
test_key = key
break
@ -268,7 +370,7 @@ def explore_directories(prefixes: List[str]) -> Results:
if remaining:
if len(remaining) > 10:
print(
f" Verifying access to {file_count} {FILE_EXTENSION} files in {prefix} ...",
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
file=sys.stderr,
)
@ -306,11 +408,25 @@ def explore_directories(prefixes: List[str]) -> Results:
# ---------------------------------------------------------------------------
def print_results(results: Results) -> None:
"""Print a clean, human-readable summary to stdout."""
def print_results(results: Results, *, extensions: Set[str] | None = None) -> None:
"""Print a clean, human-readable summary to stdout.
Parameters
----------
results:
The exploration results to display.
extensions:
The set of extensions that were used for filtering. Used only for
labelling in the output. Defaults to ``FILE_EXTENSIONS``.
"""
if extensions is None:
extensions = FILE_EXTENSIONS
ext_label = extensions_label(extensions)
print()
print("=== S3 Directory Explorer Results ===")
print(f"Bucket: {S3_BUCKET}")
print(f"Extensions: {ext_label}")
# --- Available ---
print()
@ -319,7 +435,7 @@ def print_results(results: Results) -> None:
for d in results.available:
print(f" {d.prefix}")
print(
f" {FILE_EXTENSION} files: {d.accessible_count}/{d.total_count} accessible"
f" Matching files ({ext_label}): {d.accessible_count}/{d.total_count} accessible"
f" | Total Size: {format_size(d.accessible_size)}"
)
else:
@ -332,7 +448,7 @@ def print_results(results: Results) -> None:
for d in results.blocked:
if d.file_count:
print(f" {d.prefix}")
print(f" {FILE_EXTENSION} files found: {d.file_count} | Error: {d.error}")
print(f" Matching files ({ext_label}) found: {d.file_count} | Error: {d.error}")
else:
print(f" {d.prefix}")
print(f" Error: {d.error}")
@ -351,7 +467,7 @@ def print_results(results: Results) -> None:
# --- Empty ---
print()
print(f"--- Empty / no {FILE_EXTENSION} files ({len(results.empty)}) ---")
print(f"--- Empty / no matching files ({len(results.empty)}) ---")
if results.empty:
for d in results.empty:
print(f" {d.prefix}")
@ -361,20 +477,163 @@ def print_results(results: Results) -> None:
print()
# ---------------------------------------------------------------------------
# CLI argument parsing
# ---------------------------------------------------------------------------
def build_arg_parser() -> argparse.ArgumentParser:
"""Build and return the CLI argument parser.
Supports selecting file-type filters, text-file reading parameters, and
overriding the default bucket / profile / input-file settings.
"""
parser = argparse.ArgumentParser(
description=(
"Explore S3 directories and categorise them by accessibility. "
"Supports SAS files (.sas7bdat, .xpt, .xport) and delimited text "
"files (.txt, .csv, .tsv)."
),
)
# --- File-type / extension selection ---
type_group = parser.add_argument_group("File-type selection")
type_group.add_argument(
"--file-type",
choices=["sas", "text", "all"],
default="all",
help=(
"Restrict the scan to a specific file type. "
"'sas' = .sas7bdat/.xpt/.xport only; "
"'text' = .txt/.csv/.tsv only; "
"'all' = both (default)."
),
)
type_group.add_argument(
"--extensions",
nargs="+",
metavar="EXT",
help=(
"Explicit list of extensions to filter on (e.g. --extensions .csv .tsv). "
"Overrides --file-type when provided."
),
)
# --- Text-file reading parameters ---
text_group = parser.add_argument_group(
"Text-file parameters",
description=(
"Parameters used when reading delimited text files. These are "
"stored for downstream consumers and do not affect the S3 scan "
"itself."
),
)
text_group.add_argument(
"--delimiter",
default=None,
help=(
"Field delimiter for text files (default: ',' for .csv/.txt, "
"'\\t' for .tsv). Use 'tab' or '\\t' for a tab character."
),
)
text_group.add_argument(
"--encoding",
default=DEFAULT_ENCODING,
help=f"Character encoding for text files (default: {DEFAULT_ENCODING}).",
)
text_group.add_argument(
"--quotechar",
default=DEFAULT_QUOTECHAR,
help=f"Quote character for text files (default: {DEFAULT_QUOTECHAR!r}).",
)
# --- S3 / general settings ---
s3_group = parser.add_argument_group("S3 settings")
s3_group.add_argument(
"--bucket",
default=None,
help=f"S3 bucket name (default: {S3_BUCKET}).",
)
s3_group.add_argument(
"--profile",
default=None,
help=f"AWS CLI profile name (default: {AWS_PROFILE}).",
)
s3_group.add_argument(
"--input-file",
default=None,
help=f"Path to the text file with S3 prefixes (default: {INPUT_FILE}).",
)
return parser
def resolve_extensions(args: argparse.Namespace) -> Set[str]:
"""Determine the active extension set from parsed CLI *args*.
If ``--extensions`` is provided it takes precedence. Otherwise
``--file-type`` is used to select a predefined set.
"""
if args.extensions:
# Normalise: ensure each extension starts with a dot and is lowercase
exts: Set[str] = set()
for ext in args.extensions:
ext = ext.strip().lower()
if not ext.startswith("."):
ext = "." + ext
exts.add(ext)
return exts
if args.file_type == "sas":
return SAS_EXTENSIONS
if args.file_type == "text":
return TEXT_EXTENSIONS
return SUPPORTED_EXTENSIONS
def resolve_delimiter(args: argparse.Namespace) -> str:
"""Return the effective delimiter from parsed CLI *args*.
Handles the special values ``'tab'`` and ``'\\t'`` so users can specify a
tab character on the command line without shell-escaping issues.
"""
if args.delimiter is None:
return DEFAULT_DELIMITER
raw = args.delimiter
if raw.lower() in ("tab", "\\t"):
return "\t"
return raw
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import os
parser = build_arg_parser()
args = parser.parse_args()
# --- Apply CLI overrides to module-level config ---------------------------
if args.bucket:
S3_BUCKET = args.bucket
if args.profile:
AWS_PROFILE = args.profile
input_file = args.input_file if args.input_file else INPUT_FILE
active_extensions = resolve_extensions(args)
FILE_EXTENSIONS = active_extensions
delimiter = resolve_delimiter(args)
encoding = args.encoding
quotechar = args.quotechar
# --- Read input file ------------------------------------------------------
if not os.path.exists(INPUT_FILE):
print(f"ERROR: Input file not found: {INPUT_FILE}", file=sys.stderr)
if not os.path.exists(input_file):
print(f"ERROR: Input file not found: {input_file}", file=sys.stderr)
sys.exit(1)
try:
prefixes = read_input_file(INPUT_FILE)
prefixes = read_input_file(input_file)
except Exception as exc:
print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
sys.exit(1)
@ -399,7 +658,17 @@ if __name__ == "__main__":
print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
sys.exit(1)
# --- Explore --------------------------------------------------------------
# --- Print active configuration -------------------------------------------
ext_label = extensions_label(active_extensions)
print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
results = explore_directories(prefixes)
print_results(results)
print(f"Extensions: {ext_label}", file=sys.stderr)
if active_extensions & TEXT_EXTENSIONS:
print(
f"Text opts: delimiter={delimiter!r} encoding={encoding!r} "
f"quotechar={quotechar!r}",
file=sys.stderr,
)
# --- Explore --------------------------------------------------------------
results = explore_directories(prefixes, extensions=active_extensions)
print_results(results, extensions=active_extensions)
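
Reviewer sketch: the extension and delimiter resolution added in this file can be exercised without touching S3. The snippet below is a minimal, self-contained restatement of `resolve_extensions` and `resolve_delimiter` as they appear in the hunk above. The constant values (`SAS_EXTENSIONS`, `TEXT_EXTENSIONS`, `DEFAULT_DELIMITER`) are assumptions, since their definitions sit outside the shown hunk, and the `argparse.Namespace` objects are hypothetical stand-ins for real parsed arguments.

```python
import argparse

# Assumed values: the real constants are defined above the shown hunk.
SAS_EXTENSIONS = {".sas7bdat", ".xpt", ".xport"}
TEXT_EXTENSIONS = {".txt", ".csv", ".tsv"}
SUPPORTED_EXTENSIONS = SAS_EXTENSIONS | TEXT_EXTENSIONS
DEFAULT_DELIMITER = ","


def resolve_extensions(args):
    """--extensions wins; otherwise --file-type selects a predefined set."""
    if args.extensions:
        exts = set()
        for ext in args.extensions:
            ext = ext.strip().lower()
            if not ext.startswith("."):
                ext = "." + ext
            exts.add(ext)
        return exts
    if args.file_type == "sas":
        return SAS_EXTENSIONS
    if args.file_type == "text":
        return TEXT_EXTENSIONS
    return SUPPORTED_EXTENSIONS


def resolve_delimiter(args):
    """Map 'tab' or a literal backslash-t to a real tab character."""
    if args.delimiter is None:
        return DEFAULT_DELIMITER
    if args.delimiter.lower() in ("tab", "\\t"):
        return "\t"
    return args.delimiter


# Hypothetical parsed arguments for illustration only.
explicit = argparse.Namespace(extensions=["CSV", " .Tsv "], file_type="sas", delimiter="TAB")
fallback = argparse.Namespace(extensions=None, file_type="text", delimiter=None)

print(sorted(resolve_extensions(explicit)), repr(resolve_delimiter(explicit)))
print(sorted(resolve_extensions(fallback)), repr(resolve_delimiter(fallback)))
```

Note that an explicit `--extensions` list overrides `--file-type` even when both are given, which is why the first namespace ignores `file_type="sas"`.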


@ -1,15 +1,23 @@
"""Standalone utility to download a .sas7bdat file from S3 and print a
column-level summary of the first 10 rows.
"""Standalone utility to download a SAS or delimited text file from S3 and
print a column-level summary of the first *N* rows.
Configure the four constants below, then run::
Supported formats
-----------------
* **SAS** ``.sas7bdat``, ``.xpt``, ``.xport`` (read via *pyreadstat*)
* **Text** ``.csv``, ``.tsv``, ``.txt`` (read via *pandas.read_csv*)
Configure the four constants below **or** use the CLI arguments, then run::
python3 file_viewer.py
python3 file_viewer.py --local path/to/file.csv
python3 file_viewer.py --local path/to/data.tsv --delimiter $'\\t'
Python 3.14 compatible.
"""
from __future__ import annotations
import argparse
import os
import sys
@ -19,14 +27,28 @@ import pyreadstat
# ---------------------------------------------------------------------------
# Configuration — edit these before running
# Supported file extensions
# ---------------------------------------------------------------------------
SAS_EXTENSIONS: set[str] = {".sas7bdat", ".xpt", ".xport"}
"""File extensions recognised as SAS data files."""
TEXT_EXTENSIONS: set[str] = {".txt", ".csv", ".tsv"}
"""File extensions recognised as delimited text files."""
SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
"""Union of all supported file extensions."""
# ---------------------------------------------------------------------------
# Configuration — edit these before running (or use CLI arguments)
# ---------------------------------------------------------------------------
S3_BUCKET: str = "my-bucket"
"""S3 bucket name."""
S3_KEY: str = "path/to/file.sas7bdat"
"""Object key (path) within the bucket to the .sas7bdat file."""
"""Object key (path) within the bucket to a supported data file."""
LOCAL_FOLDER: str = "./downloads"
"""Local directory to download the file into."""
@ -45,6 +67,8 @@ def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None:
If *local_path* exists and its size matches the S3 object's size, the
download is skipped and a message is printed.
Supports any file whose extension is in :data:`SUPPORTED_EXTENSIONS`.
"""
session = boto3.Session(profile_name=AWS_PROFILE)
s3 = session.client("s3")
@ -69,12 +93,117 @@ def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None:
print("Download complete.")
# -- SAS readers -------------------------------------------------------------
def _read_sas_head(path: str, row_count: int = 10) -> pd.DataFrame:
"""Read the first *row_count* rows of a .sas7bdat file."""
"""Read the first *row_count* rows of a SAS file (``.sas7bdat``, ``.xpt``, ``.xport``)."""
ext = os.path.splitext(path)[1].lower()
if ext == ".sas7bdat":
df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count)
elif ext in {".xpt", ".xport"}:
df, _ = pyreadstat.read_xport(path, row_offset=0, row_limit=row_count)
else:
raise ValueError(f"Unsupported SAS extension: {ext}")
return df
# -- Text readers ------------------------------------------------------------
def _read_text_head(
path: str,
row_count: int = 10,
delimiter: str = ",",
encoding: str = "utf-8",
quotechar: str = '"',
) -> pd.DataFrame:
"""Read the first *row_count* rows of a delimited text file.
Parameters
----------
path : str
Path to the ``.csv``, ``.tsv``, or ``.txt`` file.
row_count : int, optional
Number of data rows to read (default ``10``).
delimiter : str, optional
Column delimiter (default ``","``). For ``.tsv`` files the caller
should pass ``"\\t"``.
encoding : str, optional
File encoding (default ``"utf-8"``).
quotechar : str, optional
Character used to quote fields (default ``'"'``).
"""
return pd.read_csv(
path,
sep=delimiter,
encoding=encoding,
quotechar=quotechar,
nrows=row_count,
)
# -- Unified reader ----------------------------------------------------------
def _read_head(
path: str,
row_count: int = 10,
delimiter: str | None = None,
encoding: str = "utf-8",
quotechar: str = '"',
) -> pd.DataFrame:
"""Read the first *row_count* rows of a supported data file.
Auto-detects the file type from its extension and delegates to the
appropriate reader. For ``.tsv`` files the delimiter defaults to tab
(``"\\t"``); for other text files it defaults to ``","``.
Parameters
----------
path : str
Path to the data file.
row_count : int, optional
Number of data rows to read (default ``10``).
delimiter : str or None, optional
Column delimiter for text files. ``None`` means *auto-detect*
(tab for ``.tsv``, comma otherwise).
encoding : str, optional
Encoding for text files (default ``"utf-8"``).
quotechar : str, optional
Quote character for text files (default ``'"'``).
Returns
-------
pandas.DataFrame
"""
ext = os.path.splitext(path)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
raise ValueError(
f"Unsupported file extension '{ext}'. "
f"Supported extensions: {sorted(SUPPORTED_EXTENSIONS)}"
)
if ext in SAS_EXTENSIONS:
return _read_sas_head(path, row_count=row_count)
# --- Text file path ---
if delimiter is None:
delimiter = "\t" if ext == ".tsv" else ","
return _read_text_head(
path,
row_count=row_count,
delimiter=delimiter,
encoding=encoding,
quotechar=quotechar,
)
# -- Display -----------------------------------------------------------------
def _sample_values(series: pd.Series, n: int = 3) -> str:
"""Return up to *n* non-null sample values as a comma-separated string."""
non_null = series.dropna()
@ -114,26 +243,126 @@ def _print_summary(df: pd.DataFrame) -> None:
print()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
"""Build the argument parser for the file-viewer CLI."""
parser = argparse.ArgumentParser(
description=(
"Download a SAS or delimited text file from S3 (or read a local "
"file) and print a column-level summary of the first N rows.\n\n"
"Supported extensions: "
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
source = parser.add_mutually_exclusive_group()
source.add_argument(
"--local",
metavar="FILE",
default=None,
help=(
"Path to a local data file to summarise (skips S3 download). "
"Supported extensions: "
+ ", ".join(sorted(SUPPORTED_EXTENSIONS))
),
)
source.add_argument(
"--s3-key",
metavar="KEY",
default=None,
help="Override the S3_KEY constant with this object key.",
)
parser.add_argument(
"--rows",
type=int,
default=10,
metavar="N",
help="Number of rows to read (default: 10).",
)
# Text-file-specific options
text_group = parser.add_argument_group(
"text file options",
"These options apply only to .csv / .tsv / .txt files.",
)
text_group.add_argument(
"--delimiter",
default=None,
help=(
'Column delimiter for text files (default: "," for .csv/.txt, '
'"\\t" for .tsv). Use $\'\\t\' in the shell for a literal tab.'
),
)
text_group.add_argument(
"--encoding",
default="utf-8",
help='File encoding for text files (default: "utf-8").',
)
text_group.add_argument(
"--quotechar",
default='"',
help='Quote character for text files (default: \'"\').',
)
return parser
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
# --- Download -----------------------------------------------------------
parser = _build_parser()
args = parser.parse_args()
if args.local:
# ---- Local file mode -----------------------------------------------
local_path = args.local
ext = os.path.splitext(local_path)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
parser.error(
f"Unsupported file extension '{ext}'. "
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
)
if not os.path.isfile(local_path):
print(f"File not found: {local_path}", file=sys.stderr)
sys.exit(1)
else:
# ---- S3 download mode ----------------------------------------------
s3_key = args.s3_key or S3_KEY
ext = os.path.splitext(s3_key)[1].lower()
if ext not in SUPPORTED_EXTENSIONS:
parser.error(
f"Unsupported file extension '{ext}' in S3 key. "
f"Supported: {sorted(SUPPORTED_EXTENSIONS)}"
)
os.makedirs(LOCAL_FOLDER, exist_ok=True)
local_filename = os.path.basename(S3_KEY)
local_filename = os.path.basename(s3_key)
local_path = os.path.join(LOCAL_FOLDER, local_filename)
try:
_ensure_local_copy(S3_BUCKET, S3_KEY, local_path)
_ensure_local_copy(S3_BUCKET, s3_key, local_path)
except Exception as exc:
print(f"S3 download error: {exc}", file=sys.stderr)
sys.exit(1)
# --- Read & summarize ---------------------------------------------------
# ---- Read & summarise --------------------------------------------------
try:
df = _read_sas_head(local_path, row_count=10)
df = _read_head(
local_path,
row_count=args.rows,
delimiter=args.delimiter,
encoding=args.encoding,
quotechar=args.quotechar,
)
except Exception as exc:
print(f"File read error: {exc}", file=sys.stderr)
sys.exit(2)
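
Reviewer sketch: the delimiter auto-detection in `_read_head` (tab for `.tsv`, comma otherwise) can be checked in isolation. The snippet below is not the code from this diff: it swaps `pandas.read_csv` for the standard-library `csv` module so it runs without third-party packages, and the names `default_delimiter` and `read_head` are hypothetical. It only illustrates the "first N rows with an extension-based default delimiter" behaviour.

```python
import csv
import itertools
import os
import tempfile


def default_delimiter(path):
    """Tab for .tsv, comma for every other extension."""
    ext = os.path.splitext(path)[1].lower()
    return "\t" if ext == ".tsv" else ","


def read_head(path, row_count=10, delimiter=None, encoding="utf-8", quotechar='"'):
    """Return (header, rows) with at most row_count data rows."""
    if delimiter is None:
        delimiter = default_delimiter(path)
    with open(path, newline="", encoding=encoding) as handle:
        reader = csv.reader(handle, delimiter=delimiter, quotechar=quotechar)
        header = next(reader, [])
        # islice stops after row_count rows, so the rest of the file is not read.
        rows = list(itertools.islice(reader, row_count))
    return header, rows


# Write a small throwaway .tsv file and read back its first two data rows.
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "sample.tsv")
    with open(sample, "w", newline="", encoding="utf-8") as handle:
        handle.write("id\tname\n1\talpha\n2\tbeta\n3\tgamma\n")
    header, rows = read_head(sample, row_count=2)

print(header, rows)
```

Passing `delimiter` explicitly bypasses the extension check, which matches how `--delimiter` takes priority in the CLI above.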


@ -5,6 +5,10 @@ under that prefix recursively, groups objects into *clusters* using the same
explicit-pattern + auto-detect rules as ``load_folder.py``, and downloads each
cluster's files into its own subfolder under a local destination root.
Supported file types:
* SAS data files: ``.sas7bdat``, ``.xpt``, ``.xport``
* Delimited text files: ``.txt``, ``.csv``, ``.tsv``
-------------------------------------------------------------------------------
USAGE
-------------------------------------------------------------------------------
@ -19,8 +23,9 @@ USAGE
aws_profile: default # optional; default boto3 chain if omitted
auto_detect: true # optional; default true
extensions: # optional; default sas7bdat/xpt/xport
extensions: # optional; default sas7bdat/xpt/xport/txt/csv/tsv
- .sas7bdat
- .csv
on_exists: skip # optional; skip | overwrite | error
concurrency: 4 # optional; default 4
@ -58,7 +63,8 @@ Exit codes:
* Listing is recursive (no S3 ``Delimiter``). Regexes are matched against
the *basename* of each key (the part after the last ``/``), so a nested
object like ``census/2020/raw/nested/group_c1.sas7bdat`` is grouped by
``group_c1.sas7bdat`` alone.
``group_c1.sas7bdat`` alone. Text files (e.g. ``data.csv``) are handled
identically: the basename is extracted and matched the same way.
* Explicit patterns are tried in order. A key matched by one pattern is
removed from the pool before the next pattern runs. Overlap between
patterns is flagged as an error at discovery time.
@ -97,7 +103,9 @@ import boto3
import yaml
DEFAULT_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport")
SAS_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport")
TEXT_EXTENSIONS: Tuple[str, ...] = (".txt", ".csv", ".tsv")
DEFAULT_EXTENSIONS: Tuple[str, ...] = SAS_EXTENSIONS + TEXT_EXTENSIONS
VALID_ON_EXISTS: Tuple[str, ...] = ("skip", "overwrite", "error")
DEFAULT_CONCURRENCY: int = 4
@ -318,7 +326,12 @@ def build_s3_client(cfg: DownloadConfig):
def list_s3_objects(s3_client, cfg: DownloadConfig) -> List[S3Object]:
"""List all objects under ``cfg.prefix`` recursively, filtered by extension."""
"""List all objects under ``cfg.prefix`` recursively, filtered by extension.
Supports SAS extensions (``.sas7bdat``, ``.xpt``, ``.xport``) and text
extensions (``.txt``, ``.csv``, ``.tsv``), whichever are present in
``cfg.extensions``.
"""
paginator = s3_client.get_paginator("list_objects_v2")
out: List[S3Object] = []
for page in paginator.paginate(Bucket=cfg.bucket, Prefix=cfg.prefix):
@ -584,8 +597,12 @@ def download_cluster(
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description=(
"Download S3 objects under a prefix into a local folder, "
"grouping objects into clusters that each become one subfolder."
"Download S3 objects (SAS data files and/or delimited text files) "
"under a prefix into a local folder, grouping objects into "
"clusters that each become one subfolder. "
"Supported extensions: "
+ ", ".join(DEFAULT_EXTENSIONS)
+ "."
),
)
p.add_argument(
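
Reviewer sketch: the module docstring in this file describes filtering keys by extension, matching regexes against each key's basename, and removing matched keys from the pool before the next explicit pattern runs. The snippet below is a rough illustration of that ordering only. The function names, the use of `re.search`, and the sample keys are assumptions; the overlap check and the auto-detect step from the real script are not reproduced.

```python
import re

# Mirrors the new DEFAULT_EXTENSIONS shown in this diff.
DEFAULT_EXTENSIONS = (".sas7bdat", ".xpt", ".xport", ".txt", ".csv", ".tsv")


def basename(key):
    """Part of the S3 key after the last '/'."""
    return key.rsplit("/", 1)[-1]


def filter_keys(keys, extensions=DEFAULT_EXTENSIONS):
    """Keep only keys whose basename ends with one of the extensions."""
    exts = tuple(ext.lower() for ext in extensions)
    return [key for key in keys if basename(key).lower().endswith(exts)]


def group_by_patterns(keys, patterns):
    """Try (name, regex) pairs in order; matched keys leave the pool."""
    pool = list(keys)
    clusters = {}
    for name, pattern in patterns:
        rx = re.compile(pattern)
        matched = [key for key in pool if rx.search(basename(key))]
        clusters[name] = matched
        pool = [key for key in pool if key not in matched]
    return clusters, pool


# Hypothetical keys for illustration only.
keys = [
    "census/2020/raw/nested/group_c1.sas7bdat",
    "census/2020/data_group_a1.txt",
    "census/2020/data_group_a2.txt",
    "census/2020/readme.md",
]
patterns = [("data_group_a", r"^data_group_a\d+\.txt$")]

clusters, leftover = group_by_patterns(filter_keys(keys), patterns)
print(clusters)
print(leftover)
```

In this sketch the leftover pool is simply returned; in the real script those keys would presumably go on to the auto-detect rules mentioned in the docstring.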


@ -52,11 +52,13 @@ local_folder: ./downloads
auto_detect: true
# Object extensions to consider. Anything else under the prefix is ignored.
# Default (when this key is omitted): .sas7bdat, .xpt, .xport (matches
# generic_loader/load_folder.py).
# Default (when this key is omitted): .sas7bdat, .xpt, .xport, .txt, .csv, .tsv
# extensions:
# - .sas7bdat
# - .xpt
# - .txt
# - .csv
# - .tsv
# ---------------------------------------------------------------------------
# Optional: download behavior
@ -103,3 +105,7 @@ clusters:
#
# - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$'
# name: year2020_regionA_detail
# Text file cluster example (when file_type: text):
# - pattern: '^data_group_a\d+\.txt$'
# name: data_group_a

1274
utils/sas_profiler.py Normal file

File diff suppressed because it is too large