Compare commits
2 Commits
f4b4d0e928
...
f3bd5f02aa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3bd5f02aa | ||
|
|
011d8418a6 |
@ -8,7 +8,9 @@ for each prefix:
|
||||
- **Only considers files matching the configured extensions** (default: all
|
||||
supported extensions — SAS and text). All other file types are ignored.
|
||||
- Tests read permission with ``head_object`` on the first matching file found
|
||||
- Categorises the directory as **Available**, **Blocked**, or **Empty**
|
||||
- If the first file is accessible, tests ALL remaining files individually
|
||||
- Categorises the directory as **Available**, **Blocked**, or **Empty**, and
|
||||
tracks individual file **Exceptions** within available directories
|
||||
|
||||
Supported file types
|
||||
--------------------
|
||||
@ -143,6 +145,9 @@ class AvailableDir:
|
||||
prefix: str
|
||||
file_count: int
|
||||
total_size: int # bytes
|
||||
accessible_count: int = 0 # files that passed head_object
|
||||
total_count: int = 0 # total files found that match the configured extensions
|
||||
accessible_size: int = 0 # total size of accessible files only
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -161,6 +166,15 @@ class EmptyDir:
|
||||
prefix: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExceptionFile:
|
||||
"""A specific file that failed permission check within an otherwise available directory."""
|
||||
|
||||
prefix: str # the directory prefix
|
||||
key: str # the full S3 key of the failed file
|
||||
error: str # the error message
|
||||
|
||||
|
||||
@dataclass
|
||||
class Results:
|
||||
"""Aggregated exploration results."""
|
||||
@ -168,6 +182,7 @@ class Results:
|
||||
available: List[AvailableDir] = field(default_factory=list)
|
||||
blocked: List[BlockedDir] = field(default_factory=list)
|
||||
empty: List[EmptyDir] = field(default_factory=list)
|
||||
exceptions: List[ExceptionFile] = field(default_factory=list)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -218,37 +233,30 @@ def list_objects(
|
||||
bucket: str,
|
||||
prefix: str,
|
||||
extensions: Set[str] | None = None,
|
||||
) -> Tuple[str | None, int, int]:
|
||||
"""Recursively list all objects under *prefix* using streaming counters.
|
||||
) -> Tuple[List[Tuple[str, int]], int]:
|
||||
"""Recursively list all objects under *prefix*.
|
||||
|
||||
Only objects whose key ends with one of *extensions* (case-insensitive) are
|
||||
counted. All other files are silently skipped. When *extensions* is
|
||||
``None`` the module-level ``FILE_EXTENSIONS`` set is used.
|
||||
|
||||
Returns ``(first_key, file_count, total_size)`` where *first_key* is the
|
||||
key of the first matching object found (or ``None`` if no matching files
|
||||
exist), *file_count* is the total number of matching objects, and
|
||||
*total_size* is the sum of their sizes in bytes.
|
||||
|
||||
Unlike the previous implementation this never accumulates all keys in
|
||||
memory, making it safe for prefixes with millions of objects.
|
||||
Returns ``(files, total_size)`` where *files* is a list of
|
||||
``(key, size)`` tuples for every matching object and *total_size* is the
|
||||
sum of their sizes in bytes.
|
||||
"""
|
||||
if extensions is None:
|
||||
extensions = FILE_EXTENSIONS
|
||||
exts_lower = {e.lower() for e in extensions}
|
||||
paginator = s3_client.get_paginator("list_objects_v2")
|
||||
first_key: str | None = None
|
||||
file_count: int = 0
|
||||
files: List[Tuple[str, int]] = []
|
||||
total_size: int = 0
|
||||
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
|
||||
for obj in page.get("Contents", []):
|
||||
if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower):
|
||||
continue
|
||||
if first_key is None:
|
||||
first_key = obj["Key"]
|
||||
file_count += 1
|
||||
files.append((obj["Key"], obj["Size"]))
|
||||
total_size += obj["Size"]
|
||||
return first_key, file_count, total_size
|
||||
return files, total_size
|
||||
|
||||
|
||||
def check_read_permission(
|
||||
@ -305,9 +313,7 @@ def explore_directories(
|
||||
|
||||
# --- Recursive listing ------------------------------------------------
|
||||
try:
|
||||
first_key, file_count, total_size = list_objects(
|
||||
s3, S3_BUCKET, prefix, extensions=extensions,
|
||||
)
|
||||
files, total_size = list_objects(s3, S3_BUCKET, prefix, extensions=extensions)
|
||||
except botocore.exceptions.ClientError as exc:
|
||||
code = exc.response.get("Error", {}).get("Code", "Unknown")
|
||||
message = exc.response.get("Error", {}).get("Message", str(exc))
|
||||
@ -321,44 +327,78 @@ def explore_directories(
|
||||
)
|
||||
continue
|
||||
|
||||
if first_key is None:
|
||||
if not files:
|
||||
results.empty.append(EmptyDir(prefix=prefix))
|
||||
continue
|
||||
|
||||
# --- Permission check -------------------------------------------------
|
||||
file_count = len(files)
|
||||
|
||||
# --- Permission check on first file -----------------------------------
|
||||
# Prefer a real object over a zero-byte directory marker (key ending
|
||||
# in "/") for the head_object test. The selected key must also match
|
||||
# the extension filter. If no suitable key is found, fall back to
|
||||
# first_key.
|
||||
# the extension filter.
|
||||
first_key, _ = files[0]
|
||||
test_key = first_key
|
||||
if first_key.endswith("/") and total_size > 0:
|
||||
# Re-scan the first page to find a non-marker key matching the extensions
|
||||
try:
|
||||
probe_paginator = s3.get_paginator("list_objects_v2")
|
||||
for probe_page in probe_paginator.paginate(
|
||||
Bucket=S3_BUCKET, Prefix=prefix, PaginationConfig={"MaxItems": 1000}
|
||||
):
|
||||
for obj in probe_page.get("Contents", []):
|
||||
if (
|
||||
not (obj["Key"].endswith("/") and obj["Size"] == 0)
|
||||
and any(obj["Key"].lower().endswith(ext) for ext in exts_lower)
|
||||
):
|
||||
test_key = obj["Key"]
|
||||
for key, size in files:
|
||||
if not (key.endswith("/") and size == 0) and matches_extensions(key, exts_lower):
|
||||
test_key = key
|
||||
break
|
||||
if test_key != first_key:
|
||||
break
|
||||
except Exception:
|
||||
pass # Fall back to first_key
|
||||
|
||||
error = check_read_permission(s3, S3_BUCKET, test_key)
|
||||
if error is None:
|
||||
results.available.append(
|
||||
AvailableDir(prefix=prefix, file_count=file_count, total_size=total_size)
|
||||
)
|
||||
else:
|
||||
if error is not None:
|
||||
# First file blocked → entire directory is blocked
|
||||
results.blocked.append(
|
||||
BlockedDir(prefix=prefix, file_count=file_count, error=error)
|
||||
)
|
||||
continue
|
||||
|
||||
# --- First file accessible → check ALL remaining files ----------------
|
||||
accessible_count = 1 # the first (test_key) already passed
|
||||
accessible_size = 0
|
||||
dir_exceptions: List[ExceptionFile] = []
|
||||
|
||||
# Find the size of the test_key to count it
|
||||
for key, size in files:
|
||||
if key == test_key:
|
||||
accessible_size = size
|
||||
break
|
||||
|
||||
# Build list of remaining files to check
|
||||
remaining = [(key, size) for key, size in files if key != test_key]
|
||||
|
||||
if remaining:
|
||||
if len(remaining) > 10:
|
||||
print(
|
||||
f" Verifying access to {file_count} {ext_label} files in {prefix} ...",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
for key, size in remaining:
|
||||
file_error = check_read_permission(s3, S3_BUCKET, key)
|
||||
if file_error is None:
|
||||
accessible_count += 1
|
||||
accessible_size += size
|
||||
else:
|
||||
dir_exceptions.append(
|
||||
ExceptionFile(prefix=prefix, key=key, error=file_error)
|
||||
)
|
||||
|
||||
else:
|
||||
# Only one file and it passed
|
||||
accessible_size = total_size
|
||||
|
||||
results.available.append(
|
||||
AvailableDir(
|
||||
prefix=prefix,
|
||||
file_count=file_count,
|
||||
total_size=total_size,
|
||||
accessible_count=accessible_count,
|
||||
total_count=file_count,
|
||||
accessible_size=accessible_size,
|
||||
)
|
||||
)
|
||||
results.exceptions.extend(dir_exceptions)
|
||||
|
||||
return results
|
||||
|
||||
@ -394,7 +434,10 @@ def print_results(results: Results, *, extensions: Set[str] | None = None) -> No
|
||||
if results.available:
|
||||
for d in results.available:
|
||||
print(f" {d.prefix}")
|
||||
print(f" Matching files ({ext_label}): {d.file_count} | Total Size: {format_size(d.total_size)}")
|
||||
print(
|
||||
f" Matching files ({ext_label}): {d.accessible_count}/{d.total_count} accessible"
|
||||
f" | Total Size: {format_size(d.accessible_size)}"
|
||||
)
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
@ -412,6 +455,16 @@ def print_results(results: Results, *, extensions: Set[str] | None = None) -> No
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
# --- Exceptions ---
|
||||
print()
|
||||
print(f"--- Exceptions ({len(results.exceptions)}) ---")
|
||||
if results.exceptions:
|
||||
for exc in results.exceptions:
|
||||
print(f" {exc.key}")
|
||||
print(f" Directory: {exc.prefix} | Error: {exc.error}")
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
# --- Empty ---
|
||||
print()
|
||||
print(f"--- Empty / no matching files ({len(results.empty)}) ---")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user