adding exception counter #7

Merged
mc merged 1 commit from directory_explorer into main 2026-04-22 15:10:15 +00:00
Showing only changes of commit f4b4d0e928 - Show all commits

View File

@ -8,7 +8,9 @@ for each prefix:
- **Only considers files matching the configured extensions** (default: all - **Only considers files matching the configured extensions** (default: all
supported extensions SAS and text). All other file types are ignored. supported extensions SAS and text). All other file types are ignored.
- Tests read permission with ``head_object`` on the first matching file found - Tests read permission with ``head_object`` on the first matching file found
- Categorises the directory as **Available**, **Blocked**, or **Empty** - If the first file is accessible, tests ALL remaining files individually
- Categorises the directory as **Available**, **Blocked**, or **Empty**, and
tracks individual file **Exceptions** within available directories
Supported file types Supported file types
-------------------- --------------------
@ -143,6 +145,9 @@ class AvailableDir:
prefix: str prefix: str
file_count: int file_count: int
total_size: int # bytes total_size: int # bytes
accessible_count: int = 0 # files that passed head_object
total_count: int = 0 # total matching files found (all configured extensions)
accessible_size: int = 0 # total size of accessible files only
@dataclass @dataclass
@ -161,6 +166,15 @@ class EmptyDir:
prefix: str prefix: str
@dataclass
class ExceptionFile:
    """A single file that failed its permission check in an available directory.

    One instance is recorded per failing key.  The directory itself is still
    reported as available because the first file probed in it was readable.
    """

    prefix: str  # directory prefix the file was found under
    key: str  # full S3 key of the file that failed the check
    error: str  # error message produced by the failed permission check
@dataclass @dataclass
class Results: class Results:
"""Aggregated exploration results.""" """Aggregated exploration results."""
@ -168,6 +182,7 @@ class Results:
available: List[AvailableDir] = field(default_factory=list) available: List[AvailableDir] = field(default_factory=list)
blocked: List[BlockedDir] = field(default_factory=list) blocked: List[BlockedDir] = field(default_factory=list)
empty: List[EmptyDir] = field(default_factory=list) empty: List[EmptyDir] = field(default_factory=list)
exceptions: List[ExceptionFile] = field(default_factory=list)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -218,37 +233,30 @@ def list_objects(
bucket: str, bucket: str,
prefix: str, prefix: str,
extensions: Set[str] | None = None, extensions: Set[str] | None = None,
) -> Tuple[str | None, int, int]: ) -> Tuple[List[Tuple[str, int]], int]:
"""Recursively list all objects under *prefix* using streaming counters. """Recursively list all objects under *prefix*.
Only objects whose key ends with one of *extensions* (case-insensitive) are Only objects whose key ends with one of *extensions* (case-insensitive) are
counted. All other files are silently skipped. When *extensions* is counted. All other files are silently skipped. When *extensions* is
``None`` the module-level ``FILE_EXTENSIONS`` set is used. ``None`` the module-level ``FILE_EXTENSIONS`` set is used.
Returns ``(first_key, file_count, total_size)`` where *first_key* is the Returns ``(files, total_size)`` where *files* is a list of
key of the first matching object found (or ``None`` if no matching files ``(key, size)`` tuples for every matching object and *total_size* is the
exist), *file_count* is the total number of matching objects, and sum of their sizes in bytes.
*total_size* is the sum of their sizes in bytes.
Unlike the previous implementation this never accumulates all keys in
memory, making it safe for prefixes with millions of objects.
""" """
if extensions is None: if extensions is None:
extensions = FILE_EXTENSIONS extensions = FILE_EXTENSIONS
exts_lower = {e.lower() for e in extensions} exts_lower = {e.lower() for e in extensions}
paginator = s3_client.get_paginator("list_objects_v2") paginator = s3_client.get_paginator("list_objects_v2")
first_key: str | None = None files: List[Tuple[str, int]] = []
file_count: int = 0
total_size: int = 0 total_size: int = 0
for page in paginator.paginate(Bucket=bucket, Prefix=prefix): for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []): for obj in page.get("Contents", []):
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower): if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
continue continue
if first_key is None: files.append((obj["Key"], obj["Size"]))
first_key = obj["Key"]
file_count += 1
total_size += obj["Size"] total_size += obj["Size"]
return first_key, file_count, total_size return files, total_size
def check_read_permission( def check_read_permission(
@ -305,7 +313,7 @@ def explore_directories(
# --- Recursive listing ------------------------------------------------ # --- Recursive listing ------------------------------------------------
try: try:
first_key, file_count, total_size = list_objects( files, total_size = list_objects(
s3, S3_BUCKET, prefix, extensions=extensions, s3, S3_BUCKET, prefix, extensions=extensions,
) )
except botocore.exceptions.ClientError as exc: except botocore.exceptions.ClientError as exc:
@ -321,44 +329,77 @@ def explore_directories(
) )
continue continue
if first_key is None: if not files:
results.empty.append(EmptyDir(prefix=prefix)) results.empty.append(EmptyDir(prefix=prefix))
continue continue
# --- Permission check ------------------------------------------------- file_count = len(files)
# Prefer a real object over a zero-byte directory marker (key ending
# in "/") for the head_object test. The selected key must also match # --- Permission check on first file -----------------------------------
# the extension filter. If no suitable key is found, fall back to # Prefer a real object over a zero-byte directory marker (key ending in "/") for the head_object test. The listing is already filtered
# first_key. # to the requested extensions, so any non-marker key is a valid probe.
first_key, _ = files[0]
test_key = first_key test_key = first_key
if first_key.endswith("/") and total_size > 0: if first_key.endswith("/") and total_size > 0:
# Re-scan the first page to find a non-marker key matching the extensions for key, size in files:
try: if not (key.endswith("/") and size == 0):
probe_paginator = s3.get_paginator("list_objects_v2") test_key = key
for probe_page in probe_paginator.paginate(
Bucket=S3_BUCKET, Prefix=prefix, PaginationConfig={"MaxItems": 1000}
):
for obj in probe_page.get("Contents", []):
if (
not (obj["Key"].endswith("/") and obj["Size"] == 0)
and any(obj["Key"].lower().endswith(ext) for ext in exts_lower)
):
test_key = obj["Key"]
break break
if test_key != first_key:
break
except Exception:
pass # Fall back to first_key
error = check_read_permission(s3, S3_BUCKET, test_key) error = check_read_permission(s3, S3_BUCKET, test_key)
if error is None: if error is not None:
results.available.append( # First file blocked → entire directory is blocked
AvailableDir(prefix=prefix, file_count=file_count, total_size=total_size)
)
else:
results.blocked.append( results.blocked.append(
BlockedDir(prefix=prefix, file_count=file_count, error=error) BlockedDir(prefix=prefix, file_count=file_count, error=error)
) )
continue
# --- First file accessible → check ALL remaining files ----------------
accessible_count = 1 # the first (test_key) already passed
accessible_size = 0
dir_exceptions: List[ExceptionFile] = []
# Find the size of the test_key to count it
for key, size in files:
if key == test_key:
accessible_size = size
break
# Build list of remaining files to check
remaining = [(key, size) for key, size in files if key != test_key]
if remaining:
if len(remaining) > 10:
print(
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
file=sys.stderr,
)
for key, size in remaining:
file_error = check_read_permission(s3, S3_BUCKET, key)
if file_error is None:
accessible_count += 1
accessible_size += size
else:
dir_exceptions.append(
ExceptionFile(prefix=prefix, key=key, error=file_error)
)
else:
# Only one file and it passed
accessible_size = total_size
results.available.append(
AvailableDir(
prefix=prefix,
file_count=file_count,
total_size=total_size,
accessible_count=accessible_count,
total_count=file_count,
accessible_size=accessible_size,
)
)
results.exceptions.extend(dir_exceptions)
return results return results
@ -394,7 +435,10 @@ def print_results(results: Results, *, extensions: Set[str] | None = None) -> No
if results.available: if results.available:
for d in results.available: for d in results.available:
print(f" {d.prefix}") print(f" {d.prefix}")
print(f" Matching files ({ext_label}): {d.file_count} | Total Size: {format_size(d.total_size)}") print(
f" {ext_label} files: {d.accessible_count}/{d.total_count} accessible"
f" | Total Size: {format_size(d.accessible_size)}"
)
else: else:
print(" (none)") print(" (none)")
@ -412,6 +456,16 @@ def print_results(results: Results, *, extensions: Set[str] | None = None) -> No
else: else:
print(" (none)") print(" (none)")
# --- Exceptions ---
print()
print(f"--- Exceptions ({len(results.exceptions)}) ---")
if results.exceptions:
for exc in results.exceptions:
print(f" {exc.key}")
print(f" Directory: {exc.prefix} | Error: {exc.error}")
else:
print(" (none)")
# --- Empty --- # --- Empty ---
print() print()
print(f"--- Empty / no matching files ({len(results.empty)}) ---") print(f"--- Empty / no matching files ({len(results.empty)}) ---")