batch_folder_processing #1
1
generic_loader/.gitignore
vendored
1
generic_loader/.gitignore
vendored
@ -2,3 +2,4 @@
|
|||||||
/samples
|
/samples
|
||||||
/.env
|
/.env
|
||||||
/__pycache__
|
/__pycache__
|
||||||
|
/venv
|
||||||
@ -95,6 +95,7 @@ Exit codes:
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import getpass
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
@ -463,6 +464,14 @@ def _build_argparser() -> argparse.ArgumentParser:
|
|||||||
"cluster back and continue with the next one."
|
"cluster back and continue with the next one."
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--dbcreds",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"Prompt for database username and password instead of reading "
|
||||||
|
"PGUSER / PGPASSWORD from the environment or .env file."
|
||||||
|
),
|
||||||
|
)
|
||||||
return p
|
return p
|
||||||
|
|
||||||
|
|
||||||
@ -512,7 +521,12 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
print()
|
print()
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
conn = connect()
|
db_user = db_password = None
|
||||||
|
if args.dbcreds:
|
||||||
|
db_user = input("Database username: ")
|
||||||
|
db_password = getpass.getpass("Database password: ")
|
||||||
|
|
||||||
|
conn = connect(user=db_user, password=db_password)
|
||||||
conn.autocommit = False
|
conn.autocommit = False
|
||||||
failures: List[Tuple[str, Exception]] = []
|
failures: List[Tuple[str, Exception]] = []
|
||||||
totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows)
|
totals: List[Tuple[str, int, int]] = [] # (tablename, files, rows)
|
||||||
|
|||||||
@ -207,6 +207,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
|
import getpass
|
||||||
import io
|
import io
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
@ -308,18 +309,25 @@ class ValidationError(RuntimeError):
|
|||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def connect(
    *,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> psycopg2.extensions.connection:
    """Open a psycopg2 connection using standard libpq env vars.

    Assumes `.env` has already been loaded (the CLI does this before calling).
    Orchestrators that wrap this module should either call ``load_dotenv()``
    themselves or ensure the env vars are set.

    Host, port and database name are always read from ``PGHOST``, ``PGPORT``
    and ``PGDATABASE``.

    Args:
        user: Database user name. ``None`` (the default) means "not supplied"
            and falls back to ``PGUSER``. Used by the ``--dbcreds`` CLI flag
            to accept interactive input.
        password: Database password. ``None`` (the default) means "not
            supplied" and falls back to ``PGPASSWORD``.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    # Compare against None rather than relying on truthiness: an explicitly
    # supplied empty string (e.g. a blank password typed at the --dbcreds
    # prompt) must be passed through unchanged, not silently replaced by the
    # value from the environment.
    if user is None:
        user = os.environ.get("PGUSER")
    if password is None:
        password = os.environ.get("PGPASSWORD")

    return psycopg2.connect(
        host=os.environ.get("PGHOST"),
        port=os.environ.get("PGPORT"),
        user=user,
        password=password,
        dbname=os.environ.get("PGDATABASE"),
    )
|
||||||
@ -1150,6 +1158,14 @@ def _build_argparser() -> argparse.ArgumentParser:
|
|||||||
action="store_true",
|
action="store_true",
|
||||||
help="Print inferred CREATE TABLE and stop; don't touch Postgres.",
|
help="Print inferred CREATE TABLE and stop; don't touch Postgres.",
|
||||||
)
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--dbcreds",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"Prompt for database username and password instead of reading "
|
||||||
|
"PGUSER / PGPASSWORD from the environment or .env file."
|
||||||
|
),
|
||||||
|
)
|
||||||
return p
|
return p
|
||||||
|
|
||||||
|
|
||||||
@ -1208,7 +1224,12 @@ def main(argv: Optional[List[str]] = None) -> int:
|
|||||||
print(f" streaming... {seen:,} rows", file=sys.stderr)
|
print(f" streaming... {seen:,} rows", file=sys.stderr)
|
||||||
yield chunk_df
|
yield chunk_df
|
||||||
|
|
||||||
conn = connect()
|
db_user = db_password = None
|
||||||
|
if args.dbcreds:
|
||||||
|
db_user = input("Database username: ")
|
||||||
|
db_password = getpass.getpass("Database password: ")
|
||||||
|
|
||||||
|
conn = connect(user=db_user, password=db_password)
|
||||||
conn.autocommit = False
|
conn.autocommit = False
|
||||||
try:
|
try:
|
||||||
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
|
create_table(conn, cfg.schemaname, cfg.tablename, columns, cfg.if_exists)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user