Compare commits

...

2 Commits

Author SHA1 Message Date
michael-corey
f3bd5f02aa Merge main into directory_explorer: combine text file support with exception tracking 2026-04-22 09:12:16 -05:00
michael-corey
011d8418a6 adding exception counter 2026-04-20 17:02:35 -05:00

View File

@ -8,7 +8,9 @@ for each prefix:
- **Only considers files matching the configured extensions** (default: all
supported extensions, both SAS and text). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found
- Categorises the directory as **Available**, **Blocked**, or **Empty**
- If the first file is accessible, tests ALL remaining files individually
- Categorises the directory as **Available**, **Blocked**, or **Empty**, and
tracks individual file **Exceptions** within available directories
Supported file types
--------------------
@ -143,6 +145,9 @@ class AvailableDir:
prefix: str
file_count: int
total_size: int # bytes
accessible_count: int = 0 # files that passed head_object
total_count: int = 0 # total matching files found (all configured extensions)
accessible_size: int = 0 # total size of accessible files only
@dataclass
@ -161,6 +166,15 @@ class EmptyDir:
prefix: str
@dataclass
class ExceptionFile:
"""A specific file that failed permission check within an otherwise available directory."""
prefix: str # the directory prefix
key: str # the full S3 key of the failed file
error: str # the error message
@dataclass
class Results:
"""Aggregated exploration results."""
@ -168,6 +182,7 @@ class Results:
available: List[AvailableDir] = field(default_factory=list)
blocked: List[BlockedDir] = field(default_factory=list)
empty: List[EmptyDir] = field(default_factory=list)
exceptions: List[ExceptionFile] = field(default_factory=list)
# ---------------------------------------------------------------------------
@ -218,37 +233,30 @@ def list_objects(
bucket: str,
prefix: str,
extensions: Set[str] | None = None,
) -> Tuple[str | None, int, int]:
"""Recursively list all objects under *prefix* using streaming counters.
) -> Tuple[List[Tuple[str, int]], int]:
"""Recursively list all objects under *prefix*.
Only objects whose key ends with one of *extensions* (case-insensitive) are
counted. All other files are silently skipped. When *extensions* is
``None`` the module-level ``FILE_EXTENSIONS`` set is used.
Returns ``(first_key, file_count, total_size)`` where *first_key* is the
key of the first matching object found (or ``None`` if no matching files
exist), *file_count* is the total number of matching objects, and
*total_size* is the sum of their sizes in bytes.
Unlike the previous implementation this never accumulates all keys in
memory, making it safe for prefixes with millions of objects.
Returns ``(files, total_size)`` where *files* is a list of
``(key, size)`` tuples for every matching object and *total_size* is the
sum of their sizes in bytes.
"""
if extensions is None:
extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions}
paginator = s3_client.get_paginator("list_objects_v2")
first_key: str | None = None
file_count: int = 0
files: List[Tuple[str, int]] = []
total_size: int = 0
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
continue
if first_key is None:
first_key = obj["Key"]
file_count += 1
files.append((obj["Key"], obj["Size"]))
total_size += obj["Size"]
return first_key, file_count, total_size
return files, total_size
def check_read_permission(
@ -305,9 +313,7 @@ def explore_directories(
# --- Recursive listing ------------------------------------------------
try:
first_key, file_count, total_size = list_objects(
s3, S3_BUCKET, prefix, extensions=extensions,
)
files, total_size = list_objects(s3, S3_BUCKET, prefix, extensions=extensions)
except botocore.exceptions.ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "Unknown")
message = exc.response.get("Error", {}).get("Message", str(exc))
@ -321,44 +327,78 @@ def explore_directories(
)
continue
if first_key is None:
if not files:
results.empty.append(EmptyDir(prefix=prefix))
continue
# --- Permission check -------------------------------------------------
file_count = len(files)
# --- Permission check on first file -----------------------------------
# Prefer a real object over a zero-byte directory marker (key ending
# in "/") for the head_object test. The selected key must also match
# the extension filter. If no suitable key is found, fall back to
# first_key.
# the extension filter.
first_key, _ = files[0]
test_key = first_key
if first_key.endswith("/") and total_size > 0:
# Re-scan the first page to find a non-marker key matching the extensions
try:
probe_paginator = s3.get_paginator("list_objects_v2")
for probe_page in probe_paginator.paginate(
Bucket=S3_BUCKET, Prefix=prefix, PaginationConfig={"MaxItems": 1000}
):
for obj in probe_page.get("Contents", []):
if (
not (obj["Key"].endswith("/") and obj["Size"] == 0)
and any(obj["Key"].lower().endswith(ext) for ext in exts_lower)
):
test_key = obj["Key"]
break
if test_key != first_key:
break
except Exception:
pass # Fall back to first_key
for key, size in files:
if not (key.endswith("/") and size == 0) and matches_extensions(key, exts_lower):
test_key = key
break
error = check_read_permission(s3, S3_BUCKET, test_key)
if error is None:
results.available.append(
AvailableDir(prefix=prefix, file_count=file_count, total_size=total_size)
)
else:
if error is not None:
# First file blocked → entire directory is blocked
results.blocked.append(
BlockedDir(prefix=prefix, file_count=file_count, error=error)
)
continue
# --- First file accessible → check ALL remaining files ----------------
accessible_count = 1 # the first (test_key) already passed
accessible_size = 0
dir_exceptions: List[ExceptionFile] = []
# Find the size of the test_key to count it
for key, size in files:
if key == test_key:
accessible_size = size
break
# Build list of remaining files to check
remaining = [(key, size) for key, size in files if key != test_key]
if remaining:
if len(remaining) > 10:
print(
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
file=sys.stderr,
)
for key, size in remaining:
file_error = check_read_permission(s3, S3_BUCKET, key)
if file_error is None:
accessible_count += 1
accessible_size += size
else:
dir_exceptions.append(
ExceptionFile(prefix=prefix, key=key, error=file_error)
)
else:
# Only one file and it passed
accessible_size = total_size
results.available.append(
AvailableDir(
prefix=prefix,
file_count=file_count,
total_size=total_size,
accessible_count=accessible_count,
total_count=file_count,
accessible_size=accessible_size,
)
)
results.exceptions.extend(dir_exceptions)
return results
@ -394,7 +434,10 @@ def print_results(results: Results, *, extensions: Set[str] | None = None) -> No
if results.available:
for d in results.available:
print(f" {d.prefix}")
print(f" Matching files ({ext_label}): {d.file_count} | Total Size: {format_size(d.total_size)}")
print(
f" Matching files ({ext_label}): {d.accessible_count}/{d.total_count} accessible"
f" | Total Size: {format_size(d.accessible_size)}"
)
else:
print(" (none)")
@ -412,6 +455,16 @@ def print_results(results: Results, *, extensions: Set[str] | None = None) -> No
else:
print(" (none)")
# --- Exceptions ---
print()
print(f"--- Exceptions ({len(results.exceptions)}) ---")
if results.exceptions:
for exc in results.exceptions:
print(f" {exc.key}")
print(f" Directory: {exc.prefix} | Error: {exc.error}")
else:
print(" (none)")
# --- Empty ---
print()
print(f"--- Empty / no matching files ({len(results.empty)}) ---")