"""Explore S3 directories and categorise them by accessibility.

Reads a text file containing one S3 prefix per line (paths within the bucket
configured by the ``S3_BUCKET`` constant or ``--bucket`` CLI argument), then
for each prefix:

- Lists all objects recursively (via ``list_objects_v2`` paginator)
- **Only considers files matching the configured extensions** (default: all
  supported extensions — SAS and text). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found
- If the first file is accessible, tests ALL remaining files individually
- Categorises the directory as **Available**, **Blocked**, **Empty**, and
  tracks individual file **Exceptions** within available directories

Supported file types
--------------------
* **SAS files**: ``.sas7bdat``, ``.xpt``, ``.xport``
* **Text / delimited files**: ``.txt``, ``.csv``, ``.tsv``

A directory is considered *empty* if it contains no files matching the
extension filter, even when other file types are present.

Configure the constants below (or use CLI arguments), then run::

    python3 data_explorer.py [OPTIONS]

Python 3.10+ compatible. Requires ``boto3`` / ``botocore`` and stdlib.
"""

from __future__ import annotations

import argparse
import os
import sys
from dataclasses import dataclass, field
from typing import List, Set, Tuple

# ---------------------------------------------------------------------------
# Dependency check
# ---------------------------------------------------------------------------
# The import failure is recorded here and reported by ``main()`` (or raised by
# ``explore_directories``) so that importing this module for its pure helpers
# never terminates the interpreter.
try:
    import boto3  # noqa: F401
    import botocore.exceptions  # noqa: F401
except ImportError:
    boto3 = None
    botocore = None

_MISSING_DEPENDENCY_MESSAGE = (
    "ERROR: boto3 / botocore is not installed.\n"
    "Install with: pip install boto3"
)

# ---------------------------------------------------------------------------
# Extension constants
# ---------------------------------------------------------------------------
SAS_EXTENSIONS: Set[str] = {".sas7bdat", ".xpt", ".xport"}
"""File extensions recognised as SAS data files."""

TEXT_EXTENSIONS: Set[str] = {".txt", ".csv", ".tsv"}
"""File extensions recognised as delimited text / CSV files."""

SUPPORTED_EXTENSIONS: Set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS
"""Union of all file extensions this tool can work with."""

# ---------------------------------------------------------------------------
# Configuration defaults — edit these or override via CLI arguments
# ---------------------------------------------------------------------------
FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS
"""Set of extensions to filter on (case-insensitive). Defaults to all supported."""

INPUT_FILE: str = "s3_directories.txt"
"""Path to the text file containing one S3 prefix per line."""

S3_BUCKET: str = "my-bucket"
"""S3 bucket name (all prefixes are assumed to live in this bucket)."""

AWS_PROFILE: str = "default"
"""AWS CLI profile name used for authentication."""

# Text-file reading defaults (used when downloading / previewing text files)
DEFAULT_DELIMITER: str = ","
DEFAULT_ENCODING: str = "utf-8"
DEFAULT_QUOTECHAR: str = '"'


# ---------------------------------------------------------------------------
# Auto-detection helpers
# ---------------------------------------------------------------------------
def detect_file_type(filename: str) -> str:
    """Return ``'sas'``, ``'text'``, or ``'unknown'`` based on *filename* extension.

    The check is case-insensitive. For ``.tsv`` files the caller should
    default the delimiter to a tab character (``'\\t'``).

    Examples
    --------
    >>> detect_file_type("data.sas7bdat")
    'sas'
    >>> detect_file_type("report.CSV")
    'text'
    >>> detect_file_type("archive.zip")
    'unknown'
    """
    ext = os.path.splitext(filename)[1].lower()
    if ext in SAS_EXTENSIONS:
        return "sas"
    if ext in TEXT_EXTENSIONS:
        return "text"
    return "unknown"


def default_delimiter_for(filename: str) -> str:
    """Return a sensible default delimiter for *filename*.

    * ``.tsv`` → ``'\\t'``
    * everything else → ``','``
    """
    ext = os.path.splitext(filename)[1].lower()
    return "\t" if ext == ".tsv" else ","


def _normalised_suffixes(extensions: Set[str]) -> Tuple[str, ...]:
    """Return *extensions* lowercased, as a tuple usable with ``str.endswith``."""
    return tuple(ext.lower() for ext in extensions)


def matches_extensions(key: str, extensions: Set[str]) -> bool:
    """Return ``True`` if *key* ends with any extension in *extensions*.

    The comparison is case-insensitive on both sides: the key and every
    extension are lowercased before matching. An empty *extensions* set
    never matches.
    """
    return key.lower().endswith(_normalised_suffixes(extensions))


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class AvailableDir:
    """An S3 directory that is readable."""

    prefix: str
    file_count: int
    total_size: int  # bytes, summed over every matching file
    accessible_count: int = 0  # files that passed head_object
    total_count: int = 0  # total matching files found
    accessible_size: int = 0  # total size of accessible files only


@dataclass
class BlockedDir:
    """An S3 directory where access was denied or an error occurred."""

    prefix: str
    file_count: int
    error: str


@dataclass
class EmptyDir:
    """An S3 directory with no objects matching the extension filter."""

    prefix: str


@dataclass
class ExceptionFile:
    """A specific file that failed permission check within an otherwise available directory."""

    prefix: str  # the directory prefix
    key: str  # the full S3 key of the failed file
    error: str  # the error message


@dataclass
class Results:
    """Aggregated exploration results."""

    available: List[AvailableDir] = field(default_factory=list)
    blocked: List[BlockedDir] = field(default_factory=list)
    empty: List[EmptyDir] = field(default_factory=list)
    exceptions: List[ExceptionFile] = field(default_factory=list)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def read_input_file(path: str) -> List[str]:
    """Return a list of S3 prefixes from *path*, ignoring blanks and comments.

    Each line is stripped and normalised so that non-empty prefixes always
    end with a trailing ``/``. A line consisting only of slashes yields the
    empty (root) prefix.
    """
    prefixes: List[str] = []
    with open(path, encoding="utf-8") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # Normalise: strip surrounding slashes, then re-add a single
            # trailing slash (unless the prefix is empty/root).
            line = line.strip("/")
            if line:
                line += "/"
            prefixes.append(line)
    return prefixes


def format_size(size_bytes: int) -> str:
    """Return a human-readable size string (B, KB, MB, GB, TB).

    Values below 1024 are printed as whole bytes; larger values are scaled
    by powers of 1024 and printed with one decimal place. Anything beyond
    the GB range is reported in TB.
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"
    value = float(size_bytes)
    for unit in ("KB", "MB", "GB"):
        value /= 1024.0
        if value < 1024.0:
            return f"{value:,.1f} {unit}"
    # TB is the largest unit, so it absorbs everything that is left.
    return f"{value / 1024.0:,.1f} TB"


def extensions_label(extensions: Set[str]) -> str:
    """Return a compact, sorted label for a set of extensions (e.g. ``.csv/.tsv/.txt``)."""
    return "/".join(sorted(extensions))


def _format_client_error(exc: Exception) -> str:
    """Render a botocore ``ClientError`` as ``"<message> (<code>)"``."""
    details = exc.response.get("Error", {})
    code = details.get("Code", "Unknown")
    message = details.get("Message", str(exc))
    return f"{message} ({code})"


def list_objects(
    s3_client: "botocore.client.S3",
    bucket: str,
    prefix: str,
    extensions: Set[str] | None = None,
) -> Tuple[List[Tuple[str, int]], int]:
    """Recursively list all objects under *prefix*.

    Only objects whose key ends with one of *extensions* (case-insensitive)
    are counted. All other files are silently skipped. When *extensions* is
    ``None`` the module-level ``FILE_EXTENSIONS`` set is used.

    Returns ``(files, total_size)`` where *files* is a list of
    ``(key, size)`` tuples for every matching object and *total_size* is the
    sum of their sizes in bytes.

    Exceptions raised by the paginator are propagated to the caller.
    """
    if extensions is None:
        extensions = FILE_EXTENSIONS
    # Normalise once, outside the per-object loop.
    suffixes = _normalised_suffixes(extensions)

    paginator = s3_client.get_paginator("list_objects_v2")
    files: List[Tuple[str, int]] = []
    total_size: int = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.lower().endswith(suffixes):
                continue
            size = obj["Size"]
            files.append((key, size))
            total_size += size
    return files, total_size


def check_read_permission(
    s3_client: "botocore.client.S3",
    bucket: str,
    key: str,
) -> str | None:
    """Try ``head_object`` on *key*. Return ``None`` on success or an error string.

    Service-side failures (``ClientError``) are rendered as
    ``"<message> (<code>)"``. Client-side botocore failures
    (``BotoCoreError``) are returned as their string form, so that a problem
    with one object is recorded against that object instead of aborting the
    whole scan.
    """
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
    except botocore.exceptions.ClientError as exc:
        return _format_client_error(exc)
    except botocore.exceptions.BotoCoreError as exc:
        return str(exc)
    return None


# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def _select_probe(
    files: List[Tuple[str, int]],
    total_size: int,
) -> Tuple[str, int]:
    """Pick the ``(key, size)`` entry used for the first ``head_object`` test.

    Normally this is the first listed file. If that entry looks like a
    folder marker (key ending in ``/``) and the listing holds real data, the
    first entry that is not a zero-byte marker is used instead.
    """
    first = files[0]
    if first[0].endswith("/") and total_size > 0:
        for key, size in files:
            if not (key.endswith("/") and size == 0):
                return key, size
    return first


def _explore_prefix(
    s3: "botocore.client.S3",
    prefix: str,
    extensions: Set[str],
    ext_label: str,
    results: Results,
) -> None:
    """Classify a single *prefix* and record the outcome in *results*."""
    # --- Recursive listing ------------------------------------------------
    try:
        files, total_size = list_objects(
            s3, S3_BUCKET, prefix, extensions=extensions,
        )
    except botocore.exceptions.ClientError as exc:
        results.blocked.append(
            BlockedDir(prefix=prefix, file_count=0, error=_format_client_error(exc))
        )
        return
    except Exception as exc:
        # Deliberate best-effort: any listing failure marks the prefix as
        # blocked so the remaining prefixes are still processed.
        results.blocked.append(
            BlockedDir(prefix=prefix, file_count=0, error=str(exc))
        )
        return

    if not files:
        results.empty.append(EmptyDir(prefix=prefix))
        return

    file_count = len(files)

    # --- Permission check on first file -----------------------------------
    probe_key, probe_size = _select_probe(files, total_size)
    error = check_read_permission(s3, S3_BUCKET, probe_key)
    if error is not None:
        # First file blocked → entire directory is blocked
        results.blocked.append(
            BlockedDir(prefix=prefix, file_count=file_count, error=error)
        )
        return

    # --- First file accessible → check ALL remaining files ----------------
    accessible_count = 1  # the probe already passed
    accessible_size = probe_size
    dir_exceptions: List[ExceptionFile] = []

    remaining = [(key, size) for key, size in files if key != probe_key]
    if len(remaining) > 10:
        print(
            f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
            file=sys.stderr,
        )
    for key, size in remaining:
        file_error = check_read_permission(s3, S3_BUCKET, key)
        if file_error is None:
            accessible_count += 1
            accessible_size += size
        else:
            dir_exceptions.append(
                ExceptionFile(prefix=prefix, key=key, error=file_error)
            )

    results.available.append(
        AvailableDir(
            prefix=prefix,
            file_count=file_count,
            total_size=total_size,
            accessible_count=accessible_count,
            total_count=file_count,
            accessible_size=accessible_size,
        )
    )
    results.exceptions.extend(dir_exceptions)


def explore_directories(
    prefixes: List[str],
    *,
    extensions: Set[str] | None = None,
) -> Results:
    """Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.

    Parameters
    ----------
    prefixes:
        List of S3 key prefixes to explore.
    extensions:
        Set of file extensions to filter on. Defaults to the module-level
        ``FILE_EXTENSIONS`` (which itself defaults to
        ``SUPPORTED_EXTENSIONS``).

    Raises
    ------
    ImportError
        If ``boto3`` / ``botocore`` could not be imported.

    Progress messages are written to stderr. The bucket and AWS profile are
    read from the module-level ``S3_BUCKET`` and ``AWS_PROFILE`` settings.
    """
    if boto3 is None:
        raise ImportError(
            "boto3 / botocore is not installed. Install with: pip install boto3"
        )
    if extensions is None:
        extensions = FILE_EXTENSIONS
    ext_label = extensions_label(extensions)

    session = boto3.Session(profile_name=AWS_PROFILE)
    s3 = session.client("s3")

    results = Results()
    total = len(prefixes)
    for idx, prefix in enumerate(prefixes, start=1):
        print(
            f"[{idx}/{total}] Checking {prefix} (filtering for {ext_label}) ...",
            file=sys.stderr,
        )
        _explore_prefix(s3, prefix, extensions, ext_label, results)
    return results


# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def print_results(results: Results, *, extensions: Set[str] | None = None) -> None:
    """Print a clean, human-readable summary to stdout.

    Parameters
    ----------
    results:
        The exploration results to display.
    extensions:
        The set of extensions that were used for filtering. Used only for
        labelling in the output. Defaults to ``FILE_EXTENSIONS``.
    """
    if extensions is None:
        extensions = FILE_EXTENSIONS
    ext_label = extensions_label(extensions)

    print()
    print("=== S3 Directory Explorer Results ===")
    print(f"Bucket: {S3_BUCKET}")
    print(f"Extensions: {ext_label}")

    # --- Available ---
    print()
    print(f"--- Available ({len(results.available)}) ---")
    if results.available:
        for d in results.available:
            print(f" {d.prefix}")
            print(
                f" {ext_label} files: {d.accessible_count}/{d.total_count} accessible"
                f" | Total Size: {format_size(d.accessible_size)}"
            )
    else:
        print(" (none)")

    # --- Blocked ---
    print()
    print(f"--- Blocked ({len(results.blocked)}) ---")
    if results.blocked:
        for d in results.blocked:
            print(f" {d.prefix}")
            if d.file_count:
                print(f" Matching files ({ext_label}) found: {d.file_count} | Error: {d.error}")
            else:
                print(f" Error: {d.error}")
    else:
        print(" (none)")

    # --- Exceptions ---
    print()
    print(f"--- Exceptions ({len(results.exceptions)}) ---")
    if results.exceptions:
        for exc in results.exceptions:
            print(f" {exc.key}")
            print(f" Directory: {exc.prefix} | Error: {exc.error}")
    else:
        print(" (none)")

    # --- Empty ---
    print()
    print(f"--- Empty / no matching files ({len(results.empty)}) ---")
    if results.empty:
        for d in results.empty:
            print(f" {d.prefix}")
    else:
        print(" (none)")
    print()


# ---------------------------------------------------------------------------
# CLI argument parsing
# ---------------------------------------------------------------------------
def build_arg_parser() -> argparse.ArgumentParser:
    """Build and return the CLI argument parser.

    Supports selecting file-type filters, text-file reading parameters, and
    overriding the default bucket / profile / input-file settings.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Explore S3 directories and categorise them by accessibility. "
            "Supports SAS files (.sas7bdat, .xpt, .xport) and delimited text "
            "files (.txt, .csv, .tsv)."
        ),
    )

    # --- File-type / extension selection ---
    type_group = parser.add_argument_group("File-type selection")
    type_group.add_argument(
        "--file-type",
        choices=["sas", "text", "all"],
        default="all",
        help=(
            "Restrict the scan to a specific file type. "
            "'sas' = .sas7bdat/.xpt/.xport only; "
            "'text' = .txt/.csv/.tsv only; "
            "'all' = both (default)."
        ),
    )
    type_group.add_argument(
        "--extensions",
        nargs="+",
        metavar="EXT",
        help=(
            "Explicit list of extensions to filter on (e.g. --extensions .csv .tsv). "
            "Overrides --file-type when provided."
        ),
    )

    # --- Text-file reading parameters ---
    text_group = parser.add_argument_group(
        "Text-file parameters",
        description=(
            "Parameters used when reading delimited text files. These are "
            "stored for downstream consumers and do not affect the S3 scan "
            "itself."
        ),
    )
    text_group.add_argument(
        "--delimiter",
        default=None,
        help=(
            "Field delimiter for text files (default: ',' for .csv/.txt, "
            "'\\t' for .tsv). Use 'tab' or '\\t' for a tab character."
        ),
    )
    text_group.add_argument(
        "--encoding",
        default=DEFAULT_ENCODING,
        help=f"Character encoding for text files (default: {DEFAULT_ENCODING}).",
    )
    text_group.add_argument(
        "--quotechar",
        default=DEFAULT_QUOTECHAR,
        help=f"Quote character for text files (default: {DEFAULT_QUOTECHAR!r}).",
    )

    # --- S3 / general settings ---
    s3_group = parser.add_argument_group("S3 settings")
    s3_group.add_argument(
        "--bucket",
        default=None,
        help=f"S3 bucket name (default: {S3_BUCKET}).",
    )
    s3_group.add_argument(
        "--profile",
        default=None,
        help=f"AWS CLI profile name (default: {AWS_PROFILE}).",
    )
    s3_group.add_argument(
        "--input-file",
        default=None,
        help=f"Path to the text file with S3 prefixes (default: {INPUT_FILE}).",
    )
    return parser


def resolve_extensions(args: argparse.Namespace) -> Set[str]:
    """Determine the active extension set from parsed CLI *args*.

    If ``--extensions`` is provided it takes precedence. Otherwise
    ``--file-type`` is used to select a predefined set.
    """
    if args.extensions:
        # Normalise: ensure each extension starts with a dot and is lowercase
        exts: Set[str] = set()
        for ext in args.extensions:
            ext = ext.strip().lower()
            if not ext.startswith("."):
                ext = "." + ext
            exts.add(ext)
        return exts
    if args.file_type == "sas":
        return SAS_EXTENSIONS
    if args.file_type == "text":
        return TEXT_EXTENSIONS
    return SUPPORTED_EXTENSIONS


def resolve_delimiter(args: argparse.Namespace) -> str:
    """Return the effective delimiter from parsed CLI *args*.

    Handles the special values ``'tab'`` and ``'\\t'`` so users can specify
    a tab character on the command line without shell-escaping issues.
    """
    if args.delimiter is None:
        return DEFAULT_DELIMITER
    raw = args.delimiter
    if raw.lower() in ("tab", "\\t"):
        return "\t"
    return raw


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main(argv: List[str] | None = None) -> None:
    """Run the command-line tool.

    Parameters
    ----------
    argv:
        Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Exits with status 1 when the dependencies, the input file, or the AWS
    profile are unusable, and with status 0 when the input file holds no
    prefixes. CLI overrides are written back to the module-level
    ``S3_BUCKET``, ``AWS_PROFILE`` and ``FILE_EXTENSIONS`` settings, which
    the exploration and reporting functions read.
    """
    global S3_BUCKET, AWS_PROFILE, FILE_EXTENSIONS

    if boto3 is None:
        print(_MISSING_DEPENDENCY_MESSAGE, file=sys.stderr)
        sys.exit(1)

    args = build_arg_parser().parse_args(argv)

    # --- Apply CLI overrides to module-level config ---------------------------
    if args.bucket:
        S3_BUCKET = args.bucket
    if args.profile:
        AWS_PROFILE = args.profile
    input_file = args.input_file if args.input_file else INPUT_FILE

    active_extensions = resolve_extensions(args)
    FILE_EXTENSIONS = active_extensions

    delimiter = resolve_delimiter(args)
    encoding = args.encoding
    quotechar = args.quotechar

    # --- Read input file ------------------------------------------------------
    try:
        prefixes = read_input_file(input_file)
    except FileNotFoundError:
        print(f"ERROR: Input file not found: {input_file}", file=sys.stderr)
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as exc:
        print(f"ERROR: Could not read input file: {exc}", file=sys.stderr)
        sys.exit(1)

    if not prefixes:
        print("No valid S3 prefixes found in the input file. Nothing to do.")
        sys.exit(0)

    # --- Validate AWS profile -------------------------------------------------
    try:
        session = boto3.Session(profile_name=AWS_PROFILE)
        # Force credential resolution to catch bad profiles early
        credentials = session.get_credentials()
        if credentials is None:
            raise RuntimeError(
                f"No credentials found for AWS profile {AWS_PROFILE!r}"
            )
    except botocore.exceptions.ProfileNotFound as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)
    except Exception as exc:
        print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr)
        sys.exit(1)

    # --- Print active configuration -------------------------------------------
    ext_label = extensions_label(active_extensions)
    print(f"Bucket: {S3_BUCKET}", file=sys.stderr)
    print(f"Extensions: {ext_label}", file=sys.stderr)
    if active_extensions & TEXT_EXTENSIONS:
        print(
            f"Text opts: delimiter={delimiter!r} encoding={encoding!r} "
            f"quotechar={quotechar!r}",
            file=sys.stderr,
        )

    # --- Explore --------------------------------------------------------------
    results = explore_directories(prefixes, extensions=active_extensions)
    print_results(results, extensions=active_extensions)


if __name__ == "__main__":
    main()