diff --git a/generic_loader/load_folder.py b/generic_loader/load_folder.py index d503276..b789888 100644 --- a/generic_loader/load_folder.py +++ b/generic_loader/load_folder.py @@ -1,8 +1,9 @@ -"""Folder-level SAS-to-Postgres loader. +"""Folder-level data-to-Postgres loader. -Wraps :mod:`load_sas` so an entire directory of SAS files can be ingested in -one invocation. A directory often contains several *clusters* of files that -share a schema (e.g. ``group_a1.sas7bdat``, ``group_a2.sas7bdat``, ...). Each +Wraps :mod:`load_sas` so an entire directory of data files (SAS or delimited +text) can be ingested in one invocation. A directory often contains several +*clusters* of files that share a schema (e.g. ``group_a1.sas7bdat``, +``group_a2.sas7bdat``, ... or ``group_a1.csv``, ``group_a2.csv``, ...). Each cluster becomes one Postgres table; files inside a cluster are appended to it. ------------------------------------------------------------------------------- @@ -90,12 +91,14 @@ Flags: Exit codes: 0 - every cluster loaded successfully (or dry-run completed) 1 - at least one cluster failed (details on stderr) - 2 - folder does not exist / contains no SAS files + 2 - folder does not exist / contains no data files 3. Discovery rules ------------------ -* Supported extensions: ``.sas7bdat``, ``.xpt``, ``.xport`` (matches - :mod:`load_sas`). The folder is not scanned recursively. +* Supported SAS extensions: ``.sas7bdat``, ``.xpt``, ``.xport``. + Supported text extensions: ``.txt``, ``.csv``, ``.tsv``. + The ``file_type`` config key controls which set is used. + The folder is not scanned recursively. * Explicit patterns are tried in order. A file matched by one pattern is removed from the pool before the next pattern runs, so earlier patterns win in case of overlap. Overlap between patterns is flagged as an error @@ -166,8 +169,11 @@ from dotenv import load_dotenv from tqdm import tqdm from load_sas import ( + TEXT_EXTENSIONS, + VALID_FILE_TYPES, VALID_IF_EXISTS, _count_partitions, + _is_text_file, _merge_partition_trees, apply_column_filter, assert_schema_compatible, @@ -189,6 +195,7 @@ from load_sas import ( SAS_EXTENSIONS = (".sas7bdat", ".xpt", ".xport") +SUPPORTED_EXTENSIONS = SAS_EXTENSIONS + TEXT_EXTENSIONS # --------------------------------------------------------------------------- @@ -207,6 +214,11 @@ class ClusterSpec: of the auto-union result computed during pre-scan (see :func:`main`). The same dict is threaded through to workers so every file in the cluster infers the same schema. + + Text-file config (``file_type``, ``delimiter``, ``text_encoding``, + ``quotechar``) is propagated from the folder config during + :func:`discover_clusters` so that reader functions receive the correct + parameters for delimited text files. """ tablename: str @@ -221,6 +233,10 @@ class ClusterSpec: indexes: List[str] = field(default_factory=list) column_types: Dict[str, str] = field(default_factory=dict) all_nullable: bool = False + file_type: str = "sas" + delimiter: str = "," + text_encoding: str = "utf-8" + quotechar: str = '"' @dataclass @@ -257,6 +273,11 @@ class FolderConfig: ``column_types`` is a ``{column_name: postgres_type_str}`` map of user-supplied type overrides that win over the auto-union computed during pre-scan. + + Text-file config (``file_type``, ``delimiter``, ``text_encoding``, + ``quotechar``) controls how delimited text files are discovered and + read. These fields are ignored when ``file_type`` is ``"sas"`` + (the default). """ folder: Path @@ -271,6 +292,10 @@ class FolderConfig: indexes: List[str] = field(default_factory=list) column_types: Dict[str, str] = field(default_factory=dict) all_nullable: bool = False + file_type: str = "sas" + delimiter: str = "," + text_encoding: str = "utf-8" + quotechar: str = '"' # --------------------------------------------------------------------------- @@ -523,6 +548,30 @@ def load_folder_config(path: Path) -> FolderConfig: ) all_nullable_default = bool(raw_an_folder) + # -- file_type ---------------------------------------------------------- + file_type = str(raw.get("file_type", "sas")).lower() + if file_type not in VALID_FILE_TYPES: + raise ValueError( + f"Config {path}: file_type={file_type!r} is not one of " + f"{VALID_FILE_TYPES}" + ) + + # -- text-file-specific fields ------------------------------------------ + raw_delim = raw.get("delimiter", ",") + if isinstance(raw_delim, str): + delim_lower = raw_delim.lower().strip() + if delim_lower in ("tab", "\\t"): + delimiter = "\t" + elif delim_lower in ("pipe", "|"): + delimiter = "|" + else: + delimiter = raw_delim + else: + delimiter = str(raw_delim) + + text_encoding = str(raw.get("text_encoding", "utf-8")) + quotechar = str(raw.get("quotechar", '"')) + explicit: List[_ExplicitPattern] = [] clusters_raw = raw.get("clusters") or [] if not isinstance(clusters_raw, list): @@ -611,6 +660,10 @@ def load_folder_config(path: Path) -> FolderConfig: indexes=indexes, column_types=column_types or {}, all_nullable=all_nullable_default, + file_type=file_type, + delimiter=delimiter, + text_encoding=text_encoding, + quotechar=quotechar, ) @@ -651,21 +704,52 @@ def _cluster_sort_key(path: Path) -> Tuple[int, str]: return (n, path.stem) -def _list_sas_files(folder: Path) -> List[Path]: +def _list_data_files(folder: Path, file_type: str = "sas") -> List[Path]: + """List data files in ``folder`` filtered by ``file_type``. + + When ``file_type`` is ``"text"``, only text extensions are matched. + When ``file_type`` is ``"sas"`` (the default), only SAS extensions are + matched. This keeps SAS and text file pools separate so a folder + containing both types doesn't accidentally mix them. + """ + if file_type == "text": + extensions = TEXT_EXTENSIONS + else: + extensions = SAS_EXTENSIONS files: List[Path] = [] for p in sorted(folder.iterdir()): - if p.is_file() and p.suffix.lower() in SAS_EXTENSIONS: + if p.is_file() and p.suffix.lower() in extensions: files.append(p) return files +def _list_sas_files(folder: Path) -> List[Path]: + """Backward-compatible wrapper around :func:`_list_data_files`.""" + return _list_data_files(folder, file_type="sas") + + +def _build_text_kw(cluster: ClusterSpec) -> Dict[str, Any]: + """Build the text-file keyword arguments dict from a cluster's config. + + Returns a dict suitable for spreading into :func:`read_sas_preview`, + :func:`read_sas_metadata`, :func:`iter_sas_chunks`, etc. For SAS + file_type clusters the dict still carries the defaults, which the + reader functions ignore for SAS extensions. + """ + return dict( + delimiter=cluster.delimiter, + text_encoding=cluster.text_encoding, + quotechar=cluster.quotechar, + ) + + def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: """Enumerate ``cfg.folder`` and bucket files into ``ClusterSpec`` objects. Pure/IO-bounded: the only filesystem access is listing ``cfg.folder``. No - SAS file is opened here. Explicit patterns are applied first, in config + data file is opened here. Explicit patterns are applied first, in config order; files matched by an earlier pattern are removed from the pool - before the next pattern runs. A file matching two patterns triggers a + before the next pattern runs. A file matching two patterns triggers a hard error (that's almost always a config bug). Partition settings are resolved per cluster: @@ -674,13 +758,24 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: cluster entry override the folder defaults when present. ``None`` means "inherit"; an explicit ``[]`` disables partitioning. * For auto-detected clusters, folder defaults are inherited directly. + + Text-file config (``file_type``, ``delimiter``, ``text_encoding``, + ``quotechar``) is propagated from the folder config to every cluster. """ if not cfg.folder.exists() or not cfg.folder.is_dir(): raise FileNotFoundError(f"Folder not found or not a directory: {cfg.folder}") - pool = _list_sas_files(cfg.folder) + pool = _list_data_files(cfg.folder, file_type=cfg.file_type) clusters: List[ClusterSpec] = [] + # Text-file kwargs to propagate to every cluster. + _text_fields = dict( + file_type=cfg.file_type, + delimiter=cfg.delimiter, + text_encoding=cfg.text_encoding, + quotechar=cfg.quotechar, + ) + # Detect cross-pattern overlap up front for a clearer error message. for i, p_i in enumerate(cfg.explicit): for j in range(i + 1, len(cfg.explicit)): @@ -741,6 +836,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: indexes=resolved_idx, column_types=dict(resolved_ct), all_nullable=resolved_an, + **_text_fields, ) ) continue @@ -759,6 +855,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: indexes=resolved_idx, column_types=dict(resolved_ct), all_nullable=resolved_an, + **_text_fields, ) ) @@ -781,6 +878,7 @@ def discover_clusters(cfg: FolderConfig) -> List[ClusterSpec]: indexes=cfg.indexes, column_types=dict(cfg.column_types), all_nullable=cfg.all_nullable, + **_text_fields, ) ) @@ -799,19 +897,24 @@ def _infer_cluster_schema( *, column_types: Optional[Dict[str, str]] = None, force_nullable: bool = False, + text_kw: Optional[Dict[str, Any]] = None, ) -> Tuple[Dict, Optional[int]]: - """Infer the Postgres column schema from a SAS file preview. + """Infer the Postgres column schema from a data file preview. Returns ``(columns, total_rows)``. ``total_rows`` comes from the - pyreadstat metadata (the file's declared row count) and is threaded + file metadata (the file's declared row count) and is threaded through to :func:`_stream_file` so the tqdm progress bar has a real denominator instead of an indeterminate spinner. ``column_types`` lets the caller pin specific columns to a chosen Postgres type (typically the merged auto-union + YAML overrides for the cluster). ``force_nullable`` stamps every column nullable regardless of what the preview shows - see :func:`load_sas.infer_schema`. + + ``text_kw`` carries ``delimiter``, ``text_encoding``, ``quotechar`` + through to :func:`read_sas_preview` for text file dispatch. """ - preview_df, meta = read_sas_preview(path) + _tkw = text_kw or {} + preview_df, meta = read_sas_preview(path, **_tkw) preview_df = apply_column_filter(preview_df, include, exclude) total_rows = getattr(meta, "number_rows", None) columns = infer_schema( @@ -834,10 +937,11 @@ def _discover_cluster_partitions( Each file is scanned chunk-by-chunk so the full dataset is never materialized in memory. """ + tkw = _build_text_kw(cluster) merged: dict = {} for path in cluster.files: def _filtered_chunks(p=path): - for chunk_df, _chunk_meta in iter_sas_chunks(p): + for chunk_df, _chunk_meta in iter_sas_chunks(p, **tkw): yield apply_column_filter( chunk_df, cluster.include, cluster.exclude ) @@ -888,11 +992,14 @@ def load_cluster( if not cluster.files: return 0 + tkw = _build_text_kw(cluster) + first, *rest = cluster.files first_columns, first_total_rows = _infer_cluster_schema( first, cluster.include, cluster.exclude, column_types=cluster.column_types, force_nullable=cluster.all_nullable, + text_kw=tkw, ) # -- Validate index columns early --------------------------------------- @@ -974,6 +1081,7 @@ def load_cluster( column_types=cluster.column_types, force_nullable=cluster.all_nullable, abort_on_first_failure=abort_on_first_failure, + text_kw=tkw, ) else: # Serial path: stream the first file on the main connection, then @@ -985,6 +1093,7 @@ def load_cluster( cluster.include, cluster.exclude, total_rows=first_total_rows, progress_queue=progress_queue, + text_kw=tkw, ) conn.commit() for path in rest: @@ -992,6 +1101,7 @@ def load_cluster( path, cluster.include, cluster.exclude, column_types=cluster.column_types, force_nullable=cluster.all_nullable, + text_kw=tkw, ) # Uses the same check that if_exists=append runs. A type # mismatch or missing column aborts the cluster; because @@ -1005,6 +1115,7 @@ def load_cluster( cluster.include, cluster.exclude, total_rows=path_total_rows, progress_queue=progress_queue, + text_kw=tkw, ) # -- Index support ------------------------------------------------------ @@ -1025,6 +1136,7 @@ def _stream_file( *, total_rows: Optional[int] = None, progress_queue: Any = None, + text_kw: Optional[Dict[str, Any]] = None, ) -> int: """Stream ``path`` into an existing table chunk by chunk. @@ -1034,10 +1146,15 @@ def _stream_file( progress bar from a background drainer thread, which is the only way to keep a coherent progress view when the folder loader is running files in parallel workers. + + ``text_kw`` carries ``delimiter``, ``text_encoding``, ``quotechar`` + through to :func:`iter_sas_chunks` for text file dispatch. """ + _tkw = text_kw or {} + def _chunks(): if progress_queue is not None: - for chunk_df, _chunk_meta in iter_sas_chunks(path): + for chunk_df, _chunk_meta in iter_sas_chunks(path, **_tkw): chunk_df = apply_column_filter(chunk_df, include, exclude) progress_queue.put(("rows", len(chunk_df))) yield chunk_df @@ -1052,7 +1169,7 @@ def _stream_file( dynamic_ncols=True, ) try: - for chunk_df, _chunk_meta in iter_sas_chunks(path): + for chunk_df, _chunk_meta in iter_sas_chunks(path, **_tkw): chunk_df = apply_column_filter(chunk_df, include, exclude) pbar.update(len(chunk_df)) yield chunk_df @@ -1077,8 +1194,9 @@ def _worker_load_append_file( db_overrides: Optional[Dict[str, Optional[str]]], column_types: Optional[Dict[str, str]] = None, force_nullable: bool = False, + text_kw: Optional[Dict[str, Any]] = None, ) -> Tuple[str, int, Optional[str]]: - """Worker process: load one SAS file in append mode. + """Worker process: load one data file in append mode. Runs in a subprocess spawned by :func:`_load_remaining_files_parallel`. Opens its own psycopg2 connection, re-infers the per-file schema (so @@ -1087,6 +1205,9 @@ def _worker_load_append_file( via ``COPY``. Row counts are published to the shared queue for the main process's global tqdm bar. + ``text_kw`` carries ``delimiter``, ``text_encoding``, ``quotechar`` + through to the reader functions for text file dispatch. + Returns ``(path_str, rows_loaded, error_or_None)`` - failures are returned rather than raised so the parent can aggregate results across workers without losing partial progress. @@ -1111,9 +1232,10 @@ def _worker_load_append_file( _load_dotenv() + _tkw = text_kw or {} path = _Path(path_str) try: - preview_df, meta = _read_sas_preview(path) + preview_df, meta = _read_sas_preview(path, **_tkw) preview_df = _apply_column_filter(preview_df, include, exclude) total_rows = getattr(meta, "number_rows", None) columns = _infer_schema( @@ -1134,7 +1256,7 @@ def _worker_load_append_file( _assert_schema_compatible(conn, schemaname, tablename, columns) def _chunks(): - for chunk_df, _chunk_meta in _iter_sas_chunks(path): + for chunk_df, _chunk_meta in _iter_sas_chunks(path, **_tkw): chunk_df = _apply_column_filter(chunk_df, include, exclude) if progress_queue is not None: progress_queue.put(("rows", len(chunk_df))) @@ -1198,6 +1320,7 @@ def _load_remaining_files_parallel( column_types: Optional[Dict[str, str]] = None, force_nullable: bool = False, abort_on_first_failure: bool = False, + text_kw: Optional[Dict[str, Any]] = None, ) -> int: """Run append-mode loads for ``files`` across a process pool. @@ -1253,6 +1376,7 @@ def _load_remaining_files_parallel( db_overrides, column_types, force_nullable, + text_kw, ) for p in files ] @@ -1348,8 +1472,8 @@ def _load_remaining_files_parallel( def _build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description=( - "Load every SAS file in a folder into Postgres, grouping files " - "into clusters that each become one table." + "Load every data file (SAS or delimited text) in a folder into " + "Postgres, grouping files into clusters that each become one table." ), ) p.add_argument("--config", required=True, type=Path, help="Path to YAML config") @@ -1399,10 +1523,10 @@ def _build_argparser() -> argparse.ArgumentParser: default=None, metavar="N", help=( - "Per-chunk row target for pyreadstat streaming and COPY. " + "Per-chunk row target for streaming and COPY. " "Overrides both the GENERIC_LOADER_CHUNK_ROWS env var and the " "auto-scaling applied when --workers > 1. Peak memory per " - "worker is roughly 4 × N × avg_row_bytes; with wide sas7bdat " + "worker is roughly 4 × N × avg_row_bytes; with wide data " "files (~4 KB/row) and 32 workers, N=100000 is a safe starting " "point on a 128 GB box." ), @@ -1494,9 +1618,15 @@ def main(argv: Optional[List[str]] = None) -> int: loadable = [c for c in clusters if c.files] if not loadable: + if cfg.file_type == "text": + ext_label = ", ".join(TEXT_EXTENSIONS) + kind = "text" + else: + ext_label = ", ".join(SAS_EXTENSIONS) + kind = "SAS" print( - f"error: no SAS files found in {cfg.folder} " - f"(looked for {', '.join(SAS_EXTENSIONS)})", + f"error: no {kind} files found in {cfg.folder} " + f"(looked for {ext_label})", file=sys.stderr, ) return 2 @@ -1513,10 +1643,12 @@ def main(argv: Optional[List[str]] = None) -> int: # ``column_types`` from YAML are already baked into ``c.column_types`` # by ``discover_clusters`` - honor them here so the previewed DDL # matches what a real load would produce on a single-file cluster. + _dry_tkw = _build_text_kw(c) columns, _ = _infer_cluster_schema( c.files[0], c.include, c.exclude, column_types=c.column_types, force_nullable=c.all_nullable, + text_kw=_dry_tkw, ) # Print parent CREATE TABLE (with PARTITION BY if applicable). print( @@ -1671,7 +1803,12 @@ def main(argv: Optional[List[str]] = None) -> int: Optional[str], ]: try: - meta = read_sas_metadata(p) + _prescan_tkw = dict( + delimiter=cfg.delimiter, + text_encoding=cfg.text_encoding, + quotechar=cfg.quotechar, + ) + meta = read_sas_metadata(p, **_prescan_tkw) n = getattr(meta, "number_rows", None) col_meta = extract_union_metadata(meta) return ( diff --git a/generic_loader/load_sas.py b/generic_loader/load_sas.py index 8db1d5c..ed65ef8 100644 --- a/generic_loader/load_sas.py +++ b/generic_loader/load_sas.py @@ -1,4 +1,4 @@ -"""Per-file SAS-to-Postgres loader. +"""Per-file data-to-Postgres loader (SAS and delimited text). Library-style functions plus a thin CLI wrapper. Designed so an orchestrator can wrap the library for directory/batch mode; orchestration is out of scope @@ -16,10 +16,11 @@ USAGE Supported inputs: * ``.sas7bdat`` (read with ``encoding="latin-1"``) * ``.xpt`` / ``.xport`` (SAS transport files) + * ``.csv`` / ``.tsv`` / ``.txt`` (delimited text files with headers) 1. YAML config -------------- -Every invocation is driven by a YAML file describing one SAS file to load:: +Every invocation is driven by a YAML file describing one data file to load:: filename: samples/sample_kitchensink.xpt # required; relative paths are # resolved against the config @@ -316,6 +317,12 @@ changes. Explicit ``chunksize=`` kwargs still win over both.""" VALID_IF_EXISTS = ("fail", "replace", "append") +VALID_FILE_TYPES = ("sas", "text") +"""Supported ``file_type`` values in the YAML config.""" + +TEXT_EXTENSIONS = (".txt", ".csv", ".tsv") +"""File extensions recognised as delimited text files.""" + _PG_IDENT_MAX_LEN = 63 """PostgreSQL maximum identifier length in bytes (characters for ASCII).""" @@ -325,6 +332,20 @@ _PG_IDENT_MAX_LEN = 63 # --------------------------------------------------------------------------- +@dataclass +class TextFileMetadata: + """Minimal metadata object for text files, mimicking pyreadstat metadata. + + Provides the same attribute surface that :func:`infer_schema` reads from + pyreadstat metadata objects: ``column_names``, ``column_labels``, + ``original_variable_types``, and ``number_rows``. + """ + column_names: List[str] + column_labels: List[str] + original_variable_types: Dict[str, str] + number_rows: Optional[int] = None + + @dataclass class LoaderConfig: filename: Path @@ -338,6 +359,10 @@ class LoaderConfig: indexes: List[str] = field(default_factory=list) column_types: Dict[str, str] = field(default_factory=dict) all_nullable: bool = False + file_type: str = "sas" + delimiter: str = "," + text_encoding: str = "utf-8" + quotechar: str = '"' @dataclass @@ -590,6 +615,31 @@ def load_config(path: Path) -> LoaderConfig: ) all_nullable = bool(raw_an) + # -- file_type ---------------------------------------------------------- + file_type = str(raw.get("file_type", "sas")).lower() + if file_type not in VALID_FILE_TYPES: + raise ValueError( + f"Config {path}: file_type={file_type!r} is not one of " + f"{VALID_FILE_TYPES}" + ) + + # -- text-file-specific fields ------------------------------------------ + # Only validated when file_type == "text"; harmless defaults otherwise. + raw_delim = raw.get("delimiter", ",") + if isinstance(raw_delim, str): + delim_lower = raw_delim.lower().strip() + if delim_lower in ("tab", "\\t"): + delimiter = "\t" + elif delim_lower in ("pipe", "|"): + delimiter = "|" + else: + delimiter = raw_delim + else: + delimiter = str(raw_delim) + + text_encoding = str(raw.get("text_encoding", "utf-8")) + quotechar = str(raw.get("quotechar", '"')) + return LoaderConfig( filename=filename, schemaname=schemaname, @@ -602,6 +652,10 @@ def load_config(path: Path) -> LoaderConfig: indexes=indexes, column_types=column_types, all_nullable=all_nullable, + file_type=file_type, + delimiter=delimiter, + text_encoding=text_encoding, + quotechar=quotechar, ) @@ -610,6 +664,11 @@ def load_config(path: Path) -> LoaderConfig: # --------------------------------------------------------------------------- +def _is_text_file(path: Path) -> bool: + """Return True if ``path`` has a recognised delimited-text extension.""" + return Path(path).suffix.lower() in TEXT_EXTENSIONS + + def _sas_reader(path: Path) -> Tuple[Any, Dict[str, Any]]: """Return ``(pyreadstat_reader, extra_kwargs)`` for ``path``. @@ -627,13 +686,192 @@ def _sas_reader(path: Path) -> Tuple[Any, Dict[str, Any]]: raise ValueError(f"Unsupported SAS file extension: {suffix}") -def read_sas(path: Path) -> Tuple[pd.DataFrame, Any]: - """Read an entire SAS file into memory. Only safe for small files. +# --------------------------------------------------------------------------- +# Text file readers +# --------------------------------------------------------------------------- + + +def _count_text_lines(path: Path, encoding: str = "utf-8") -> int: + """Count data rows in a text file (excludes the header line). + + Reads the file in binary chunks for speed; counts newlines and + subtracts one for the header. + """ + count = 0 + with open(path, "rb") as fh: + for chunk in iter(lambda: fh.read(1 << 20), b""): + count += chunk.count(b"\n") + # If the file doesn't end with a newline the last line is still a row. + # But the first line is the header, so subtract 1. + # Edge case: empty file or header-only -> 0 rows. + return max(0, count - 1) if count > 0 else 0 + + +def _build_text_metadata( + column_names: List[str], + number_rows: Optional[int] = None, +) -> TextFileMetadata: + """Build a :class:`TextFileMetadata` from column names and an optional + row count.""" + return TextFileMetadata( + column_names=list(column_names), + column_labels=list(column_names), + original_variable_types={}, + number_rows=number_rows, + ) + + +def read_text( + path: Path, + delimiter: str = ",", + encoding: str = "utf-8", + quotechar: str = '"', +) -> Tuple[pd.DataFrame, TextFileMetadata]: + """Read an entire delimited text file into memory. + + Returns ``(DataFrame, TextFileMetadata)`` — the metadata object carries + the same attributes that :func:`infer_schema` reads from pyreadstat + metadata. + """ + path = Path(path) + df = pd.read_csv( + path, + delimiter=delimiter, + encoding=encoding, + quotechar=quotechar, + dtype=str, + keep_default_na=True, + na_values=[""], + ) + meta = _build_text_metadata(list(df.columns), number_rows=len(df)) + return df, meta + + +def read_text_preview( + path: Path, + delimiter: str = ",", + encoding: str = "utf-8", + quotechar: str = '"', + rows: Optional[int] = None, +) -> Tuple[pd.DataFrame, TextFileMetadata]: + """Read the first ``rows`` records from a delimited text file. + + When ``rows`` is ``None`` or 0, reads the entire file (matching the + semantics of :func:`read_sas_preview`). + """ + path = Path(path) + nrows = int(rows) if rows else None + df = pd.read_csv( + path, + delimiter=delimiter, + encoding=encoding, + quotechar=quotechar, + nrows=nrows, + dtype=str, + keep_default_na=True, + na_values=[""], + ) + # For total row count, do a fast line count when we only read a preview. + if nrows is not None and nrows > 0: + total = _count_text_lines(path, encoding) + else: + total = len(df) + meta = _build_text_metadata(list(df.columns), number_rows=total) + return df, meta + + +def read_text_metadata( + path: Path, + delimiter: str = ",", + encoding: str = "utf-8", + quotechar: str = '"', +) -> TextFileMetadata: + """Read only the header and line count from a delimited text file. + + Fast path: reads the first line for column names and counts newlines + for the row total without materializing a DataFrame. + """ + path = Path(path) + # Read just the header row. + df_header = pd.read_csv( + path, + delimiter=delimiter, + encoding=encoding, + quotechar=quotechar, + nrows=0, + ) + column_names = list(df_header.columns) + total = _count_text_lines(path, encoding) + return _build_text_metadata(column_names, number_rows=total) + + +def iter_text_chunks( + path: Path, + delimiter: str = ",", + encoding: str = "utf-8", + quotechar: str = '"', + chunksize: Optional[int] = None, +): + """Yield ``(df_chunk, meta)`` tuples for streaming text file loads. + + Uses ``pandas.read_csv()`` with ``chunksize`` for memory-efficient + iteration. The metadata object is rebuilt for each chunk with the + chunk's column names and ``number_rows`` set to the total file rows + (computed once up front). + """ + path = Path(path) + if chunksize is None: + raw_env = os.environ.get("GENERIC_LOADER_CHUNK_ROWS") + if raw_env is not None: + try: + chunksize = int(raw_env) + except ValueError: + chunksize = DEFAULT_CHUNK_ROWS + else: + chunksize = DEFAULT_CHUNK_ROWS + + total = _count_text_lines(path, encoding) + + reader = pd.read_csv( + path, + delimiter=delimiter, + encoding=encoding, + quotechar=quotechar, + chunksize=chunksize, + dtype=str, + keep_default_na=True, + na_values=[""], + ) + for chunk_df in reader: + meta = _build_text_metadata(list(chunk_df.columns), number_rows=total) + yield chunk_df, meta + + +# --------------------------------------------------------------------------- +# Unified reader dispatch +# --------------------------------------------------------------------------- + + +def read_sas( + path: Path, + *, + delimiter: str = ",", + text_encoding: str = "utf-8", + quotechar: str = '"', +) -> Tuple[pd.DataFrame, Any]: + """Read an entire SAS or delimited text file into memory. + + For SAS files (``.sas7bdat``, ``.xpt``, ``.xport``), delegates to + pyreadstat. For text files (``.txt``, ``.csv``, ``.tsv``), delegates + to :func:`read_text`. The text-specific parameters are ignored for SAS + files. Kept for backward compatibility and tests; the CLI now uses :func:`read_sas_preview` + :func:`iter_sas_chunks` so it never materializes the whole frame at once. """ + if _is_text_file(path): + return read_text(path, delimiter=delimiter, encoding=text_encoding, quotechar=quotechar) reader, kwargs = _sas_reader(path) return reader(str(Path(path)), **kwargs) @@ -642,30 +880,55 @@ def read_sas_preview( path: Path, *, rows: Optional[int] = None, + delimiter: str = ",", + text_encoding: str = "utf-8", + quotechar: str = '"', ) -> Tuple[pd.DataFrame, Any]: """Read the first ``rows`` records from ``path`` plus its metadata. Defaults to ``TYPE_INFERENCE_SAMPLE_ROWS`` when ``rows`` is not given. Passing ``rows=None`` with ``TYPE_INFERENCE_SAMPLE_ROWS=None`` reads the whole file (pyreadstat treats ``row_limit=0`` as unlimited). + + For text files, delegates to :func:`read_text_preview`. """ - reader, kwargs = _sas_reader(path) effective = rows if rows is not None else TYPE_INFERENCE_SAMPLE_ROWS + if _is_text_file(path): + return read_text_preview( + path, + delimiter=delimiter, + encoding=text_encoding, + quotechar=quotechar, + rows=effective, + ) + reader, kwargs = _sas_reader(path) row_limit = int(effective) if effective else 0 return reader(str(Path(path)), row_limit=row_limit, **kwargs) -def read_sas_metadata(path: Path) -> Any: - """Read only the metadata (no rows) from a SAS file. +def read_sas_metadata( + path: Path, + *, + delimiter: str = ",", + text_encoding: str = "utf-8", + quotechar: str = '"', +) -> Any: + """Read only the metadata (no rows) from a SAS or text file. - Uses pyreadstat's ``metadataonly=True`` fast path: the reader decodes - the file header (column names, formats, total row count, etc.) and - returns without touching the data pages. Orders of magnitude faster - than :func:`read_sas_preview` when all you need is + Uses pyreadstat's ``metadataonly=True`` fast path for SAS files: the + reader decodes the file header (column names, formats, total row count, + etc.) and returns without touching the data pages. Orders of magnitude + faster than :func:`read_sas_preview` when all you need is ``meta.number_rows`` - typically a few ms per sas7bdat file, which makes it cheap to pre-scan a whole folder to populate a global progress bar. + + For text files, delegates to :func:`read_text_metadata`. """ + if _is_text_file(path): + return read_text_metadata( + path, delimiter=delimiter, encoding=text_encoding, quotechar=quotechar, + ) reader, kwargs = _sas_reader(path) _, meta = reader(str(Path(path)), metadataonly=True, **kwargs) return meta @@ -675,6 +938,9 @@ def iter_sas_chunks( path: Path, *, chunksize: Optional[int] = None, + delimiter: str = ",", + text_encoding: str = "utf-8", + quotechar: str = '"', ): """Yield ``(df_chunk, meta)`` tuples for streaming loads. @@ -685,7 +951,18 @@ def iter_sas_chunks( from the ``GENERIC_LOADER_CHUNK_ROWS`` environment variable if set and parseable, otherwise from :data:`DEFAULT_CHUNK_ROWS`. An explicit int always wins. + + For text files, delegates to :func:`iter_text_chunks`. """ + if _is_text_file(path): + yield from iter_text_chunks( + path, + delimiter=delimiter, + encoding=text_encoding, + quotechar=quotechar, + chunksize=chunksize, + ) + return if chunksize is None: raw = os.environ.get("GENERIC_LOADER_CHUNK_ROWS") if raw is not None: @@ -2414,7 +2691,7 @@ def validate_against_manifest( def _build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( - description="Load a single SAS file (XPT or sas7bdat) into Postgres.", + description="Load a single data file (SAS or delimited text) into Postgres.", ) p.add_argument("--config", required=True, type=Path, help="Path to YAML config") p.add_argument( @@ -2467,9 +2744,18 @@ def main(argv: Optional[List[str]] = None) -> int: cfg = load_config(args.config) if not cfg.filename.exists(): - print(f"error: SAS file not found: {cfg.filename}", file=sys.stderr) + file_label = "text file" if cfg.file_type == "text" else "SAS file" + print(f"error: {file_label} not found: {cfg.filename}", file=sys.stderr) return 2 + # Build kwargs dict for text-file parameters. These are passed through + # to the unified reader functions and silently ignored for SAS files. + _text_kw: Dict[str, Any] = dict( + delimiter=cfg.delimiter, + text_encoding=cfg.text_encoding, + quotechar=cfg.quotechar, + ) + # Schema inference reads the whole file so type + nullability are # computed against every row. That's what the target host has the # resources for and is the only way to honestly emit ``NOT NULL`` - @@ -2478,7 +2764,7 @@ def main(argv: Optional[List[str]] = None) -> int: # fit the file in memory, override ``TYPE_INFERENCE_SAMPLE_ROWS`` to # an integer cap and know that sampled specs may stamp ``NOT NULL`` # on columns whose nulls live past the window. - preview_df, meta = read_sas_preview(cfg.filename) + preview_df, meta = read_sas_preview(cfg.filename, **_text_kw) preview_df = apply_column_filter(preview_df, cfg.include, cfg.exclude) force_nullable = args.all_nullable or cfg.all_nullable columns = infer_schema( @@ -2527,7 +2813,7 @@ def main(argv: Optional[List[str]] = None) -> int: print(" discovering partition values (full file scan)...", file=sys.stderr) def _discovery_chunks(): - for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename): + for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename, **_text_kw): yield apply_column_filter(chunk_df, cfg.include, cfg.exclude) partition_values = discover_partition_values_chunked( @@ -2589,7 +2875,7 @@ def main(argv: Optional[List[str]] = None) -> int: dynamic_ncols=True, ) try: - for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename): + for chunk_df, _chunk_meta in iter_sas_chunks(cfg.filename, **_text_kw): chunk_df = apply_column_filter(chunk_df, cfg.include, cfg.exclude) pbar.update(len(chunk_df)) yield chunk_df diff --git a/generic_loader/sample_config.yaml b/generic_loader/sample_config.yaml index 223cc1a..38c1e6a 100644 --- a/generic_loader/sample_config.yaml +++ b/generic_loader/sample_config.yaml @@ -16,6 +16,23 @@ tablename: kitchensink # Defaults to fail. if_exists: append +# file_type: Type of data file to load. One of: sas | text. Default: sas. +# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat +# text - Delimited text files (.txt, .csv, .tsv) read via pandas +# file_type: sas + +# delimiter: Column delimiter for text files. Only used when file_type: text. +# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe), +# or any single character. +# delimiter: "," + +# text_encoding: Character encoding for text files. Default: utf-8. +# Common alternatives: latin-1, cp1252, iso-8859-1. +# text_encoding: utf-8 + +# quotechar: Quote character for text files. Default: '"' (double quote). +# quotechar: '"' + # partition_by: Partition the table by unique values of these columns. # Columns are applied in cascading order (first column = top-level partition). # Requires if_exists: replace or fail (not append for initial creation). diff --git a/generic_loader/sample_folder_config.yaml b/generic_loader/sample_folder_config.yaml index 4cb394c..909c57b 100644 --- a/generic_loader/sample_folder_config.yaml +++ b/generic_loader/sample_folder_config.yaml @@ -27,6 +27,25 @@ if_exists: replace # see the embedded-digit example near the bottom of this file. auto_detect: true +# file_type: Type of data files in this folder. One of: sas | text. Default: sas. +# sas - SAS files (.sas7bdat, .xpt, .xport) read via pyreadstat +# text - Delimited text files (.txt, .csv, .tsv) read via pandas +# When set to 'text', the folder scanner looks for .txt/.csv/.tsv files +# instead of .sas7bdat/.xpt/.xport files. +# file_type: sas + +# delimiter: Column delimiter for text files. Only used when file_type: text. +# Accepts: "," (comma, default), "tab" or "\t" (tab), "pipe" or "|" (pipe), +# or any single character. +# delimiter: "," + +# text_encoding: Character encoding for text files. Default: utf-8. +# Common alternatives: latin-1, cp1252, iso-8859-1. +# text_encoding: utf-8 + +# quotechar: Quote character for text files. Default: '"' (double quote). +# quotechar: '"' + # Folder-level column filter. Every file in every cluster passes through # this filter. `include` and `exclude` are mutually exclusive. A cluster can # override these via its own `include` / `exclude` keys. @@ -148,6 +167,10 @@ clusters: # - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$' # tablename: year2020_regionA_detail + # Text file cluster example (when file_type: text): + # - pattern: '^data_group_a\d+\.txt$' + # tablename: data_group_a + # With only the group_a pattern explicit, auto_detect: true will still # bucket group_b1.xpt + group_b2.xpt into a "group_b" cluster and the lone # standalone.xpt into a "standalone" cluster. See generate_sample_folder.py diff --git a/utils/data_explorer.py b/utils/data_explorer.py index 617a400..587f2bc 100644 --- a/utils/data_explorer.py +++ b/utils/data_explorer.py @@ -1,28 +1,37 @@ """Explore S3 directories and categorise them by accessibility. Reads a text file containing one S3 prefix per line (paths within the bucket -configured by the ``S3_BUCKET`` constant), then for each prefix: +configured by the ``S3_BUCKET`` constant or ``--bucket`` CLI argument), then +for each prefix: + - Lists all objects recursively (via ``list_objects_v2`` paginator) -- **Only considers files matching the ``FILE_EXTENSION`` filter** (default - ``.sas7bdat``). All other file types are ignored. +- **Only considers files matching the configured extensions** (default: all + supported extensions — SAS and text). All other file types are ignored. - Tests read permission with ``head_object`` on the first matching file found - Categorises the directory as **Available**, **Blocked**, or **Empty** +Supported file types +-------------------- +* **SAS files**: ``.sas7bdat``, ``.xpt``, ``.xport`` +* **Text / delimited files**: ``.txt``, ``.csv``, ``.tsv`` + A directory is considered *empty* if it contains no files matching the extension filter, even when other file types are present. -Configure the constants below, then run:: +Configure the constants below (or use CLI arguments), then run:: - python3 data_explorer.py + python3 data_explorer.py [OPTIONS] -Python 3.10+ compatible. Requires only ``boto3`` / ``botocore`` and stdlib. +Python 3.10+ compatible. Requires ``boto3`` / ``botocore`` and stdlib. """ from __future__ import annotations +import argparse +import os import sys from dataclasses import dataclass, field -from typing import List, Tuple +from typing import List, Set, Tuple # --------------------------------------------------------------------------- # Dependency check @@ -41,11 +50,25 @@ except ImportError: # --------------------------------------------------------------------------- -# Configuration — edit these before running +# Extension constants # --------------------------------------------------------------------------- -FILE_EXTENSION: str = ".sas7bdat" -"""Only files whose key ends with this extension (case-insensitive) are considered.""" +SAS_EXTENSIONS: Set[str] = {".sas7bdat", ".xpt", ".xport"} +"""File extensions recognised as SAS data files.""" + +TEXT_EXTENSIONS: Set[str] = {".txt", ".csv", ".tsv"} +"""File extensions recognised as delimited text / CSV files.""" + +SUPPORTED_EXTENSIONS: Set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS +"""Union of all file extensions this tool can work with.""" + + +# --------------------------------------------------------------------------- +# Configuration defaults — edit these or override via CLI arguments +# --------------------------------------------------------------------------- + +FILE_EXTENSIONS: Set[str] = SUPPORTED_EXTENSIONS +"""Set of extensions to filter on (case-insensitive). Defaults to all supported.""" INPUT_FILE: str = "s3_directories.txt" """Path to the text file containing one S3 prefix per line.""" @@ -56,6 +79,57 @@ S3_BUCKET: str = "my-bucket" AWS_PROFILE: str = "default" """AWS CLI profile name used for authentication.""" +# Text-file reading defaults (used when downloading / previewing text files) +DEFAULT_DELIMITER: str = "," +DEFAULT_ENCODING: str = "utf-8" +DEFAULT_QUOTECHAR: str = '"' + + +# --------------------------------------------------------------------------- +# Auto-detection helpers +# --------------------------------------------------------------------------- + + +def detect_file_type(filename: str) -> str: + """Return ``'sas'``, ``'text'``, or ``'unknown'`` based on *filename* extension. + + The check is case-insensitive. For ``.tsv`` files the caller should + default the delimiter to a tab character (``'\\t'``). + + Examples + -------- + >>> detect_file_type("data.sas7bdat") + 'sas' + >>> detect_file_type("report.CSV") + 'text' + >>> detect_file_type("archive.zip") + 'unknown' + """ + ext = os.path.splitext(filename)[1].lower() + if ext in SAS_EXTENSIONS: + return "sas" + if ext in TEXT_EXTENSIONS: + return "text" + return "unknown" + + +def default_delimiter_for(filename: str) -> str: + """Return a sensible default delimiter for *filename*. + + * ``.tsv`` → ``'\\t'`` + * everything else → ``','`` + """ + ext = os.path.splitext(filename)[1].lower() + if ext == ".tsv": + return "\t" + return "," + + +def matches_extensions(key: str, extensions: Set[str]) -> bool: + """Return ``True`` if *key* ends with any extension in *extensions* (case-insensitive).""" + key_lower = key.lower() + return any(key_lower.endswith(ext) for ext in extensions) + # --------------------------------------------------------------------------- # Data structures @@ -134,15 +208,22 @@ def format_size(size_bytes: int) -> str: return f"{size_bytes:,.1f} TB" +def extensions_label(extensions: Set[str]) -> str: + """Return a compact, sorted label for a set of extensions (e.g. ``.csv/.tsv/.txt``).""" + return "/".join(sorted(extensions)) + + def list_objects( s3_client: "botocore.client.S3", bucket: str, prefix: str, + extensions: Set[str] | None = None, ) -> Tuple[str | None, int, int]: """Recursively list all objects under *prefix* using streaming counters. - Only objects whose key ends with ``FILE_EXTENSION`` (case-insensitive) are - counted. All other files are silently skipped. + Only objects whose key ends with one of *extensions* (case-insensitive) are + counted. All other files are silently skipped. When *extensions* is + ``None`` the module-level ``FILE_EXTENSIONS`` set is used. Returns ``(first_key, file_count, total_size)`` where *first_key* is the key of the first matching object found (or ``None`` if no matching files @@ -152,14 +233,16 @@ def list_objects( Unlike the previous implementation this never accumulates all keys in memory, making it safe for prefixes with millions of objects. """ - ext_lower = FILE_EXTENSION.lower() + if extensions is None: + extensions = FILE_EXTENSIONS + exts_lower = {e.lower() for e in extensions} paginator = s3_client.get_paginator("list_objects_v2") first_key: str | None = None file_count: int = 0 total_size: int = 0 for page in paginator.paginate(Bucket=bucket, Prefix=prefix): for obj in page.get("Contents", []): - if not obj["Key"].lower().endswith(ext_lower): + if not any(obj["Key"].lower().endswith(ext) for ext in exts_lower): continue if first_key is None: first_key = obj["Key"] @@ -188,8 +271,26 @@ def check_read_permission( # --------------------------------------------------------------------------- -def explore_directories(prefixes: List[str]) -> Results: - """Explore every prefix in ``S3_BUCKET`` and return categorised *Results*.""" +def explore_directories( + prefixes: List[str], + *, + extensions: Set[str] | None = None, +) -> Results: + """Explore every prefix in ``S3_BUCKET`` and return categorised *Results*. + + Parameters + ---------- + prefixes: + List of S3 key prefixes to explore. + extensions: + Set of file extensions to filter on. Defaults to the module-level + ``FILE_EXTENSIONS`` (which itself defaults to ``SUPPORTED_EXTENSIONS``). + """ + if extensions is None: + extensions = FILE_EXTENSIONS + exts_lower = {e.lower() for e in extensions} + ext_label = extensions_label(extensions) + session = boto3.Session(profile_name=AWS_PROFILE) s3 = session.client("s3") @@ -198,13 +299,15 @@ def explore_directories(prefixes: List[str]) -> Results: for idx, prefix in enumerate(prefixes, start=1): print( - f"[{idx}/{total}] Checking {prefix} (filtering for {FILE_EXTENSION}) ...", + f"[{idx}/{total}] Checking {prefix} (filtering for {ext_label}) ...", file=sys.stderr, ) # --- Recursive listing ------------------------------------------------ try: - first_key, file_count, total_size = list_objects(s3, S3_BUCKET, prefix) + first_key, file_count, total_size = list_objects( + s3, S3_BUCKET, prefix, extensions=extensions, + ) except botocore.exceptions.ClientError as exc: code = exc.response.get("Error", {}).get("Code", "Unknown") message = exc.response.get("Error", {}).get("Message", str(exc)) @@ -225,12 +328,11 @@ def explore_directories(prefixes: List[str]) -> Results: # --- Permission check ------------------------------------------------- # Prefer a real object over a zero-byte directory marker (key ending # in "/") for the head_object test. The selected key must also match - # the FILE_EXTENSION filter. If no suitable key is found, fall back - # to first_key. - ext_lower = FILE_EXTENSION.lower() + # the extension filter. If no suitable key is found, fall back to + # first_key. test_key = first_key if first_key.endswith("/") and total_size > 0: - # Re-scan the first page to find a non-marker key matching the extension + # Re-scan the first page to find a non-marker key matching the extensions try: probe_paginator = s3.get_paginator("list_objects_v2") for probe_page in probe_paginator.paginate( @@ -239,7 +341,7 @@ def explore_directories(prefixes: List[str]) -> Results: for obj in probe_page.get("Contents", []): if ( not (obj["Key"].endswith("/") and obj["Size"] == 0) - and obj["Key"].lower().endswith(ext_lower) + and any(obj["Key"].lower().endswith(ext) for ext in exts_lower) ): test_key = obj["Key"] break @@ -266,11 +368,25 @@ def explore_directories(prefixes: List[str]) -> Results: # --------------------------------------------------------------------------- -def print_results(results: Results) -> None: - """Print a clean, human-readable summary to stdout.""" +def print_results(results: Results, *, extensions: Set[str] | None = None) -> None: + """Print a clean, human-readable summary to stdout. + + Parameters + ---------- + results: + The exploration results to display. + extensions: + The set of extensions that were used for filtering. Used only for + labelling in the output. Defaults to ``FILE_EXTENSIONS``. + """ + if extensions is None: + extensions = FILE_EXTENSIONS + ext_label = extensions_label(extensions) + print() print("=== S3 Directory Explorer Results ===") print(f"Bucket: {S3_BUCKET}") + print(f"Extensions: {ext_label}") # --- Available --- print() @@ -278,7 +394,7 @@ def print_results(results: Results) -> None: if results.available: for d in results.available: print(f" {d.prefix}") - print(f" {FILE_EXTENSION} files: {d.file_count} | Total Size: {format_size(d.total_size)}") + print(f" Matching files ({ext_label}): {d.file_count} | Total Size: {format_size(d.total_size)}") else: print(" (none)") @@ -289,7 +405,7 @@ def print_results(results: Results) -> None: for d in results.blocked: if d.file_count: print(f" {d.prefix}") - print(f" {FILE_EXTENSION} files found: {d.file_count} | Error: {d.error}") + print(f" Matching files ({ext_label}) found: {d.file_count} | Error: {d.error}") else: print(f" {d.prefix}") print(f" Error: {d.error}") @@ -298,7 +414,7 @@ def print_results(results: Results) -> None: # --- Empty --- print() - print(f"--- Empty / no {FILE_EXTENSION} files ({len(results.empty)}) ---") + print(f"--- Empty / no matching files ({len(results.empty)}) ---") if results.empty: for d in results.empty: print(f" {d.prefix}") @@ -308,20 +424,163 @@ def print_results(results: Results) -> None: print() +# --------------------------------------------------------------------------- +# CLI argument parsing +# --------------------------------------------------------------------------- + + +def build_arg_parser() -> argparse.ArgumentParser: + """Build and return the CLI argument parser. + + Supports selecting file-type filters, text-file reading parameters, and + overriding the default bucket / profile / input-file settings. + """ + parser = argparse.ArgumentParser( + description=( + "Explore S3 directories and categorise them by accessibility. " + "Supports SAS files (.sas7bdat, .xpt, .xport) and delimited text " + "files (.txt, .csv, .tsv)." + ), + ) + + # --- File-type / extension selection --- + type_group = parser.add_argument_group("File-type selection") + type_group.add_argument( + "--file-type", + choices=["sas", "text", "all"], + default="all", + help=( + "Restrict the scan to a specific file type. " + "'sas' = .sas7bdat/.xpt/.xport only; " + "'text' = .txt/.csv/.tsv only; " + "'all' = both (default)." + ), + ) + type_group.add_argument( + "--extensions", + nargs="+", + metavar="EXT", + help=( + "Explicit list of extensions to filter on (e.g. --extensions .csv .tsv). " + "Overrides --file-type when provided." + ), + ) + + # --- Text-file reading parameters --- + text_group = parser.add_argument_group( + "Text-file parameters", + description=( + "Parameters used when reading delimited text files. These are " + "stored for downstream consumers and do not affect the S3 scan " + "itself." + ), + ) + text_group.add_argument( + "--delimiter", + default=None, + help=( + "Field delimiter for text files (default: ',' for .csv/.txt, " + "'\\t' for .tsv). Use 'tab' or '\\t' for a tab character." + ), + ) + text_group.add_argument( + "--encoding", + default=DEFAULT_ENCODING, + help=f"Character encoding for text files (default: {DEFAULT_ENCODING}).", + ) + text_group.add_argument( + "--quotechar", + default=DEFAULT_QUOTECHAR, + help=f"Quote character for text files (default: {DEFAULT_QUOTECHAR!r}).", + ) + + # --- S3 / general settings --- + s3_group = parser.add_argument_group("S3 settings") + s3_group.add_argument( + "--bucket", + default=None, + help=f"S3 bucket name (default: {S3_BUCKET}).", + ) + s3_group.add_argument( + "--profile", + default=None, + help=f"AWS CLI profile name (default: {AWS_PROFILE}).", + ) + s3_group.add_argument( + "--input-file", + default=None, + help=f"Path to the text file with S3 prefixes (default: {INPUT_FILE}).", + ) + + return parser + + +def resolve_extensions(args: argparse.Namespace) -> Set[str]: + """Determine the active extension set from parsed CLI *args*. + + If ``--extensions`` is provided it takes precedence. Otherwise + ``--file-type`` is used to select a predefined set. + """ + if args.extensions: + # Normalise: ensure each extension starts with a dot and is lowercase + exts: Set[str] = set() + for ext in args.extensions: + ext = ext.strip().lower() + if not ext.startswith("."): + ext = "." + ext + exts.add(ext) + return exts + + if args.file_type == "sas": + return SAS_EXTENSIONS + if args.file_type == "text": + return TEXT_EXTENSIONS + return SUPPORTED_EXTENSIONS + + +def resolve_delimiter(args: argparse.Namespace) -> str: + """Return the effective delimiter from parsed CLI *args*. + + Handles the special values ``'tab'`` and ``'\\t'`` so users can specify a + tab character on the command line without shell-escaping issues. + """ + if args.delimiter is None: + return DEFAULT_DELIMITER + raw = args.delimiter + if raw.lower() in ("tab", "\\t"): + return "\t" + return raw + + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- if __name__ == "__main__": - import os + parser = build_arg_parser() + args = parser.parse_args() + + # --- Apply CLI overrides to module-level config --------------------------- + if args.bucket: + S3_BUCKET = args.bucket + if args.profile: + AWS_PROFILE = args.profile + input_file = args.input_file if args.input_file else INPUT_FILE + + active_extensions = resolve_extensions(args) + FILE_EXTENSIONS = active_extensions + + delimiter = resolve_delimiter(args) + encoding = args.encoding + quotechar = args.quotechar # --- Read input file ------------------------------------------------------ - if not os.path.exists(INPUT_FILE): - print(f"ERROR: Input file not found: {INPUT_FILE}", file=sys.stderr) + if not os.path.exists(input_file): + print(f"ERROR: Input file not found: {input_file}", file=sys.stderr) sys.exit(1) try: - prefixes = read_input_file(INPUT_FILE) + prefixes = read_input_file(input_file) except Exception as exc: print(f"ERROR: Could not read input file: {exc}", file=sys.stderr) sys.exit(1) @@ -346,7 +605,17 @@ if __name__ == "__main__": print(f"ERROR: AWS profile validation failed: {exc}", file=sys.stderr) sys.exit(1) + # --- Print active configuration ------------------------------------------- + ext_label = extensions_label(active_extensions) + print(f"Bucket: {S3_BUCKET}", file=sys.stderr) + print(f"Extensions: {ext_label}", file=sys.stderr) + if active_extensions & TEXT_EXTENSIONS: + print( + f"Text opts: delimiter={delimiter!r} encoding={encoding!r} " + f"quotechar={quotechar!r}", + file=sys.stderr, + ) + # --- Explore -------------------------------------------------------------- - print(f"Bucket: {S3_BUCKET}", file=sys.stderr) - results = explore_directories(prefixes) - print_results(results) + results = explore_directories(prefixes, extensions=active_extensions) + print_results(results, extensions=active_extensions) diff --git a/utils/file_viewer.py b/utils/file_viewer.py index 0b3303d..6c6343d 100644 --- a/utils/file_viewer.py +++ b/utils/file_viewer.py @@ -1,15 +1,23 @@ -"""Standalone utility to download a .sas7bdat file from S3 and print a -column-level summary of the first 10 rows. +"""Standalone utility to download a SAS or delimited text file from S3 and +print a column-level summary of the first *N* rows. -Configure the four constants below, then run:: +Supported formats +----------------- +* **SAS** – ``.sas7bdat``, ``.xpt``, ``.xport`` (read via *pyreadstat*) +* **Text** – ``.csv``, ``.tsv``, ``.txt`` (read via *pandas.read_csv*) + +Configure the four constants below **or** use the CLI arguments, then run:: python3 file_viewer.py + python3 file_viewer.py --local path/to/file.csv + python3 file_viewer.py --local path/to/data.tsv --delimiter $'\\t' Python 3.14 compatible. """ from __future__ import annotations +import argparse import os import sys @@ -19,14 +27,28 @@ import pyreadstat # --------------------------------------------------------------------------- -# Configuration — edit these before running +# Supported file extensions +# --------------------------------------------------------------------------- + +SAS_EXTENSIONS: set[str] = {".sas7bdat", ".xpt", ".xport"} +"""File extensions recognised as SAS data files.""" + +TEXT_EXTENSIONS: set[str] = {".txt", ".csv", ".tsv"} +"""File extensions recognised as delimited text files.""" + +SUPPORTED_EXTENSIONS: set[str] = SAS_EXTENSIONS | TEXT_EXTENSIONS +"""Union of all supported file extensions.""" + + +# --------------------------------------------------------------------------- +# Configuration — edit these before running (or use CLI arguments) # --------------------------------------------------------------------------- S3_BUCKET: str = "my-bucket" """S3 bucket name.""" S3_KEY: str = "path/to/file.sas7bdat" -"""Object key (path) within the bucket to the .sas7bdat file.""" +"""Object key (path) within the bucket to a supported data file.""" LOCAL_FOLDER: str = "./downloads" """Local directory to download the file into.""" @@ -45,6 +67,8 @@ def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None: If *local_path* exists and its size matches the S3 object's size, the download is skipped and a message is printed. + + Supports any file whose extension is in :data:`SUPPORTED_EXTENSIONS`. """ session = boto3.Session(profile_name=AWS_PROFILE) s3 = session.client("s3") @@ -69,12 +93,117 @@ def _ensure_local_copy(bucket: str, key: str, local_path: str) -> None: print("Download complete.") +# -- SAS readers ------------------------------------------------------------- + + def _read_sas_head(path: str, row_count: int = 10) -> pd.DataFrame: - """Read the first *row_count* rows of a .sas7bdat file.""" - df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count) + """Read the first *row_count* rows of a SAS file (``.sas7bdat``, ``.xpt``, ``.xport``).""" + ext = os.path.splitext(path)[1].lower() + if ext == ".sas7bdat": + df, _ = pyreadstat.read_sas7bdat(path, row_offset=0, row_limit=row_count) + elif ext in {".xpt", ".xport"}: + df, _ = pyreadstat.read_xport(path, row_offset=0, row_limit=row_count) + else: + raise ValueError(f"Unsupported SAS extension: {ext}") return df +# -- Text readers ------------------------------------------------------------ + + +def _read_text_head( + path: str, + row_count: int = 10, + delimiter: str = ",", + encoding: str = "utf-8", + quotechar: str = '"', +) -> pd.DataFrame: + """Read the first *row_count* rows of a delimited text file. + + Parameters + ---------- + path : str + Path to the ``.csv``, ``.tsv``, or ``.txt`` file. + row_count : int, optional + Number of data rows to read (default ``10``). + delimiter : str, optional + Column delimiter (default ``","``). For ``.tsv`` files the caller + should pass ``"\\t"``. + encoding : str, optional + File encoding (default ``"utf-8"``). + quotechar : str, optional + Character used to quote fields (default ``'"'``). + """ + return pd.read_csv( + path, + sep=delimiter, + encoding=encoding, + quotechar=quotechar, + nrows=row_count, + ) + + +# -- Unified reader ---------------------------------------------------------- + + +def _read_head( + path: str, + row_count: int = 10, + delimiter: str | None = None, + encoding: str = "utf-8", + quotechar: str = '"', +) -> pd.DataFrame: + """Read the first *row_count* rows of a supported data file. + + Auto-detects the file type from its extension and delegates to the + appropriate reader. For ``.tsv`` files the delimiter defaults to tab + (``"\\t"``); for other text files it defaults to ``","``. + + Parameters + ---------- + path : str + Path to the data file. + row_count : int, optional + Number of data rows to read (default ``10``). + delimiter : str or None, optional + Column delimiter for text files. ``None`` means *auto-detect* + (tab for ``.tsv``, comma otherwise). + encoding : str, optional + Encoding for text files (default ``"utf-8"``). + quotechar : str, optional + Quote character for text files (default ``'"'``). + + Returns + ------- + pandas.DataFrame + """ + ext = os.path.splitext(path)[1].lower() + + if ext not in SUPPORTED_EXTENSIONS: + raise ValueError( + f"Unsupported file extension '{ext}'. " + f"Supported extensions: {sorted(SUPPORTED_EXTENSIONS)}" + ) + + if ext in SAS_EXTENSIONS: + return _read_sas_head(path, row_count=row_count) + + # --- Text file path --- + if delimiter is None: + delimiter = "\t" if ext == ".tsv" else "," + + return _read_text_head( + path, + row_count=row_count, + delimiter=delimiter, + encoding=encoding, + quotechar=quotechar, + ) + + +# -- Display ----------------------------------------------------------------- + + def _sample_values(series: pd.Series, n: int = 3) -> str: """Return up to *n* non-null sample values as a comma-separated string.""" non_null = series.dropna() @@ -114,26 +243,126 @@ def _print_summary(df: pd.DataFrame) -> None: print() +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def _build_parser() -> argparse.ArgumentParser: + """Build the argument parser for the file-viewer CLI.""" + parser = argparse.ArgumentParser( + description=( + "Download a SAS or delimited text file from S3 (or read a local " + "file) and print a column-level summary of the first N rows.\n\n" + "Supported extensions: " + + ", ".join(sorted(SUPPORTED_EXTENSIONS)) + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + source = parser.add_mutually_exclusive_group() + source.add_argument( + "--local", + metavar="FILE", + default=None, + help=( + "Path to a local data file to summarise (skips S3 download). " + "Supported extensions: " + + ", ".join(sorted(SUPPORTED_EXTENSIONS)) + ), + ) + source.add_argument( + "--s3-key", + metavar="KEY", + default=None, + help="Override the S3_KEY constant with this object key.", + ) + + parser.add_argument( + "--rows", + type=int, + default=10, + metavar="N", + help="Number of rows to read (default: 10).", + ) + + # Text-file-specific options + text_group = parser.add_argument_group( + "text file options", + "These options apply only to .csv / .tsv / .txt files.", + ) + text_group.add_argument( + "--delimiter", + default=None, + help=( + 'Column delimiter for text files (default: "," for .csv/.txt, ' + '"\\t" for .tsv). Use $\'\\t\' in the shell for a literal tab.' + ), + ) + text_group.add_argument( + "--encoding", + default="utf-8", + help='File encoding for text files (default: "utf-8").', + ) + text_group.add_argument( + "--quotechar", + default='"', + help='Quote character for text files (default: \'"\').', + ) + + return parser + + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- if __name__ == "__main__": - # --- Download ----------------------------------------------------------- - os.makedirs(LOCAL_FOLDER, exist_ok=True) - local_filename = os.path.basename(S3_KEY) - local_path = os.path.join(LOCAL_FOLDER, local_filename) + parser = _build_parser() + args = parser.parse_args() - try: - _ensure_local_copy(S3_BUCKET, S3_KEY, local_path) - except Exception as exc: - print(f"S3 download error: {exc}", file=sys.stderr) - sys.exit(1) + if args.local: + # ---- Local file mode ----------------------------------------------- + local_path = args.local + ext = os.path.splitext(local_path)[1].lower() + if ext not in SUPPORTED_EXTENSIONS: + parser.error( + f"Unsupported file extension '{ext}'. " + f"Supported: {sorted(SUPPORTED_EXTENSIONS)}" + ) + if not os.path.isfile(local_path): + print(f"File not found: {local_path}", file=sys.stderr) + sys.exit(1) + else: + # ---- S3 download mode ---------------------------------------------- + s3_key = args.s3_key or S3_KEY + ext = os.path.splitext(s3_key)[1].lower() + if ext not in SUPPORTED_EXTENSIONS: + parser.error( + f"Unsupported file extension '{ext}' in S3 key. " + f"Supported: {sorted(SUPPORTED_EXTENSIONS)}" + ) - # --- Read & summarize --------------------------------------------------- + os.makedirs(LOCAL_FOLDER, exist_ok=True) + local_filename = os.path.basename(s3_key) + local_path = os.path.join(LOCAL_FOLDER, local_filename) + + try: + _ensure_local_copy(S3_BUCKET, s3_key, local_path) + except Exception as exc: + print(f"S3 download error: {exc}", file=sys.stderr) + sys.exit(1) + + # ---- Read & summarise -------------------------------------------------- try: - df = _read_sas_head(local_path, row_count=10) + df = _read_head( + local_path, + row_count=args.rows, + delimiter=args.delimiter, + encoding=args.encoding, + quotechar=args.quotechar, + ) except Exception as exc: print(f"File read error: {exc}", file=sys.stderr) sys.exit(2) diff --git a/utils/s3_download.py b/utils/s3_download.py index c987db5..d1a2f2e 100644 --- a/utils/s3_download.py +++ b/utils/s3_download.py @@ -5,6 +5,10 @@ under that prefix recursively, groups objects into *clusters* using the same explicit-pattern + auto-detect rules as ``load_folder.py``, and downloads each cluster's files into its own subfolder under a local destination root. +Supported file types: + * SAS data files: ``.sas7bdat``, ``.xpt``, ``.xport`` + * Delimited text files: ``.txt``, ``.csv``, ``.tsv`` + ------------------------------------------------------------------------------- USAGE ------------------------------------------------------------------------------- @@ -19,8 +23,9 @@ USAGE aws_profile: default # optional; default boto3 chain if omitted auto_detect: true # optional; default true - extensions: # optional; default sas7bdat/xpt/xport + extensions: # optional; default sas7bdat/xpt/xport/txt/csv/tsv - .sas7bdat + - .csv on_exists: skip # optional; skip | overwrite | error concurrency: 4 # optional; default 4 @@ -58,7 +63,8 @@ Exit codes: * Listing is recursive (no S3 ``Delimiter``). Regexes are matched against the *basename* of each key (the part after the last ``/``), so a nested object like ``census/2020/raw/nested/group_c1.sas7bdat`` is grouped by - ``group_c1.sas7bdat`` alone. + ``group_c1.sas7bdat`` alone. Text files (e.g. ``data.csv``) are handled + identically — the basename is extracted and matched the same way. * Explicit patterns are tried in order. A key matched by one pattern is removed from the pool before the next pattern runs. Overlap between patterns is flagged as an error at discovery time. @@ -97,7 +103,9 @@ import boto3 import yaml -DEFAULT_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport") +SAS_EXTENSIONS: Tuple[str, ...] = (".sas7bdat", ".xpt", ".xport") +TEXT_EXTENSIONS: Tuple[str, ...] = (".txt", ".csv", ".tsv") +DEFAULT_EXTENSIONS: Tuple[str, ...] = SAS_EXTENSIONS + TEXT_EXTENSIONS VALID_ON_EXISTS: Tuple[str, ...] = ("skip", "overwrite", "error") DEFAULT_CONCURRENCY: int = 4 @@ -318,7 +326,12 @@ def build_s3_client(cfg: DownloadConfig): def list_s3_objects(s3_client, cfg: DownloadConfig) -> List[S3Object]: - """List all objects under ``cfg.prefix`` recursively, filtered by extension.""" + """List all objects under ``cfg.prefix`` recursively, filtered by extension. + + Supports SAS extensions (``.sas7bdat``, ``.xpt``, ``.xport``) and text + extensions (``.txt``, ``.csv``, ``.tsv``) — whichever are present in + ``cfg.extensions``. + """ paginator = s3_client.get_paginator("list_objects_v2") out: List[S3Object] = [] for page in paginator.paginate(Bucket=cfg.bucket, Prefix=cfg.prefix): @@ -584,8 +597,12 @@ def download_cluster( def _build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description=( - "Download S3 objects under a prefix into a local folder, " - "grouping objects into clusters that each become one subfolder." + "Download S3 objects (SAS data files and/or delimited text files) " + "under a prefix into a local folder, grouping objects into " + "clusters that each become one subfolder. " + "Supported extensions: " + + ", ".join(DEFAULT_EXTENSIONS) + + "." ), ) p.add_argument( diff --git a/utils/sample_s3_download_config.yaml b/utils/sample_s3_download_config.yaml index 4ae715e..c48237a 100644 --- a/utils/sample_s3_download_config.yaml +++ b/utils/sample_s3_download_config.yaml @@ -52,11 +52,13 @@ local_folder: ./downloads auto_detect: true # Object extensions to consider. Anything else under the prefix is ignored. -# Default (when this key is omitted): .sas7bdat, .xpt, .xport (matches -# generic_loader/load_folder.py). +# Default (when this key is omitted): .sas7bdat, .xpt, .xport, .txt, .csv, .tsv # extensions: # - .sas7bdat # - .xpt +# - .txt +# - .csv +# - .tsv # --------------------------------------------------------------------------- # Optional: download behavior @@ -103,3 +105,7 @@ clusters: # # - pattern: '^year2020_regionA_\d+_detail\.sas7bdat$' # name: year2020_regionA_detail + + # Text file cluster example (when file_type: text): + # - pattern: '^data_group_a\d+\.txt$' + # name: data_group_a diff --git a/utils/sas_profiler.py b/utils/sas_profiler.py index 3adc4d6..f676421 100644 --- a/utils/sas_profiler.py +++ b/utils/sas_profiler.py @@ -1,6 +1,6 @@ -"""Standalone utility that profiles a single local SAS file and writes an -Excel report with drop, partition, and index candidates plus type-inference -warnings. +"""Standalone utility that profiles a single local SAS or delimited text file +and writes an Excel report with drop, partition, and index candidates plus +type-inference warnings. Configure the constants below and run:: @@ -12,14 +12,30 @@ Or override any of them from the command line:: --file ./data/mystate.sas7bdat \ --out ./reports/mystate_profile.xlsx + # Profile a CSV file with default comma delimiter: + python3 utils/sas_profiler.py --file ./data/myfile.csv + + # Profile a TSV file (tab delimiter auto-detected from extension): + python3 utils/sas_profiler.py --file ./data/myfile.tsv + + # Profile a pipe-delimited .txt file: + python3 utils/sas_profiler.py --file ./data/myfile.txt --delimiter '|' + The report is a paste-ready companion to ``generic_loader/load_sas.py`` and ``generic_loader/load_folder.py``: the "inferred Postgres type" column uses the loader's own ``infer_schema`` so the drop / partition / index suggestions map one-to-one onto valid YAML config entries for those scripts. -Supported inputs: ``.sas7bdat`` / ``.xpt`` / ``.xport`` (whatever the loader -can read). +Supported inputs: + +- SAS files: ``.sas7bdat`` / ``.xpt`` / ``.xport`` +- Delimited text files: ``.csv`` / ``.tsv`` / ``.txt`` (with headers) + +For text files, SAS-specific metadata (formats, labels) is not available; +those fields show "N/A" in the report. All other profiling (column names, +data types from pandas, value distributions, null counts, etc.) works +identically. Python 3.10+ compatible. """ @@ -58,12 +74,50 @@ from load_sas import ( # noqa: E402 ) +# --------------------------------------------------------------------------- +# File extension constants +# --------------------------------------------------------------------------- + +TEXT_EXTENSIONS = {".txt", ".csv", ".tsv"} +"""File extensions recognised as delimited text files.""" + +SAS_EXTENSIONS = {".sas7bdat", ".xpt", ".xport"} +"""File extensions recognised as SAS data files.""" + +SUPPORTED_EXTENSIONS = SAS_EXTENSIONS | TEXT_EXTENSIONS +"""All file extensions the profiler can handle.""" + + +def _is_text_file(path: Path) -> bool: + """Return True if ``path`` has a recognised delimited-text extension.""" + return path.suffix.lower() in TEXT_EXTENSIONS + + +def _is_supported_file(path: Path) -> bool: + """Return True if ``path`` has any supported extension.""" + return path.suffix.lower() in SUPPORTED_EXTENSIONS + + +def _auto_delimiter(path: Path, explicit: Optional[str]) -> str: + """Return the effective delimiter for *path*. + + If the caller supplied an explicit delimiter, use it. Otherwise default + to ``"\\t"`` for ``.tsv`` files and ``","`` for everything else. + """ + if explicit is not None: + return explicit + if path.suffix.lower() == ".tsv": + return "\t" + return "," + + # --------------------------------------------------------------------------- # Configuration - edit these before running, or override via CLI flags # --------------------------------------------------------------------------- SAS_PATH: str = "./generic_loader/samples/sample_kitchensink.xpt" -"""Local path to the .sas7bdat / .xpt / .xport file to profile.""" +"""Local path to the file to profile. Accepts ``.sas7bdat``, ``.xpt``, +``.xport``, ``.csv``, ``.tsv``, or ``.txt``.""" OUTPUT_XLSX: str = "./sas_profile.xlsx" """Where to write the Excel report.""" @@ -403,14 +457,38 @@ def profile_file( path: Path, *, chunksize: Optional[int] = None, -) -> Tuple[Dict[str, _ColumnStats], Dict[str, ColumnSpec], Any, int]: - """Stream ``path`` once, returning (stats, columns, meta, total_rows). + delimiter: Optional[str] = None, + encoding: str = "utf-8", + quotechar: str = '"', +) -> Tuple[Dict[str, _ColumnStats], Dict[str, ColumnSpec], Any, int, bool]: + """Stream ``path`` once, returning *(stats, columns, meta, total_rows, is_text)*. ``columns`` is the loader's inferred schema from the first ``PREVIEW_ROWS_FOR_INFERENCE`` rows - identical to what ``load_sas`` - would use. ``stats`` are the full-file observations we add on top. + would use. ``stats`` are the full-file observations we add on top. + + ``is_text`` is True when the file was read as a delimited text file + rather than a SAS file. Callers can use this to adjust display (e.g. + showing "N/A" for SAS-specific metadata fields). + + For text files (``.csv``, ``.tsv``, ``.txt``), the ``delimiter``, + ``encoding``, and ``quotechar`` parameters control parsing. If + ``delimiter`` is ``None``, ``.tsv`` files default to tab and all + others default to comma. """ - preview_df, meta = read_sas_preview(path, rows=PREVIEW_ROWS_FOR_INFERENCE) + is_text = _is_text_file(path) + effective_delimiter = _auto_delimiter(path, delimiter) + + # Build kwargs for the loader's text-aware read functions. + text_kwargs: Dict[str, Any] = { + "delimiter": effective_delimiter, + "text_encoding": encoding, + "quotechar": quotechar, + } + + preview_df, meta = read_sas_preview( + path, rows=PREVIEW_ROWS_FOR_INFERENCE, **text_kwargs, + ) total_rows_hint = getattr(meta, "number_rows", None) columns = infer_schema(preview_df, meta, total_rows=total_rows_hint) @@ -420,14 +498,15 @@ def profile_file( total_rows = 0 effective_chunksize = chunksize if chunksize is not None else PROFILE_CHUNK_ROWS - kwargs = {"chunksize": effective_chunksize} + chunk_kwargs: Dict[str, Any] = {"chunksize": effective_chunksize} + chunk_kwargs.update(text_kwargs) # pyreadstat + pandas are both C-level; the per-chunk overhead we pay # is dominated by the value_counts passes in _ColumnStats.update, so # the profile runs O(total_rows) with a small constant. import time started_at = time.monotonic() last_print_at = started_at - for chunk_df, _chunk_meta in iter_sas_chunks(path, **kwargs): + for chunk_df, _chunk_meta in iter_sas_chunks(path, **chunk_kwargs): total_rows += len(chunk_df) for name, cs in stats.items(): if name not in chunk_df.columns: @@ -454,7 +533,7 @@ def profile_file( file=sys.stderr, ) - return stats, columns, meta, total_rows + return stats, columns, meta, total_rows, is_text # --------------------------------------------------------------------------- @@ -876,6 +955,8 @@ def _write_columns( ws, stats: Dict[str, _ColumnStats], columns: Dict[str, ColumnSpec], + *, + is_text: bool = False, ) -> None: headers = [ "column", "sas_format", "source_dtype", "inferred_postgres_type", @@ -891,9 +972,14 @@ def _write_columns( spec = columns.get(name) top_val, top_count = cs.top_value mean_bytes = (cs.str_sum_bytes / cs.str_count) if cs.str_count else None + # For text files, SAS-specific metadata is not available. + if is_text: + sas_format_display = "N/A" + else: + sas_format_display = spec.sas_format if spec else "" values = [ name, - spec.sas_format if spec else "", + sas_format_display, spec.source_dtype if spec else "", spec.postgres_type if spec else "", "YES" if (spec and spec.nullable) else "NO", @@ -1018,6 +1104,7 @@ def write_report( warnings: List[_TypeWarning], yaml_snippet: str, thresholds: Dict[str, Any], + is_text: bool = False, ) -> None: wb = Workbook() ws = wb.active @@ -1030,7 +1117,7 @@ def write_report( total_cols=len(columns), thresholds=thresholds, ) - _write_columns(wb.create_sheet("Columns"), stats, columns) + _write_columns(wb.create_sheet("Columns"), stats, columns, is_text=is_text) _write_drop(wb.create_sheet("Drop candidates"), drops) _write_partition(wb.create_sheet("Partition candidates"), partitions) _write_index(wb.create_sheet("Index candidates"), indexes) @@ -1047,13 +1134,22 @@ def write_report( def _build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description=( - "Profile a local SAS file (.sas7bdat / .xpt / .xport) and write " - "an Excel report with drop, partition_by, and index suggestions " - "for generic_loader/load_sas.py and load_folder.py." + "Profile a local SAS or delimited text file and write an Excel " + "report with drop, partition_by, and index suggestions for " + "generic_loader/load_sas.py and load_folder.py.\n\n" + "Supported formats: .sas7bdat, .xpt, .xport (SAS); " + ".csv, .tsv, .txt (delimited text with headers)." + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.add_argument( + "--file", type=Path, default=Path(SAS_PATH), + help=( + "Path to the file to profile. Accepts SAS files " + "(.sas7bdat/.xpt/.xport) and delimited text files " + f"(.csv/.tsv/.txt). Default: {SAS_PATH!r}." ), ) - p.add_argument("--file", type=Path, default=Path(SAS_PATH), - help=f"Path to the SAS file to profile (default: {SAS_PATH!r}).") p.add_argument("--out", type=Path, default=Path(OUTPUT_XLSX), help=f"Where to write the .xlsx report (default: {OUTPUT_XLSX!r}).") p.add_argument("--high-null-pct", type=float, default=HIGH_NULL_PCT, @@ -1070,6 +1166,29 @@ def _build_argparser() -> argparse.ArgumentParser: f"memory. Defaults to PROFILE_CHUNK_ROWS ({PROFILE_CHUNK_ROWS:,})." ), ) + # -- Text file options ------------------------------------------------- + p.add_argument( + "--delimiter", type=str, default=None, + help=( + "Column delimiter for text files (.csv/.tsv/.txt). " + "Defaults to tab for .tsv, comma for .csv/.txt. " + "Ignored for SAS files. Example: --delimiter '|'" + ), + ) + p.add_argument( + "--encoding", type=str, default="utf-8", + help=( + "Character encoding for text files (default: utf-8). " + "Ignored for SAS files." + ), + ) + p.add_argument( + "--quotechar", type=str, default='"', + help=( + 'Quote character for text files (default: \'"\'). ' + "Ignored for SAS files." + ), + ) return p @@ -1080,11 +1199,27 @@ def main(argv: Optional[List[str]] = None) -> int: out_path: Path = args.out if not path.exists(): - print(f"error: SAS file not found: {path}", file=sys.stderr) + print(f"error: file not found: {path}", file=sys.stderr) return 2 - print(f"profiling {path} -> {out_path}", file=sys.stderr) - stats, columns, meta, total_rows = profile_file(path, chunksize=args.chunksize) + if not _is_supported_file(path): + exts = ", ".join(sorted(SUPPORTED_EXTENSIONS)) + print( + f"error: unsupported extension {path.suffix!r}; " + f"expected one of: {exts}", + file=sys.stderr, + ) + return 2 + + file_kind = "text" if _is_text_file(path) else "SAS" + print(f"profiling {path} ({file_kind}) -> {out_path}", file=sys.stderr) + stats, columns, meta, total_rows, is_text = profile_file( + path, + chunksize=args.chunksize, + delimiter=args.delimiter, + encoding=args.encoding, + quotechar=args.quotechar, + ) drops, partitions, indexes, warnings = classify( stats, columns, @@ -1121,6 +1256,7 @@ def main(argv: Optional[List[str]] = None) -> int: warnings=warnings, yaml_snippet=yaml_snippet, thresholds=thresholds, + is_text=is_text, ) print(