Update for SAS: only consider files matching FILE_EXTENSION (.sas7bdat)

This commit is contained in:
michael-corey 2026-04-20 16:30:35 -05:00
parent 2390ce1e0c
commit e48038f3c6

View File

@@ -3,10 +3,15 @@
Reads a text file containing one S3 prefix per line (paths within the bucket Reads a text file containing one S3 prefix per line (paths within the bucket
configured by the ``S3_BUCKET`` constant), then for each prefix: configured by the ``S3_BUCKET`` constant), then for each prefix:
- Lists all objects recursively (via ``list_objects_v2`` paginator) - Lists all objects recursively (via ``list_objects_v2`` paginator)
- Tests read permission with ``head_object`` on the first file found - **Only considers files matching the ``FILE_EXTENSION`` filter** (default
``.sas7bdat``). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found
- Categorises the directory as **Available**, **Blocked**, or **Empty** - Categorises the directory as **Available**, **Blocked**, or **Empty**
Configure the three constants below, then run:: A directory is considered *empty* if it contains no files matching the
extension filter, even when other file types are present.
Configure the constants below, then run::
python3 data_explorer.py python3 data_explorer.py
@@ -39,6 +44,9 @@ except ImportError:
# Configuration — edit these before running # Configuration — edit these before running
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
FILE_EXTENSION: str = ".sas7bdat"
"""Only files whose key ends with this extension (case-insensitive) are considered."""
INPUT_FILE: str = "s3_directories.txt" INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line.""" """Path to the text file containing one S3 prefix per line."""
@@ -133,20 +141,26 @@ def list_objects(
) -> Tuple[str | None, int, int]: ) -> Tuple[str | None, int, int]:
"""Recursively list all objects under *prefix* using streaming counters. """Recursively list all objects under *prefix* using streaming counters.
Only objects whose key ends with ``FILE_EXTENSION`` (case-insensitive) are
counted. All other files are silently skipped.
Returns ``(first_key, file_count, total_size)`` where *first_key* is the Returns ``(first_key, file_count, total_size)`` where *first_key* is the
key of the first object found (or ``None`` if the prefix is empty), key of the first matching object found (or ``None`` if no matching files
*file_count* is the total number of objects, and *total_size* is the sum exist), *file_count* is the total number of matching objects, and
of all object sizes in bytes. *total_size* is the sum of their sizes in bytes.
Unlike the previous implementation this never accumulates all keys in Unlike the previous implementation this never accumulates all keys in
memory, making it safe for prefixes with millions of objects. memory, making it safe for prefixes with millions of objects.
""" """
ext_lower = FILE_EXTENSION.lower()
paginator = s3_client.get_paginator("list_objects_v2") paginator = s3_client.get_paginator("list_objects_v2")
first_key: str | None = None first_key: str | None = None
file_count: int = 0 file_count: int = 0
total_size: int = 0 total_size: int = 0
for page in paginator.paginate(Bucket=bucket, Prefix=prefix): for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []): for obj in page.get("Contents", []):
if not obj["Key"].lower().endswith(ext_lower):
continue
if first_key is None: if first_key is None:
first_key = obj["Key"] first_key = obj["Key"]
file_count += 1 file_count += 1
@@ -183,7 +197,10 @@ def explore_directories(prefixes: List[str]) -> Results:
total = len(prefixes) total = len(prefixes)
for idx, prefix in enumerate(prefixes, start=1): for idx, prefix in enumerate(prefixes, start=1):
print(f"[{idx}/{total}] Checking {prefix} ...", file=sys.stderr) print(
f"[{idx}/{total}] Checking {prefix} (filtering for {FILE_EXTENSION}) ...",
file=sys.stderr,
)
# --- Recursive listing ------------------------------------------------ # --- Recursive listing ------------------------------------------------
try: try:
@@ -207,18 +224,23 @@ def explore_directories(prefixes: List[str]) -> Results:
# --- Permission check ------------------------------------------------- # --- Permission check -------------------------------------------------
# Prefer a real object over a zero-byte directory marker (key ending # Prefer a real object over a zero-byte directory marker (key ending
# in "/") for the head_object test. If every key is a directory # in "/") for the head_object test. The selected key must also match
# marker, fall back to the first one anyway. # the FILE_EXTENSION filter. If no suitable key is found, fall back
# to first_key.
ext_lower = FILE_EXTENSION.lower()
test_key = first_key test_key = first_key
if first_key.endswith("/") and total_size > 0: if first_key.endswith("/") and total_size > 0:
# Re-scan the first page to find a non-marker key # Re-scan the first page to find a non-marker key matching the extension
try: try:
probe_paginator = s3.get_paginator("list_objects_v2") probe_paginator = s3.get_paginator("list_objects_v2")
for probe_page in probe_paginator.paginate( for probe_page in probe_paginator.paginate(
Bucket=S3_BUCKET, Prefix=prefix, PaginationConfig={"MaxItems": 1000} Bucket=S3_BUCKET, Prefix=prefix, PaginationConfig={"MaxItems": 1000}
): ):
for obj in probe_page.get("Contents", []): for obj in probe_page.get("Contents", []):
if not (obj["Key"].endswith("/") and obj["Size"] == 0): if (
not (obj["Key"].endswith("/") and obj["Size"] == 0)
and obj["Key"].lower().endswith(ext_lower)
):
test_key = obj["Key"] test_key = obj["Key"]
break break
if test_key != first_key: if test_key != first_key:
@@ -256,7 +278,7 @@ def print_results(results: Results) -> None:
if results.available: if results.available:
for d in results.available: for d in results.available:
print(f" {d.prefix}") print(f" {d.prefix}")
print(f" Files: {d.file_count} | Total Size: {format_size(d.total_size)}") print(f" {FILE_EXTENSION} files: {d.file_count} | Total Size: {format_size(d.total_size)}")
else: else:
print(" (none)") print(" (none)")
@@ -267,7 +289,7 @@ def print_results(results: Results) -> None:
for d in results.blocked: for d in results.blocked:
if d.file_count: if d.file_count:
print(f" {d.prefix}") print(f" {d.prefix}")
print(f" Files found: {d.file_count} | Error: {d.error}") print(f" {FILE_EXTENSION} files found: {d.file_count} | Error: {d.error}")
else: else:
print(f" {d.prefix}") print(f" {d.prefix}")
print(f" Error: {d.error}") print(f" Error: {d.error}")
@@ -276,7 +298,7 @@ def print_results(results: Results) -> None:
# --- Empty --- # --- Empty ---
print() print()
print(f"--- Empty ({len(results.empty)}) ---") print(f"--- Empty / no {FILE_EXTENSION} files ({len(results.empty)}) ---")
if results.empty: if results.empty:
for d in results.empty: for d in results.empty:
print(f" {d.prefix}") print(f" {d.prefix}")