"""Explore S3 directories and categorise them by accessibility. Reads a text file containing one S3 prefix per line (paths within the bucket configured by the ``S3_BUCKET`` constant), then for each prefix: - Lists all objects recursively (via ``list_objects_v2`` paginator) - **Only considers files matching the ``FILE_EXTENSION`` filter** (default ``.sas7bdat``). All other file types are ignored. - Tests read permission with ``head_object`` on the first matching file found - If the first file is accessible, tests ALL remaining files individually - Categorises the directory as **Available**, **Blocked**, **Empty**, and tracks individual file **Exceptions** within available directories A directory is considered *empty* if it contains no files matching the extension filter, even when other file types are present. Configure the constants below, then run:: python3 data_explorer.py Python 3.10+ compatible. Requires only ``boto3`` / ``botocore`` and stdlib. """ from __future__ import annotations import sys from dataclasses import dataclass, field from typing import List, Tuple # --------------------------------------------------------------------------- # Dependency check # --------------------------------------------------------------------------- try: import boto3 # noqa: F401 import botocore.exceptions # noqa: F401 except ImportError: print( "ERROR: boto3 / botocore is not installed.\n" "Install with: pip install boto3", file=sys.stderr, ) sys.exit(1) # --------------------------------------------------------------------------- # Configuration — edit these before running # --------------------------------------------------------------------------- FILE_EXTENSION: str = ".sas7bdat" """Only files whose key ends with this extension (case-insensitive) are considered.""" INPUT_FILE: str = "s3_directories.txt" """Path to the text file containing one S3 prefix per line.""" S3_BUCKET: str = "my-bucket" """S3 bucket name (all prefixes are assumed to live in this bucket).""" AWS_PROFILE: str = "default" """AWS CLI profile name used for authentication.""" # --------------------------------------------------------------------------- # Data structures # --------------------------------------------------------------------------- @dataclass class AvailableDir: """An S3 directory that is readable.""" prefix: str file_count: int total_size: int # bytes accessible_count: int = 0 # files that passed head_object total_count: int = 0 # total .sas7bdat files found accessible_size: int = 0 # total size of accessible files only @dataclass class BlockedDir: """An S3 directory where access was denied or an error occurred.""" prefix: str file_count: int error: str @dataclass class EmptyDir: """An S3 directory with no objects.""" prefix: str @dataclass class ExceptionFile: """A specific file that failed permission check within an otherwise available directory.""" prefix: str # the directory prefix key: str # the full S3 key of the failed file error: str # the error message @dataclass class Results: """Aggregated exploration results.""" available: List[AvailableDir] = field(default_factory=list) blocked: List[BlockedDir] = field(default_factory=list) empty: List[EmptyDir] = field(default_factory=list) exceptions: List[ExceptionFile] = field(default_factory=list) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def read_input_file(path: str) -> List[str]: """Return a list of S3 prefixes from *path*, ignoring blanks and comments. Each line is stripped and normalised so that non-empty prefixes always end with a trailing ``/``. """ prefixes: List[str] = [] with open(path, encoding="utf-8") as fh: for raw_line in fh: line = raw_line.strip() if not line or line.startswith("#"): continue # Normalise: strip surrounding whitespace/slashes, then re-add # a single trailing slash (unless the prefix is empty/root). line = line.strip("/") if line: line += "/" prefixes.append(line) return prefixes def format_size(size_bytes: int) -> str: """Return a human-readable size string (KB, MB, GB, TB).""" if size_bytes < 1024: return f"{size_bytes} B" for unit in ("KB", "MB", "GB", "TB"): size_bytes /= 1024.0 if size_bytes < 1024.0 or unit == "TB": return f"{size_bytes:,.1f} {unit}" # Fallback (should not be reached) return f"{size_bytes:,.1f} TB" def list_objects( s3_client: "botocore.client.S3", bucket: str, prefix: str, ) -> Tuple[List[Tuple[str, int]], int]: """Recursively list all objects under *prefix*. Only objects whose key ends with ``FILE_EXTENSION`` (case-insensitive) are counted. All other files are silently skipped. Returns ``(files, total_size)`` where *files* is a list of ``(key, size)`` tuples for every matching object and *total_size* is the sum of their sizes in bytes. """ ext_lower = FILE_EXTENSION.lower() paginator = s3_client.get_paginator("list_objects_v2") files: List[Tuple[str, int]] = [] total_size: int = 0 for page in paginator.paginate(Bucket=bucket, Prefix=prefix): for obj in page.get("Contents", []): if not obj["Key"].lower().endswith(ext_lower): continue files.append((obj["Key"], obj["Size"])) total_size += obj["Size"] return files, total_size def check_read_permission( s3_client: "botocore.client.S3", bucket: str, key: str, ) -> str | None: """Try ``head_object`` on *key*. Return ``None`` on success or an error string.""" try: s3_client.head_object(Bucket=bucket, Key=key) except botocore.exceptions.ClientError as exc: code = exc.response.get("Error", {}).get("Code", "Unknown") message = exc.response.get("Error", {}).get("Message", str(exc)) return f"{message} ({code})" return None # --------------------------------------------------------------------------- # Core logic # --------------------------------------------------------------------------- def explore_directories(prefixes: List[str]) -> Results: """Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.""" session = boto3.Session(profile_name=AWS_PROFILE) s3 = session.client("s3") results = Results() total = len(prefixes) for idx, prefix in enumerate(prefixes, start=1): print( f"[{idx}/{total}] Checking {prefix} (filtering for {FILE_EXTENSION}) ...", file=sys.stderr, ) # --- Recursive listing ------------------------------------------------ try: files, total_size = list_objects(s3, S3_BUCKET, prefix) except botocore.exceptions.ClientError as exc: code = exc.response.get("Error", {}).get("Code", "Unknown") message = exc.response.get("Error", {}).get("Message", str(exc)) results.blocked.append( BlockedDir(prefix=prefix, file_count=0, error=f"{message} ({code})") ) continue except Exception as exc: results.blocked.append( BlockedDir(prefix=prefix, file_count=0, error=str(exc)) ) continue if not files: results.empty.append(EmptyDir(prefix=prefix)) continue file_count = len(files) # --- Permission check on first file ----------------------------------- # Prefer a real object over a zero-byte directory marker (key ending # in "/") for the head_object test. first_key, _ = files[0] test_key = first_key if first_key.endswith("/") and total_size > 0: for key, size in files: if not (key.endswith("/") and size == 0): test_key = key break error = check_read_permission(s3, S3_BUCKET, test_key) if error is not None: # First file blocked → entire directory is blocked results.blocked.append( BlockedDir(prefix=prefix, file_count=file_count, error=error) ) continue # --- First file accessible → check ALL remaining files ---------------- accessible_count = 1 # the first (test_key) already passed accessible_size = 0 dir_exceptions: List[ExceptionFile] = [] # Find the size of the test_key to count it for key, size in files: if key == test_key: accessible_size = size break # Build list of remaining files to check remaining = [(key, size) for key, size in files if key != test_key] if remaining: if len(remaining) > 10: print( f" Verifying access to {file_count} {FILE_EXTENSION} files in {prefix} ...", file=sys.stderr, ) for key, size in remaining: file_error = check_read_permission(s3, S3_BUCKET, key) if file_error is None: accessible_count += 1 accessible_size += size else: dir_exceptions.append( ExceptionFile(prefix=prefix, key=key, error=file_error) ) else: # Only one file and it passed accessible_size = total_size results.available.append( AvailableDir( prefix=prefix, file_count=file_count, total_size=total_size, accessible_count=accessible_count, total_count=file_count, accessible_size=accessible_size, ) ) results.exceptions.extend(dir_exceptions) return results # --------------------------------------------------------------------------- # Output # --------------------------------------------------------------------------- def print_results(results: Results) -> None: """Print a clean, human-readable summary to stdout.""" print() print("=== S3 Directory Explorer Results ===") print(f"Bucket: {S3_BUCKET}") # --- Available --- print() print(f"--- Available ({len(results.available)}) ---") if results.available: for d in results.available: print(f" {d.prefix}") print( f" {FILE_EXTENSION} files: {d.accessible_count}/{d.total_count} accessible" f" | Total Size: {format_size(d.accessible_size)}" ) else: print(" (none)") # --- Blocked --- print() print(f"--- Blocked ({len(results.blocked)}) ---") if results.blocked: for d in results.blocked: if d.file_count: print(f" {d.prefix}") print(f" {FILE_EXTENSION} files found: {d.file_count} | Error: {d.error}") else: print(f" {d.prefix}") print(f" Error: {d.error}") else: print(" (none)") # --- Exceptions --- print() print(f"--- Exceptions ({len(results.exceptions)}) ---") if results.exceptions: for exc in results.exceptions: print(f" {exc.key}") print(f" Directory: {exc.prefix} | Error: {exc.error}") else: print(" (none)") # --- Empty --- print() print(f"--- Empty / no {FILE_EXTENSION} files ({len(results.empty)}) ---") if results.empty: for d in results.empty: print(f" {d.prefix}") else: print(" (none)") print() # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- if __name__ == "__main__": import os # --- Read input file ------------------------------------------------------ if not os.path.exists(INPUT_FILE): print(f"ERROR: Input file not found: {INPUT_FILE}", file=sys.stderr) sys.exit(1) try: prefixes = read_input_file(INPUT_FILE) except Exception as exc: print(f"ERROR: Could not read input file: {exc}", file=sys.stderr) sys.exit(1) if not prefixes: print("No valid S3 prefixes found in the input file. Nothing to do.") sys.exit(0) # --- Validate AWS profile ------------------------------------------------- try: session = boto3.Session(profile_name=AWS_PROFILE) # Force credential resolution to catch bad profiles early credentials = session.get_credentials() if credentials is None: raise RuntimeError( f"No credentials found for AWS profile {AWS_PROFILE!r}" ) except botocore.exceptions.ProfileNotFound as exc: print(f"ERROR: {exc}", file=sys.stderr) sys.exit(1) except Exception as exc: print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr) sys.exit(1) # --- Explore -------------------------------------------------------------- print(f"Bucket: {S3_BUCKET}", file=sys.stderr) results = explore_directories(prefixes) print_results(results)