"""Explore S3 directories and categorise them by accessibility.

Reads a text file containing one S3 prefix per line (paths within the bucket
configured by the ``S3_BUCKET`` constant), then for each prefix:

- Lists all objects recursively (via ``list_objects_v2`` paginator)
- **Only considers files matching the ``FILE_EXTENSION`` filter** (default
  ``.sas7bdat``). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found
- Categorises the directory as **Available**, **Blocked**, or **Empty**

A directory is considered *empty* if it contains no files matching the
extension filter, even when other file types are present.

Configure the constants below, then run::

    python3 data_explorer.py

Python 3.10+ compatible. Requires only ``boto3`` / ``botocore`` and stdlib.
"""

from __future__ import annotations

import sys
from dataclasses import dataclass, field
from typing import List, Tuple

# ---------------------------------------------------------------------------
# Dependency check
# ---------------------------------------------------------------------------
try:
    import boto3
    import botocore.exceptions
except ImportError:
    # Do not exit here: importing this module (tests, REPL, other tools) must
    # never terminate the host process. The entry point reports the problem.
    boto3 = None
    botocore = None

_MISSING_DEPENDENCY_MESSAGE = (
    "ERROR: boto3 / botocore is not installed.\n"
    "Install with: pip install boto3"
)

# ---------------------------------------------------------------------------
# Configuration — edit these before running
# ---------------------------------------------------------------------------
FILE_EXTENSION: str = ".sas7bdat"
"""Only files whose key ends with this extension (case-insensitive) are considered."""

INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line."""

S3_BUCKET: str = "my-bucket"
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""

AWS_PROFILE: str = "default"
"""AWS CLI profile name used for authentication."""


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class AvailableDir:
    """An S3 directory that is readable."""

    prefix: str
    file_count: int
    total_size: int  # bytes


@dataclass
class BlockedDir:
    """An S3 directory where access was denied or an error occurred."""

    prefix: str
    file_count: int
    error: str


@dataclass
class EmptyDir:
    """An S3 directory with no objects matching ``FILE_EXTENSION``."""

    prefix: str


@dataclass
class Results:
    """Aggregated exploration results."""

    available: List[AvailableDir] = field(default_factory=list)
    blocked: List[BlockedDir] = field(default_factory=list)
    empty: List[EmptyDir] = field(default_factory=list)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def read_input_file(path: str) -> List[str]:
    """Return a list of S3 prefixes from *path*, ignoring blanks and comments.

    Each line is stripped and normalised so that non-empty prefixes always end
    with a single trailing ``/``. A line consisting only of slashes yields the
    empty (bucket root) prefix. Lines starting with ``#`` are comments.

    Raises ``OSError`` if the file cannot be opened and ``UnicodeDecodeError``
    if it is not valid UTF-8.
    """
    prefixes: List[str] = []
    # "utf-8-sig" transparently drops a leading byte-order mark; with plain
    # "utf-8" a BOM would be glued onto the first prefix (or hide a comment).
    with open(path, encoding="utf-8-sig") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # Normalise: drop surrounding slashes, then re-add a single
            # trailing slash (unless the prefix is empty/root).
            line = line.strip("/")
            if line:
                line += "/"
            prefixes.append(line)
    return prefixes


def format_size(size_bytes: int) -> str:
    """Return a human-readable size string (B, KB, MB, GB, TB).

    Values below 1024 are shown as whole bytes; larger values are scaled by
    powers of 1024 and shown with one decimal. TB is the largest unit, so
    anything beyond it is still expressed in TB.
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"
    value = float(size_bytes)
    unit = "KB"
    for unit in ("KB", "MB", "GB", "TB"):
        value /= 1024.0
        if value < 1024.0:
            break
    return f"{value:,.1f} {unit}"


def _describe_error(exc: Exception) -> str:
    """Render *exc* as ``"message (code)"`` for botocore client errors, else as text."""
    if botocore is not None and isinstance(exc, botocore.exceptions.ClientError):
        details = exc.response.get("Error", {})
        code = details.get("Code", "Unknown")
        message = details.get("Message", str(exc))
        return f"{message} ({code})"
    # Some exceptions carry no message; fall back to the class name so the
    # report never shows an empty error.
    return str(exc) or type(exc).__name__


def list_objects(
    s3_client: "botocore.client.S3",
    bucket: str,
    prefix: str,
) -> Tuple[str | None, int, int]:
    """Recursively list all objects under *prefix* using streaming counters.

    Only objects whose key ends with ``FILE_EXTENSION`` (case-insensitive) are
    counted. All other files are silently skipped.

    Returns ``(first_key, file_count, total_size)`` where *first_key* is the
    key of the first matching object found (or ``None`` if no matching files
    exist), *file_count* is the total number of matching objects, and
    *total_size* is the sum of their sizes in bytes.

    Keys are never accumulated; only running counters are kept while the
    paginator is consumed page by page. Exceptions raised by the client or
    paginator propagate to the caller.
    """
    ext_lower = FILE_EXTENSION.lower()
    paginator = s3_client.get_paginator("list_objects_v2")

    first_key: str | None = None
    file_count = 0
    total_size = 0

    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages without matches (or empty prefixes) have no "Contents" entry.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.lower().endswith(ext_lower):
                continue
            if first_key is None:
                first_key = key
            file_count += 1
            total_size += obj["Size"]

    return first_key, file_count, total_size


def check_read_permission(
    s3_client: "botocore.client.S3",
    bucket: str,
    key: str,
) -> str | None:
    """Try ``head_object`` on *key*. Return ``None`` on success or an error string.

    Any failure is reported as a string rather than raised, matching how
    listing failures are handled: one unreachable object must not abort the
    whole run and discard the results gathered so far.
    """
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
    except Exception as exc:  # deliberate boundary: categorise, don't crash
        return _describe_error(exc)
    return None


def _select_probe_key(
    s3_client: "botocore.client.S3",
    bucket: str,
    prefix: str,
    first_key: str,
) -> str:
    """Return a matching key under *prefix* that is not a zero-byte directory marker.

    Scans at most the first 1000 listed objects. Falls back to *first_key*
    when no better candidate is found or when the re-scan itself fails.
    """
    ext_lower = FILE_EXTENSION.lower()
    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(
            Bucket=bucket, Prefix=prefix, PaginationConfig={"MaxItems": 1000}
        )
        for page in pages:
            for obj in page.get("Contents", []):
                key = obj["Key"]
                is_marker = key.endswith("/") and obj["Size"] == 0
                if not is_marker and key.lower().endswith(ext_lower):
                    return key
    except Exception as exc:
        # Best effort only: the caller can still probe first_key, but say so
        # instead of hiding the failure.
        print(
            f"WARNING: could not re-scan {prefix} for a probe key: "
            f"{_describe_error(exc)}",
            file=sys.stderr,
        )
    return first_key


# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def explore_directories(
    prefixes: List[str],
    *,
    s3_client: "botocore.client.S3 | None" = None,
) -> Results:
    """Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.

    *prefixes* are the normalised prefixes to inspect, in reporting order.
    *s3_client* is an optional ready-made S3 client; when omitted, one is
    created from the ``AWS_PROFILE`` session.

    For each prefix: a listing failure yields a Blocked entry with a file
    count of 0; no matching files yields an Empty entry; otherwise the result
    of ``head_object`` on one matching key decides Available vs Blocked.
    Progress is written to stderr.

    Raises ``RuntimeError`` if no client is supplied and boto3 is missing.
    """
    if s3_client is None:
        if boto3 is None:
            raise RuntimeError(_MISSING_DEPENDENCY_MESSAGE)
        s3_client = boto3.Session(profile_name=AWS_PROFILE).client("s3")

    results = Results()
    total = len(prefixes)

    for idx, prefix in enumerate(prefixes, start=1):
        print(
            f"[{idx}/{total}] Checking {prefix} (filtering for {FILE_EXTENSION}) ...",
            file=sys.stderr,
        )

        # --- Recursive listing ----------------------------------------------
        try:
            first_key, file_count, total_size = list_objects(
                s3_client, S3_BUCKET, prefix
            )
        except Exception as exc:  # any listing failure marks the prefix Blocked
            results.blocked.append(
                BlockedDir(prefix=prefix, file_count=0, error=_describe_error(exc))
            )
            continue

        if first_key is None:
            results.empty.append(EmptyDir(prefix=prefix))
            continue

        # --- Permission check -----------------------------------------------
        # Prefer a real object over a zero-byte directory marker (key ending
        # in "/") for the head_object test. A marker can only be selected as
        # first_key when FILE_EXTENSION is empty or itself ends with "/".
        test_key = first_key
        if first_key.endswith("/") and total_size > 0:
            test_key = _select_probe_key(s3_client, S3_BUCKET, prefix, first_key)

        error = check_read_permission(s3_client, S3_BUCKET, test_key)
        if error is None:
            results.available.append(
                AvailableDir(
                    prefix=prefix, file_count=file_count, total_size=total_size
                )
            )
        else:
            results.blocked.append(
                BlockedDir(prefix=prefix, file_count=file_count, error=error)
            )

    return results


# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def print_results(results: Results) -> None:
    """Print a clean, human-readable summary to stdout."""
    print()
    print("=== S3 Directory Explorer Results ===")
    print(f"Bucket: {S3_BUCKET}")

    # --- Available ---
    print()
    print(f"--- Available ({len(results.available)}) ---")
    if results.available:
        for entry in results.available:
            print(f" {entry.prefix}")
            print(
                f" {FILE_EXTENSION} files: {entry.file_count} | "
                f"Total Size: {format_size(entry.total_size)}"
            )
    else:
        print(" (none)")

    # --- Blocked ---
    print()
    print(f"--- Blocked ({len(results.blocked)}) ---")
    if results.blocked:
        for entry in results.blocked:
            print(f" {entry.prefix}")
            # A zero count means the listing itself failed, so there is no
            # file total worth reporting.
            if entry.file_count:
                print(
                    f" {FILE_EXTENSION} files found: {entry.file_count} | "
                    f"Error: {entry.error}"
                )
            else:
                print(f" Error: {entry.error}")
    else:
        print(" (none)")

    # --- Empty ---
    print()
    print(f"--- Empty / no {FILE_EXTENSION} files ({len(results.empty)}) ---")
    if results.empty:
        for entry in results.empty:
            print(f" {entry.prefix}")
    else:
        print(" (none)")

    print()


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
    """Run the explorer end to end and return the process exit code."""
    if boto3 is None:
        print(_MISSING_DEPENDENCY_MESSAGE, file=sys.stderr)
        return 1

    # --- Read input file ------------------------------------------------------
    try:
        prefixes = read_input_file(INPUT_FILE)
    except FileNotFoundError:
        print(f"ERROR: Input file not found: {INPUT_FILE}", file=sys.stderr)
        return 1
    except (OSError, UnicodeError) as exc:
        print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
        return 1

    if not prefixes:
        print("No valid S3 prefixes found in the input file. Nothing to do.")
        return 0

    # --- Validate AWS profile -------------------------------------------------
    try:
        session = boto3.Session(profile_name=AWS_PROFILE)
        # Force credential resolution to catch bad profiles early
        if session.get_credentials() is None:
            raise RuntimeError(
                f"No credentials found for AWS profile {AWS_PROFILE!r}"
            )
    except botocore.exceptions.ProfileNotFound as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
    except Exception as exc:
        print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
        return 1

    # --- Explore --------------------------------------------------------------
    print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
    # Reuse the session validated above instead of building a second one.
    results = explore_directories(prefixes, s3_client=session.client("s3"))
    print_results(results)
    return 0


if __name__ == "__main__":
    sys.exit(main())