From 1b9d9c1d0165b076e43a8a76294ecc53a9df49fc Mon Sep 17 00:00:00 2001 From: Caleb Grant Date: Thu, 18 Jul 2024 16:15:50 -0700 Subject: [PATCH] start refactoring --- .github/workflows/ci-cd.yml | 25 + .gitignore | 1 + Makefile | 22 +- docs/.nojekyll | 0 docs/conf.py | 50 + docs/index.rst | 31 + docs/make.bat | 35 + docs/modules.rst | 7 + docs/pg_upsert.rst | 21 + pg_upsert/__init__.py | 5 +- pg_upsert/_version.py | 1 + pg_upsert/pg_upsert.py | 3130 ++++++++++++++--------------------- pyproject.toml | 4 +- requirements.txt | 39 + tests/data.sql | 6 +- 15 files changed, 1513 insertions(+), 1864 deletions(-) create mode 100644 docs/.nojekyll create mode 100644 docs/conf.py create mode 100644 docs/index.rst create mode 100644 docs/make.bat create mode 100644 docs/modules.rst create mode 100644 docs/pg_upsert.rst create mode 100644 pg_upsert/_version.py diff --git a/.github/workflows/ci-cd.yml b/.github/workflows/ci-cd.yml index 83dfaa5..524d378 100644 --- a/.github/workflows/ci-cd.yml +++ b/.github/workflows/ci-cd.yml @@ -76,6 +76,31 @@ jobs: platforms: linux/amd64,linux/arm64 + docs-build-and-deploy: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.11' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install sphinx sphinx-book-theme + - name: Build Sphinx documentation + run: | + cd docs + make html + - name: Deploy to GitHub Pages + uses: peaceiris/actions-gh-pages@v3 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: ./docs/_build/html + + pypi-publish: name: PyPI Publish runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 5514699..f17aeee 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ dist/ *.egg-info build/ +docs/_build/ diff --git a/Makefile b/Makefile index 71954b3..aad9b5b 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,13 @@ PYTHON = $(BIN)/python PIP = $(BIN)/pip TEST = pytest +# Sphinx documentation +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SPHINXAPIDOC ?= sphinx-apidoc +SOURCEDIR = docs +BUILDDIR = docs/_build + # Self documenting commands .DEFAULT_GOAL := help .PHONY: help @@ -61,12 +68,23 @@ lint: $(VENV)/bin/activate ## Run pre-commit hooks test: $(VENV)/bin/activate ## Run unit tests $(PYTHON) -m $(TEST) -build: $(VENV)/bin/activate ## Generate distrubition packages +build-dist: $(VENV)/bin/activate ## Generate distrubition packages $(PYTHON) -m build +build-docs: ## Generate documentation + @printf "Building documentation\n" + @$(SPHINXAPIDOC) -f -o "$(SOURCEDIR)" pg_upsert + @$(SPHINXBUILD) -M html "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) + publish: $(VENV)/bin/activate ## Publish to PyPI $(MAKE) lint $(MAKE) test - $(MAKE) build + $(MAKE) build-dist $(PYTHON) -m twine upload --repository pypi dist/* $(MAKE) clean + +build: $(VENV)/bin/activate ## Build the project + $(MAKE) lint + $(MAKE) test + $(MAKE) build-dist + $(MAKE) build-docs diff --git a/docs/.nojekyll b/docs/.nojekyll new file mode 100644 index 0000000..e69de29 diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..356bb67 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,50 @@ +# Configuration file for the Sphinx documentation builder. + +import os +import sys + +sys.path.insert(0, os.path.abspath("..")) + +import pg_upsert + +# -- Project information ----------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information + +project = "pg_upsert" +copyright = "2024, Caleb Grant" +author = "Caleb Grant" + +# The short X.Y version. +version = pg_upsert.__version__ +# The full version, including alpha/beta/rc tags. +release = pg_upsert.__version__ + +# -- General configuration --------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration + +extensions = [ + "sphinx.ext.doctest", + "sphinx.ext.autodoc", + "sphinx.ext.autosummary", + "sphinx.ext.napoleon", + "sphinx.ext.viewcode", +] + +templates_path = ["_templates"] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +add_module_names = True + +# -- Options for HTML output ------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +html_theme = "sphinx_book_theme" + +html_static_path = ["_static"] + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +html_show_sphinx = False diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..9d423b3 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,31 @@ +.. pg_upsert documentation master file, created by + sphinx-quickstart on Thu Jul 18 11:16:22 2024. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +pg_upsert documentation +======================= + +Release v\ |version|. + +**pg_upsert** is a Python package that runs not-NULL, Primary Key, Foreign Key, and Check Constraint checks on PostgreSQL staging tables then update and insert (upsert) data from staging tables to base tables. + +The API Documentation / Guide +----------------------------- + +If you are looking for information on a specific function, class, or method, +this part of the documentation is for you. + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + modules + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..747ffb7 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=source +set BUILDDIR=build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/modules.rst b/docs/modules.rst new file mode 100644 index 0000000..fee14af --- /dev/null +++ b/docs/modules.rst @@ -0,0 +1,7 @@ +pg_upsert +========= + +.. toctree:: + :maxdepth: 4 + + pg_upsert diff --git a/docs/pg_upsert.rst b/docs/pg_upsert.rst new file mode 100644 index 0000000..3f77a58 --- /dev/null +++ b/docs/pg_upsert.rst @@ -0,0 +1,21 @@ +pg\_upsert package +================== + +Submodules +---------- + +pg\_upsert.pg\_upsert module +---------------------------- + +.. automodule:: pg_upsert.pg_upsert + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: pg_upsert + :members: + :undoc-members: + :show-inheritance: diff --git a/pg_upsert/__init__.py b/pg_upsert/__init__.py index 1365b43..c6f4e49 100644 --- a/pg_upsert/__init__.py +++ b/pg_upsert/__init__.py @@ -1,3 +1,4 @@ -from .pg_upsert import upsert +from ._version import __version__ +from .pg_upsert import PgUpsert, PostgresDB -__all__ = ["upsert"] +__all__ = ["PgUpsert", "PostgresDB"] diff --git a/pg_upsert/_version.py b/pg_upsert/_version.py new file mode 100644 index 0000000..c72e379 --- /dev/null +++ b/pg_upsert/_version.py @@ -0,0 +1 @@ +__version__ = "1.1.4" diff --git a/pg_upsert/pg_upsert.py b/pg_upsert/pg_upsert.py index 4c0e6ef..c120591 100644 --- a/pg_upsert/pg_upsert.py +++ b/pg_upsert/pg_upsert.py @@ -13,31 +13,24 @@ from datetime import datetime from pathlib import Path -import polars as pl import psycopg2 from psycopg2.extras import DictCursor from psycopg2.sql import SQL, Composable, Identifier, Literal from tabulate import tabulate -__version__ = "1.1.4" - -description_long = """ -Check data in a staging table or set of staging tables, then update and insert (upsert) -rows of a base table or base tables from the staging table(s) of the same name. -Initial table checks include not-null, primary key, and foreign key checks. -If any of these checks fail, the program will exit with an error message. -If all checks pass, the program will display the number of rows to be inserted -and updated, and ask for confirmation before proceeding. If the user confirms, the -program will perform the upserts and display the number of rows inserted and updated. -If the user does not confirm, the program will exit without performing any upserts. -""" - -description_short = ( - "Update and insert (upsert) data from staging tables to base tables." -) +from ._version import __version__ + +description = "Run not-NULL, Primary Key, Foreign Key, and Check Constraint checks on staging tables then update and insert (upsert) data from staging tables to base tables." +logging.basicConfig( + level=logging.INFO, + format="%(message)s", + handlers=[logging.NullHandler()], +) logger = logging.getLogger(__name__) +# Get the __version__ from the __init__.py file. + class PostgresDB: """Base database object.""" @@ -47,33 +40,50 @@ def __init__( host: str, database: str, user: str, - **kwargs, + port: int = 5432, + passwd: None | str = None, ) -> None: self.host = host + self.port = port self.database = database self.user = user - if ("passwd" in kwargs and kwargs["passwd"] is not None) or ( - "password" in kwargs and kwargs["password"] is not None - ): - self.passwd = kwargs["passwd"] + if passwd is not None: + self.passwd = passwd else: self.passwd = self.get_password() - self.port = 5432 self.in_transaction = False self.encoding = "UTF8" self.conn = None + if not self.valid_connection(): + raise psycopg2.Error(f"Error connecting to {self!s}") def __repr__(self: PostgresDB) -> str: - return f"{self.__class__.__name__}(host={self.host}, database={self.database}, user={self.user})" # noqa: E501 + return ( + f"{self.__class__.__name__}(host={self.host}, port={self.port}, database={self.database}, user={self.user})" # noqa: E501 + ) def __del__(self: PostgresDB) -> None: """Delete the instance.""" self.close() def get_password(self): - return getpass.getpass( - f"The script {Path(__file__).name} wants the password for {self!s}: ", - ) + try: + return getpass.getpass( + f"The script {Path(__file__).name} wants the password for {self!s}: ", + ) + except (KeyboardInterrupt, EOFError) as err: + raise err + + def valid_connection(self: PostgresDB) -> bool: + """Test the database connection.""" + logger.debug(f"Testing connection to {self!s}") + try: + self.open_db() + return True + except psycopg2.Error: + return False + finally: + self.close() def open_db(self: PostgresDB) -> None: """Open a database connection.""" @@ -126,11 +136,14 @@ def execute(self: PostgresDB, sql: str | Composable, params=None): try: curs = self.cursor() if isinstance(sql, Composable): + logger.debug(f"\n{sql.as_string(curs)}") curs.execute(sql) else: if params is None: + logger.debug(f"\n{sql}") curs.execute(sql.encode(self.encoding)) else: + logger.debug(f"\nSQL:\n{sql}\nParameters:\n{params}") curs.execute(sql.encode(self.encoding), params) except Exception: self.rollback() @@ -150,14 +163,7 @@ def dict_row(): row = curs.fetchone() if row: if self.encoding: - r = [ - ( - c.decode(self.encoding, "backslashreplace") - if isinstance(c, bytes) - else c - ) - for c in row - ] + r = [(c.decode(self.encoding, "backslashreplace") if isinstance(c, bytes) else c) for c in row] else: r = row return dict(zip(headers, r, strict=True)) @@ -165,16 +171,6 @@ def dict_row(): return (iter(dict_row, None), headers, curs.rowcount) - def dataframe( - self: PostgresDB, - sql: str | Composable, - params=None, - **kwargs, - ) -> pl.DataFrame: - """Return query results as a Polars dataframe object.""" - data, cols, rowcount = self.rowdict(sql, params) - return pl.DataFrame(data, infer_schema_length=rowcount, **kwargs) - class CompareUI: def __init__( @@ -545,11 +541,7 @@ def treeview_table( datawidthtbl = [ [ len( - ( - rowset[i][j] - if isinstance(rowset[i][j], str) - else str(rowset[i][j]) - ), + (rowset[i][j] if isinstance(rowset[i][j], str) else str(rowset[i][j])), ) for i in nrows ] @@ -645,1970 +637,1398 @@ def fill_tv_table(tvtable: ttk.Treeview, rowset: list | tuple, status_label=None status_label.config(text=" %d rows" % len(rowset)) -def validate_schemas(base_schema: str, stg_schema: str): - """Validate the base and staging schemas.""" - sql = SQL( - """ - drop table if exists ups_ctrl_invl_schema cascade; - select - string_agg(schemas.schema_name - || ' (' - || schema_type - || ')', '; ' order by schema_type - ) as schema_string - into temporary table ups_ctrl_invl_schema - from - ( - select - {base_schema} as schema_name, - 'base' as schema_type - union - select +# def error_handler(errors: list[str]): +# """Log errors and exit.""" +# for error in errors: +# logger.error(error) +# if errors: +# db.rollback() +# sys.exit(1) - {stg_schema} as schema_name, - 'staging' as schema_type - ) as schemas - left join information_schema.schemata as iss - on schemas.schema_name=iss.schema_name - where - iss.schema_name is null - having count(*)>0; - - """, - ).format( - base_schema=Literal(base_schema), - stg_schema=Literal(stg_schema), - ) - if db.execute(sql).rowcount > 0: - errors.append( - "Invalid schema(s) specified: {}".format( - db.dataframe( - SQL( - "select schema_string from ups_ctrl_invl_schema", - ), - )["schema_string"][0], - ), - ) - error_handler(errors) +def ellapsed_time(start_time: datetime): + """Returns a string representing the ellapsed time since the start time.""" + dt = (datetime.now() - start_time).total_seconds() + if dt < 60: + return f"{round((datetime.now() - start_time).total_seconds(), 3)} seconds" + if dt < 3600: + return f"{int(dt // 60)} minutes, {round(dt % 60, 3)} seconds" + return f"{int(dt // 3600)} hours, {int((dt % 3600)) // 60} minutes, {round(dt % 60, 3)} seconds" # noqa: UP034 -def validate_table(base_schema: str, stg_schema: str, table: str): - """Utility script to validate one table in both base and staging schema. - Halts script processing if any either of the schemas are non-existent, - or if either of the tables are not present within those schemas pass. - """ - validate_schemas(base_schema, stg_schema) - sql = SQL( - """ - drop table if exists ups_invl_table cascade; - select string_agg( - tt.schema_name || '.' || tt.table_name || ' (' || tt.schema_type || ')', - '; ' - order by tt.schema_name, - tt.table_name - ) as schema_table into temporary table ups_invl_table - from ( - select {base_schema} as schema_name, - 'base' as schema_type, - {table} as table_name - union - select {stg_schema} as schema_name, - 'staging' as schema_type, - {table} as table_name - ) as tt - left join information_schema.tables as iss - on tt.schema_name = iss.table_schema - and tt.table_name = iss.table_name - where iss.table_name is null - having count(*) > 0; - """, - ).format( - base_schema=Literal(base_schema), - stg_schema=Literal(stg_schema), - table=Literal(table), +def clparser() -> argparse.ArgumentParser: + """Command line interface for the upsert function.""" + parser = argparse.ArgumentParser( + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, ) - if db.execute(sql).rowcount > 0: - errors.append( - "Invalid table(s) specified: {}".format( - db.dataframe(SQL("select schema_table from ups_invl_table"))[ - "schema_table" - ][0], - ), - ) + parser.add_argument( + "--version", + action="version", + version=f"%(prog)s {__version__}", + ) + parser.add_argument( + "-q", + "--quiet", + action="store_true", + help="suppress all console output", + ) + parser.add_argument( + "-d", + "--debug", + action="store_true", + help="display debug output", + ) + parser.add_argument( + "-l", + "--log", + type=Path, + metavar="LOGFILE", + help="write log to LOGFILE", + ) + parser.add_argument( + "-e", + "--exclude", + metavar="EXCLUDE_COLUMNS", + help="comma-separated list of columns to exclude from null checks", + ) + parser.add_argument( + "-n", + "--null", + metavar="NULL_COLUMNS", + help="comma-separated list of columns to exclude from null checks", + ) + parser.add_argument( + "-c", + "--do-commit", + action="store_true", + help="commit changes to database", + ) + parser.add_argument( + "-i", + "--interactive", + action="store_true", + help="display interactive GUI of important table information", + ) + parser.add_argument( + "-m", + "--upsert-method", + metavar="UPSERT_METHOD", + default="upsert", + choices=["upsert", "update", "insert"], + help="method to use for upsert", + ) + parser.add_argument( + "host", + metavar="HOST", + help="database host", + ) + parser.add_argument( + "port", + metavar="PORT", + type=int, + help="database port", + ) + parser.add_argument( + "database", + metavar="DATABASE", + help="database name", + ) + parser.add_argument( + "user", + metavar="USER", + help="database user", + ) + parser.add_argument( + "stg_schema", + metavar="STAGING_SCHEMA", + help="staging schema name", + ) + parser.add_argument( + "base_schema", + metavar="BASE_SCHEMA", + help="base schema name", + ) + parser.add_argument( + "tables", + metavar="TABLE", + nargs="+", + help="table name(s)", + ) + return parser - error_handler(errors) +class PgUpsert: + UPSERT_METHODS = ("upsert", "update", "insert") -def validate_control(base_schema: str, stg_schema: str, control_table: str): - """Validate contents of control table against base and staging schema.""" - validate_schemas(base_schema, stg_schema) - sql = SQL( - """ - drop table if exists ups_validate_control cascade; - select cast({base_schema} as text) as base_schema, - cast({stg_schema} as text) as staging_schema, - table_name, - False as base_exists, - False as staging_exists into temporary table ups_validate_control - from {control_table}; - - update ups_validate_control as vc - set base_exists = True - from information_schema.tables as bt - where vc.base_schema = bt.table_schema - and vc.table_name = bt.table_name - and bt.table_type = cast('BASE TABLE' as text); - update ups_validate_control as vc - set staging_exists = True - from information_schema.tables as st - where vc.staging_schema = st.table_schema - and vc.table_name = st.table_name - and st.table_type = cast('BASE TABLE' as text); - drop table if exists ups_ctrl_invl_table cascade; - select string_agg( - schema_table, - '; ' - order by it.schema_table - ) as schema_table into temporary table ups_ctrl_invl_table - from ( - select base_schema || '.' || table_name as schema_table - from ups_validate_control - where not base_exists - union - select staging_schema || '.' || table_name as schema_table - from ups_validate_control - where not staging_exists - ) as it - having count(*) > 0; - """, - ).format( - base_schema=Literal(base_schema), - stg_schema=Literal(stg_schema), - control_table=Identifier(control_table), - ) - if db.execute(sql).rowcount > 0: - error_handler( - [ - "Invalid table(s) specified: {}".format( - db.dataframe("select schema_table from ups_ctrl_invl_table")[ - "schema_table" - ][0], - ), - ], + def __init__( + self, + host: str, + database: str, + user: str, + port: int = 5432, + passwd: None | str = None, + tables: list | tuple | None = (), + stg_schema: str | None = None, + base_schema: str | None = None, + do_commit: bool = False, + interactive: bool = False, + upsert_method: str = "upsert", + exclude_cols: list | tuple | None = (), + exclude_null_check_columns: list | tuple | None = (), + **kwargs, + ): + if upsert_method not in self.UPSERT_METHODS: + raise ValueError( + f"Invalid upsert method: {upsert_method}. Must be one of {self.UPSERT_METHODS}", + ) + if not base_schema: + raise ValueError("No base schema specified") + if not stg_schema: + raise ValueError("No staging schema specified") + if not tables: + raise ValueError("No tables specified") + if stg_schema == base_schema: + raise ValueError( + f"Staging and base schemas must be different. Got {stg_schema} for both.", + ) + self.db = PostgresDB( + host=host, + port=port, + database=database, + user=user, + passwd=passwd, ) + logger.debug(f"Connected to {self.db!s}") + self.tables = tables + self.stg_schema = stg_schema + self.base_schema = base_schema + self.do_commit = do_commit + self.interactive = interactive + self.upsert_method = upsert_method + self.exclude_cols = exclude_cols + self.exclude_null_check_columns = exclude_null_check_columns + self.control_table = kwargs.get("control_table", "ups_control") + self.qa_passed = False + + def __repr__(self): + return f"{self.__class__.__name__}(db={self.db!r}, tables={self.tables}, stg_schema={self.stg_schema}, base_schema={self.base_schema}, do_commit={self.do_commit}, interactive={self.interactive}, upsert_method={self.upsert_method}, exclude_cols={self.exclude_cols}, exclude_null_check_columns={self.exclude_null_check_columns})" # noqa: E501 + + def __del__(self): + self.db.close() + + def show_results(self, sql: str | Composable) -> None | str: + """Display the results of a query in a table format. If the interactive flag is set, + the results will be displayed in a Tkinter window. Otherwise, the results will be + displayed in the console using the tabulate module.""" + rows, headers, rowcount = self.db.rowdict(sql) + if rowcount == 0: + logger.info("No results found") + return None + return f"{tabulate(rows, headers='keys', tablefmt='pipe', showindex=False)}" + + def run(self) -> None: + """Run the upsert process.""" + self.validate_schemas() + for table in self.tables: + self.validate_table(table) + logger.info(f"Upserting to {self.base_schema} from {self.stg_schema}") + if self.interactive: + logger.debug("Tables selected for upsert:") + for table in self.tables: + logger.debug(f" {table}") + btn, return_value = TableUI( + "Upsert Tables", + "Tables selected for upsert", + [ + ("Continue", 0, ""), + ("Cancel", 1, ""), + ], + ["Table"], + [[table] for table in self.tables], + ).activate() + if btn != 0: + logger.info("Upsert cancelled") + return + else: + logger.info("Tables selected for upsert:") + for table in self.tables: + logger.info(f" {table}") + self.init_ups_control() + self.qa_all() + if self.qa_passed: + self.upsert_all() + self.db.close() -def staged_to_load(control_table: str, tables): - """Creates a table having the structure that is used to drive - the upsert operation on multiple staging tables. - """ - sql = SQL( + def validate_schemas(self: PgUpsert) -> None: + """Validate that the base and staging schemas exist. + + Raises: + ValueError: If either schema does not exist. """ - drop table if exists {control_table} cascade; - create temporary table {control_table} ( - table_name text not null unique, - exclude_cols text, - exclude_null_checks text, - interactive boolean not null default false, - null_errors text, - pk_errors text, - fk_errors text, - ck_errors text, - rows_updated integer, - rows_inserted integer - ); - insert into {control_table} - (table_name) - select - trim(unnest(string_to_array({tables}, ','))); - """, - ).format( - control_table=Identifier(control_table), - tables=Literal(",".join(tables)), - ) - db.execute(sql) + logger.debug(f"Validating schemas {self.base_schema} and {self.stg_schema}") + sql = SQL( + """ + select + string_agg(schemas.schema_name + || ' (' + || schema_type + || ')', '; ' order by schema_type + ) as schema_string + from + ( + select + {base_schema} as schema_name, + 'base' as schema_type + union + select + {stg_schema} as schema_name, + 'staging' as schema_type + ) as schemas + left join information_schema.schemata as iss + on schemas.schema_name=iss.schema_name + where + iss.schema_name is null + having count(*)>0; + """, + ).format( + base_schema=Literal(self.base_schema), + stg_schema=Literal(self.stg_schema), + ) + if self.db.execute(sql).rowcount > 0: + raise ValueError( + f"Invalid schema(s): {next(iter(self.db.rowdict(sql)[0]))['schema_string']}", + ) -def load_staging(base_schema: str, stg_schema: str, control_table: str): - """Performs QA checks for nulls in non-null columns, for duplicated - primary key values, and for invalid foreign keys in a set of staging - tables to be loaded into base tables. If there are failures in the - QA checks, loading is not attempted. If the loading step is carried - out, it is done within a transaction. + def validate_table(self, table: str) -> None: + """Utility script to validate one table in both base and staging schema. - The "null_errors", "pk_errors", and "fk_errors" columns of the - control table will be updated to identify any errors that occur, - so that this information is available to the caller. + Halts script processing if any either of the schemas are non-existent, + or if either of the tables are not present within those schemas pass. - The "rows_updated" and "rows_inserted" columns of the control table - will be updated with counts of the number of rows affected by the - upsert operation for each table. + Args: + table (str): The table name to validate. - When the upsert operation updates the base table, all columns of the - base table that are also in the staging table are updated. The - update operation does not test to see if column contents are different, - and so does not update only those values that are different. - """ - # Clear the columns of return values from the control table, - # in case this control table has been used previously. - db.execute( - SQL( + Raises: + ValueError: If the table does not exist in either the base or staging schema. + """ + logger.debug( + f"Validating table {table} exists in {self.base_schema} and {self.stg_schema} schemas", + ) + sql = SQL( """ - update {control_table} - set null_errors = null, - pk_errors = null, - fk_errors = null, - ck_errors = null, - rows_updated = null, - rows_inserted = null; + select string_agg( + tt.schema_name || '.' || tt.table_name || ' (' || tt.schema_type || ')', + '; ' + order by tt.schema_name, + tt.table_name + ) as schema_table + from ( + select {base_schema} as schema_name, + 'base' as schema_type, + {table} as table_name + union + select {stg_schema} as schema_name, + 'staging' as schema_type, + {table} as table_name + ) as tt + left join information_schema.tables as iss + on tt.schema_name = iss.table_schema + and tt.table_name = iss.table_name + where iss.table_name is null + having count(*) > 0; """, - ).format(control_table=Identifier(control_table)), - ) - qa_all(base_schema, stg_schema, control_table) - + ).format( + base_schema=Literal(self.base_schema), + stg_schema=Literal(self.stg_schema), + table=Literal(table), + ) + if self.db.execute(sql).rowcount > 0: + raise ValueError( + f"Invalid table(s): {next(iter(self.db.rowdict(sql)[0]))['schema_table']}", + ) -def qa_all(base_schema: str, stg_schema: str, control_table: str): - """Conducts null, primary key, foreign key, and check constraint - checks on multiple staging tables containing new or revised data - for staging tables, using the NULLQA_ONE, PKQA_ONE, FKQA_ONE, - and CKQA_ONE functions. - """ - # Create a list of the selected tables with a loop control flag. - db.execute( - SQL( + def validate_control(self: PgUpsert) -> None: + """Validate contents of control table against base and staging schema.""" + logger.debug("Validating control table") + self.validate_schemas() + sql = SQL( """ - drop table if exists ups_proctables cascade; - select tl.table_name, - tl.exclude_null_checks, - tl.interactive, - False::boolean as processed into temporary table ups_proctables - from {control_table} as tl; + drop table if exists ups_validate_control cascade; + select cast({base_schema} as text) as base_schema, + cast({stg_schema} as text) as staging_schema, + table_name, + False as base_exists, + False as staging_exists into temporary table ups_validate_control + from {control_table}; + + update ups_validate_control as vc + set base_exists = True + from information_schema.tables as bt + where vc.base_schema = bt.table_schema + and vc.table_name = bt.table_name + and bt.table_type = cast('BASE TABLE' as text); + update ups_validate_control as vc + set staging_exists = True + from information_schema.tables as st + where vc.staging_schema = st.table_schema + and vc.table_name = st.table_name + and st.table_type = cast('BASE TABLE' as text); + drop table if exists ups_ctrl_invl_table cascade; + select string_agg( + schema_table, + '; ' + order by it.schema_table + ) as schema_table into temporary table ups_ctrl_invl_table + from ( + select base_schema || '.' || table_name as schema_table + from ups_validate_control + where not base_exists + union + select staging_schema || '.' || table_name as schema_table + from ups_validate_control + where not staging_exists + ) as it + having count(*) > 0; """, - ).format(control_table=Identifier(control_table)), - ) - # Create a view returning a single unprocessed table, in order. - db.execute( - SQL( + ).format( + base_schema=Literal(self.base_schema), + stg_schema=Literal(self.stg_schema), + control_table=Identifier(self.control_table), + ) + if self.db.execute(sql).rowcount > 0: + logger.error("Invalid table(s) specified:") + rows, headers, rowcount = self.db.rowdict( + SQL("select schema_table from ups_ctrl_invl_table"), + ) + for row in rows: + logger.error(f" {row['schema_table']}") + + def init_ups_control(self: PgUpsert) -> None: + """Creates a table having the structure that is used to drive + the upsert operation on multiple staging tables. + """ + logger.debug("Initializing upsert control table") + sql = SQL( """ - drop view if exists ups_toprocess cascade; - create temporary view ups_toprocess as - select - table_name, - exclude_null_checks, - interactive - from ups_proctables - where not processed - limit 1; - """, - ), - ) - interactive = db.dataframe("select interactive from ups_toprocess;")["interactive"][ - 0 - ] - # Null checks - logger.info("") - qa_check = "Non-NULL" - logger.info(f"==={qa_check} checks===") - start_time = datetime.now() - qa_all_nullloop(base_schema, stg_schema, control_table, interactive) - logger.debug(f"{qa_check} checks completed in {ellapsed_time(start_time)}") - logger.info("") - - # Reset the loop control flag. - db.execute("update ups_proctables set processed = False;") - - qa_check = "Primary Key" - logger.info(f"==={qa_check} checks===") - start_time = datetime.now() - qa_all_pkloop(base_schema, stg_schema, control_table, interactive) - logger.debug(f"{qa_check} checks completed in {ellapsed_time(start_time)}") - logger.info("") - - # Reset the loop control flag. - db.execute("update ups_proctables set processed = False;") - - qa_check = "Foreign Key" - logger.info(f"==={qa_check} checks===") - start_time = datetime.now() - qa_all_fkloop(base_schema, stg_schema, control_table, interactive) - logger.debug(f"{qa_check} checks completed in {ellapsed_time(start_time)}") - logger.info("") - - # Reset the loop control flag. - db.execute("update ups_proctables set processed = False;") - - qa_check = "Check Constraint" - logger.info(f"==={qa_check} checks===") - start_time = datetime.now() - qa_all_ckloop(base_schema, stg_schema, control_table, interactive) - logger.debug(f"{qa_check} checks completed in {ellapsed_time(start_time)}") - logger.info("") - - -def qa_all_nullloop( - base_schema: str, - stg_schema: str, - control_table: str, - interactive: bool, -): - null_errors = [] - while True: - df = db.dataframe(SQL("select * from ups_toprocess;")) - if df.is_empty(): - break - null_qa_one( - base_schema, - stg_schema, - table=df["table_name"][0], - errors=null_errors, - exclude_null_checks=df["exclude_null_checks"][0], - interactive=interactive, + drop table if exists {control_table} cascade; + create temporary table {control_table} ( + table_name text not null unique, + exclude_cols text, + exclude_null_checks text, + interactive boolean not null default false, + null_errors text, + pk_errors text, + fk_errors text, + ck_errors text, + rows_updated integer, + rows_inserted integer + ); + insert into {control_table} + (table_name) + select + trim(unnest(string_to_array({tables}, ','))); + """, + ).format( + control_table=Identifier(self.control_table), + tables=Literal(",".join(self.tables)), ) - err_df = db.dataframe("select * from ups_null_error_list;") - if not err_df.is_empty(): - db.execute( + self.db.execute(sql) + # Update the control table with the list of columns to exclude from null checks + if self.exclude_cols and len(self.exclude_cols) > 0: + self.db.execute( SQL( """ - update {control_table} - set null_errors = {null_errors} - where table_name = {table}; + update {control_table} + set exclude_cols = {exclude_cols}; """, ).format( - control_table=Identifier(control_table), - null_errors=Literal(err_df["null_errors"][0]), - table=Literal(df["table_name"][0]), + control_table=Identifier(self.control_table), + exclude_cols=Literal(",".join(self.exclude_cols)), ), ) - - db.execute( - SQL( - """update ups_proctables set processed = True - where table_name = {table_name};""", - ).format(table_name=Literal(df["table_name"][0])), - ) - - -def null_qa_one( - base_schema: str, - stg_schema: str, - table: str, - errors: list, - exclude_null_checks: str, - interactive: bool, -): - logger.info(f"Conducting non-null QA checks on table {stg_schema}.{table}") - validate_table(base_schema, stg_schema, table) - # Create a table listing the columns of the base table that must - # be non-null and that do not have a default expression. - # Include a column for the number of rows with nulls in the staging table. - # Include a 'processed' column for loop control. - db.execute( - SQL( - """ - drop table if exists ups_nonnull_cols cascade; - select column_name, - 0::integer as null_rows, - False as processed - into temporary table ups_nonnull_cols - from information_schema.columns - where table_schema = {base_schema} - and table_name = {table} - and is_nullable = 'NO' - and column_default is null and column_name not in ({exclude_null_checks}); - """, - ).format( - base_schema=Literal(base_schema), - table=Literal(table), - exclude_null_checks=( - SQL(",").join(Literal(col) for col in exclude_null_checks.split(",")) - if exclude_null_checks - else Literal("") + if self.exclude_null_check_columns and len(self.exclude_null_check_columns) > 0: + self.db.execute( + SQL( + """ + update {control_table} + set exclude_null_checks = {exclude_null_check_columns}; + """, + ).format( + control_table=Identifier(self.control_table), + exclude_null_check_columns=Literal( + ",".join(self.exclude_null_check_columns), + ), + ), + ) + if self.interactive: + self.db.execute( + SQL( + """ + update {control_table} + set interactive = {interactive}; + """, + ).format( + control_table=Identifier(self.control_table), + interactive=Literal(self.interactive), + ), + ) + rows, headers, rowcount = self.db.rowdict( + SQL("select * from {control_table}").format( + control_table=Identifier(self.control_table), ), - ), - ) + ) + logger.debug( + f"Control table after being initialized:\n{tabulate(rows, headers='keys', tablefmt='pipe', showindex=False)}", + ) - # Process all non-nullable columns. - while True: - df = db.dataframe( - """ - select column_name - from ups_nonnull_cols - where not processed - limit 1; + def qa_all(self: PgUpsert) -> None: + """Performs QA checks for nulls in non-null columns, for duplicated + primary key values, for invalid foreign keys, and invalid check constraints + in a set of staging tables to be loaded into base tables. + If there are failures in the QA checks, loading is not attempted. + If the loading step is carried out, it is done within a transaction. + + The "null_errors", "pk_errors", "fk_errors", "ck_errors" columns of the + control table will be updated to identify any errors that occur, + so that this information is available to the caller. + + The "rows_updated" and "rows_inserted" columns of the control table + will be updated with counts of the number of rows affected by the + upsert operation for each table. + + When the upsert operation updates the base table, all columns of the + base table that are also in the staging table are updated. The + update operation does not test to see if column contents are different, + and so does not update only those values that are different. + """ + # Clear the columns of return values from the control table, + # in case this control table has been used previously. + self.db.execute( + SQL( + """ + update {control_table} + set null_errors = null, + pk_errors = null, + fk_errors = null, + ck_errors = null, + rows_updated = null, + rows_inserted = null; """, + ).format(control_table=Identifier(self.control_table)), ) - if df.is_empty(): - break - db.execute( + # Create a list of the selected tables with a loop control flag. + self.db.execute( SQL( """ - create or replace temporary view ups_qa_nonnull_col as - select nrows - from ( - select count(*) as nrows - from {stg_schema}.{table} - where {column_name} is null - ) as nullcount - where nrows > 0 - limit 1; + drop table if exists ups_proctables cascade; + select + table_name, + exclude_null_checks, + interactive, + False::boolean as processed + into temporary table ups_proctables + from {control_table}; """, - ).format( - stg_schema=Identifier(stg_schema), - table=Identifier(table), - column_name=Identifier(df["column_name"][0]), - ), + ).format(control_table=Identifier(self.control_table)), ) - null_df = db.dataframe("select * from ups_qa_nonnull_col;") - if not null_df.is_empty(): - logger.warning( - f" Column {df['column_name'][0]} has {null_df['nrows'][0]} null values", # noqa: E501 - ) - db.execute( - SQL( - """ - update ups_nonnull_cols - set null_rows = ( - select nrows - from ups_qa_nonnull_col - limit 1 - ) - where column_name = {column_name}; - """, - ).format(column_name=Literal(df["column_name"][0])), - ) - db.execute( + # Create a view returning a single unprocessed table, in order. + self.db.execute( SQL( """ - update ups_nonnull_cols - set processed = True - where column_name = {column_name}; + drop view if exists ups_toprocess cascade; + create temporary view ups_toprocess as + select + table_name, + exclude_null_checks, + interactive + from ups_proctables + where not processed + limit 1; """, - ).format(column_name=Literal(df["column_name"][0])), + ), ) - # Update the control table with the number of rows with nulls in the staging table. - db.execute( - """ - create or replace temporary view ups_null_error_list as - select string_agg(column_name || ' (' || null_rows || ')', ', ') as null_errors - from ups_nonnull_cols - where coalesce(null_rows, 0) > 0; - """, - ) - - -def qa_all_pkloop( - base_schema: str, - stg_schema: str, - control_table: str, - interactive: bool, -): - while True: - df = db.dataframe(SQL("select * from ups_toprocess;")) - if df.is_empty(): - break - pk_errors = pk_qa_one( - base_schema, - stg_schema, - table=df["table_name"][0], - interactive=interactive, - ) - if pk_errors: - db.execute( - SQL( - """ - update {control_table} - set pk_errors = {pk_errors} - where table_name = {table}; - """, - ).format( - control_table=Identifier(control_table), - pk_errors=Literal(pk_errors[0]), - table=Literal(df["table_name"][0]), - ), + qa_funcs = { + "Non-NULL": self.qa_all_null, + "Primary Key": self.qa_all_pk, + "Foreign Key": self.qa_all_fk, + "Check Constraint": self.qa_all_ck, + } + + for qa_check, qa_func in qa_funcs.items(): + logger.info(f"==={qa_check} checks===") + start_time = datetime.now() + qa_func() + logger.debug(f"{qa_check} checks completed in {ellapsed_time(start_time)}") + logger.debug(f"Control table after {qa_check} checks:") + ctrl = SQL("select * from {control_table};").format( + control_table=Identifier(self.control_table), ) + if not self.interactive: + logger.debug(f"\n{self.show_results(ctrl)}") + # Reset the loop control flag in the control table. + self.db.execute(SQL("update ups_proctables set processed = False;")) - db.execute( + # Check for errors + rows, headers, rowcount = self.db.rowdict( SQL( - """update ups_proctables set processed = True where table_name = {table_name};""", # noqa: E501 - ).format(table_name=Literal(df["table_name"][0])), + """select * from {control_table} + where coalesce(null_errors, pk_errors, fk_errors, ck_errors) is not null; + """, + ).format( + control_table=Identifier(self.control_table), + ), ) - - -def pk_qa_one(base_schema: str, stg_schema: str, table: str, interactive: bool): - pk_errors = [] - logger.info(f"Conducting primary key QA checks on table {stg_schema}.{table}") - validate_table(base_schema, stg_schema, table) - # Create a table of primary key columns on this table - db.execute( - SQL( - """ - drop table if exists ups_primary_key_columns cascade; - select k.constraint_name, k.column_name, k.ordinal_position - into temporary table ups_primary_key_columns - from information_schema.table_constraints as tc - inner join information_schema.key_column_usage as k - on tc.constraint_type = 'PRIMARY KEY' - and tc.constraint_name = k.constraint_name - and tc.constraint_catalog = k.constraint_catalog - and tc.constraint_schema = k.constraint_schema - and tc.table_schema = k.table_schema - and tc.table_name = k.table_name - and tc.constraint_name = k.constraint_name - where - k.table_name = {table} - and k.table_schema = {base_schema} - order by k.ordinal_position - ; - """, - ).format(table=Literal(table), base_schema=Literal(base_schema)), - ) - df = db.dataframe("select * from ups_primary_key_columns;") - if df.is_empty(): - return None - logger.debug(f" Checking constraint {df['constraint_name'][0]}") - # Get a comma-delimited list of primary key columns to build SQL selection - # for duplicate keys - pkcol_df = db.dataframe( - """ - select - string_agg(column_name, ', ' order by ordinal_position) as pkcollist - from ups_primary_key_columns - ; - """, - ) - pkcollist = pkcol_df["pkcollist"][0] - db.execute( - SQL( - """ - drop view if exists ups_pk_check cascade; - create temporary view ups_pk_check as - select {pkcollist}, count(*) as nrows - from {stg_schema}.{table} as s - group by {pkcollist} - having count(*) > 1; - """, - ).format( - pkcollist=SQL(pkcollist), - stg_schema=Identifier(stg_schema), - table=Identifier(table), - ), - ) - pk_check = db.dataframe("select * from ups_pk_check;") - if not pk_check.is_empty(): - logger.warning(f" Duplicate key error in columns {pkcollist}") - err_df = db.dataframe( - """ - select count(*) as errcnt, sum(nrows) as total_rows - from ups_pk_check; - """, - ) - pk_errors.append( - f"{err_df['errcnt'][0]} duplicated keys ({int(err_df['total_rows'][0])} rows) in table {stg_schema}.{table}", # noqa: E501 + if rowcount > 0: + ctrl = SQL("select * from {control_table};").format( + control_table=Identifier(self.control_table), + ) + logger.debug("QA checks failed") + logger.debug(f"\n{self.show_results(ctrl)}") + logger.debug("") + if self.interactive: + btn, return_value = TableUI( + "QA Errors", + "QA checks failed. Below is a summary of the errors:", + [ + ("Continue", 0, ""), + ("Cancel", 1, ""), + ], + headers, + list(rows), + ).activate() + else: + logger.error("===QA checks failed. Below is a summary of the errors===") + logger.error(self.show_results(ctrl)) + return + self.qa_passed = True + + def qa_all_null(self: PgUpsert) -> None: + """Performs null checks for non-null columns in selected staging tables.""" + while True: + rows, headers, rowcount = self.db.rowdict( + SQL("select * from ups_toprocess;"), + ) + if rowcount == 0: + break + rows = next(iter(rows)) + self.qa_one_null(table=rows["table_name"]) + # Query the ups_null_error_list control table for the null errors. + err_rows, err_headers, err_rowcount = self.db.rowdict( + SQL("select * from ups_null_error_list;"), + ) + if err_rowcount > 0: + self.db.execute( + SQL( + """ + update {control_table} + set null_errors = {null_errors} + where table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + null_errors=Literal(next(iter(err_rows))["null_errors"]), + table_name=Literal(rows["table_name"]), + ), + ) + # Set the 'processed' column to True in the control table. + self.db.execute( + SQL( + """ + update ups_proctables + set processed = True + where table_name = {table_name}; + """, + ).format(table_name=Literal(rows["table_name"])), + ) + + def qa_one_null(self: PgUpsert, table: str) -> None: + """Performs null checks for non-null columns in a single staging table.""" + logger.info( + f"Checking for NULLs in non-NULL columns in table {self.stg_schema}.{table}", ) - logger.debug("") - logger.debug( - tabulate( - pk_check.iter_rows(), - headers=pk_check.columns, - tablefmt="pipe", - showindex=False, - colalign=["left"] * len(pk_check.columns), + self.validate_table(table) + # Create a table listing the columns of the base table that must + # be non-null and that do not have a default expression. + # Include a column for the number of rows with nulls in the staging table. + # Include a 'processed' column for loop control. + self.db.execute( + SQL( + """ + drop table if exists ups_nonnull_cols cascade; + select column_name, + 0::integer as null_rows, + False as processed + into temporary table ups_nonnull_cols + from information_schema.columns + where table_schema = {base_schema} + and table_name = {table} + and is_nullable = 'NO' + and column_default is null + and column_name not in ({exclude_null_check_columns}); + """, + ).format( + base_schema=Literal(self.base_schema), + table=Literal(table), + exclude_null_check_columns=( + SQL(",").join(Literal(col) for col in self.exclude_null_check_columns) + if self.exclude_null_check_columns + else Literal("") + ), ), ) - logger.debug("") - if interactive: - btn, return_value = TableUI( - "Duplicate key error", - f"{err_df['errcnt'][0]} duplicated keys ({int(err_df['total_rows'][0])} rows) in table {stg_schema}.{table}", # noqa: E501 - [ - ("Continue", 0, ""), - ("Cancel", 1, ""), - ], - pk_check.columns, - list(pk_check.iter_rows()), - ).activate() - if btn != 0: - error_handler(["Script canceled by user."]) - - return pk_errors - - -def qa_all_fkloop( - base_schema: str, - stg_schema: str, - control_table: str, - interactive: bool, -): - while True: - df = db.dataframe(SQL("select * from ups_toprocess;")) - if df.is_empty(): - break - fk_errors = fk_qa_one( - base_schema, - stg_schema, - table=df["table_name"][0], - interactive=interactive, - ) - if fk_errors: - db.execute( + # Process all non-nullable columns. + while True: + rows, headers, rowcount = self.db.rowdict( + SQL("select * from ups_nonnull_cols where not processed limit 1;"), + ) + if rowcount == 0: + break + rows = next(iter(rows)) + logger.debug(f" Checking column {rows['column_name']} for nulls") + self.db.execute( SQL( """ - update {control_table} - set fk_errors = {fk_errors} - where table_name = {table}; + create or replace temporary view ups_qa_nonnull_col as + select nrows + from ( + select count(*) as nrows + from {stg_schema}.{table} + where {column_name} is null + ) as nullcount + where nrows > 0 + limit 1; """, ).format( - control_table=Identifier(control_table), - fk_errors=Literal(fk_errors), - table=Literal(df["table_name"][0]), + stg_schema=Identifier(self.stg_schema), + table=Identifier(table), + column_name=Identifier(rows["column_name"]), ), ) - - db.execute( - SQL( - """update ups_proctables set processed = True where table_name = {table_name};""", # noqa: E501 - ).format(table_name=Literal(df["table_name"][0])), + # Get the number of rows with nulls in the staging table. + null_rows, headers, rowcount = self.db.rowdict( + SQL("select * from ups_qa_nonnull_col;"), + ) + if rowcount > 0: + null_rows = next(iter(null_rows)) + logger.warning( + f" Column {rows['column_name']} has {null_rows['nrows']} null values", + ) + # Set the number of rows with nulls in the control table. + self.db.execute( + SQL( + """ + update ups_nonnull_cols + set null_rows = ( + select nrows + from ups_qa_nonnull_col + limit 1 + ) + where column_name = {column_name}; + """, + ).format(column_name=Literal(rows["column_name"])), + ) + # Set the 'processed' column to True in the control table. + self.db.execute( + SQL( + """ + update ups_nonnull_cols + set processed = True + where column_name = {column_name}; + """, + ).format(column_name=Literal(rows["column_name"])), + ) + # Update the control table with the number of rows with nulls in the staging table. + self.db.execute( + """ + create or replace temporary view ups_null_error_list as + select string_agg(column_name || ' (' || null_rows || ')', ', ') as null_errors + from ups_nonnull_cols + where coalesce(null_rows, 0) > 0; + """, ) + def qa_all_pk(self: PgUpsert) -> None: + """Performs primary key checks for duplicated primary key values in selected staging tables.""" + while True: + rows, headers, rowcount = self.db.rowdict( + SQL("select * from ups_toprocess;"), + ) + if rowcount == 0: + break + rows = next(iter(rows)) + pk_errors = self.qa_one_pk(table=rows["table_name"]) + if pk_errors: + self.db.execute( + SQL( + """ + update {control_table} + set pk_errors = {pk_errors} + where table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + pk_errors=Literal(",".join(pk_errors)), + table_name=Literal(rows["table_name"]), + ), + ) + # Set the 'processed' column to True in the control table. + self.db.execute( + SQL( + """ + update ups_proctables + set processed = True + where table_name = {table_name}; + """, + ).format(table_name=Literal(rows["table_name"])), + ) -def fk_qa_one(base_schema: str, stg_schema: str, table: str, interactive: bool): - logger.info(f"Conducting foreign key QA checks on table {stg_schema}.{table}") - # Create a table of *all* foreign key dependencies in this database. - # Only create it once because it may slow the QA process down. - if ( - db.execute( - SQL( - """select * from information_schema.tables - where table_name = {ups_foreign_key_columns};""", - ).format(ups_foreign_key_columns=Literal("ups_foreign_key_columns")), - ).rowcount - == 0 - ): - db.execute( - SQL( - """ - select - fkinf.constraint_name, - fkinf.table_schema, - fkinf.table_name, - att1.attname as column_name, - fkinf.uq_schema, - cls.relname as uq_table, - att2.attname as uq_column - into - temporary table {ups_foreign_key_columns} - from - (select - ns1.nspname as table_schema, - cls.relname as table_name, - unnest(cons.conkey) as uq_table_id, - unnest(cons.confkey) as table_id, - cons.conname as constraint_name, - ns2.nspname as uq_schema, - cons.confrelid, - cons.conrelid - from - pg_constraint as cons - inner join pg_class as cls on cls.oid = cons.conrelid - inner join pg_namespace ns1 on ns1.oid = cls.relnamespace - inner join pg_namespace ns2 on ns2.oid = cons.connamespace - where - cons.contype = 'f' - ) as fkinf - inner join pg_attribute att1 on - att1.attrelid = fkinf.conrelid and att1.attnum = fkinf.uq_table_id - inner join pg_attribute att2 on - att2.attrelid = fkinf.confrelid and att2.attnum = fkinf.table_id - inner join pg_class cls on cls.oid = fkinf.confrelid; - """, - ).format(ups_foreign_key_columns=Identifier("ups_foreign_key_columns")), + def qa_one_pk(self: PgUpsert, table: str) -> list | None: + """Performs primary key checks for duplicated primary key values in a single staging table.""" + pk_errors = [] + logger.info( + f"Checking for duplicated primary key values in table {self.stg_schema}.{table}", ) - - # Create a temporary table of just the foreign key relationships for the base - # table corresponding to the staging table to check. - db.execute( - SQL( - """ - drop table if exists ups_sel_fks cascade; - select - constraint_name, table_schema, table_name, - column_name, uq_schema, uq_table, uq_column - into - temporary table ups_sel_fks - from - ups_foreign_key_columns - where - table_schema = {base_schema} - and table_name = {table}; - """, - ).format(base_schema=Literal(base_schema), table=Literal(table)), - ) - # Create a temporary table of all unique constraint names for - # this table, with an integer column to be populated with the - # number of rows failing the foreign key check, and a 'processed' - # flag to control looping. - db.execute( - SQL( - """ - drop table if exists ups_fk_constraints cascade; - select distinct - constraint_name, table_schema, table_name, - 0::integer as fkerror_values, - False as processed - into temporary table ups_fk_constraints - from ups_sel_fks; - """, - ), - ) - while True: - # Create a view to select one constraint to process. - df = db.dataframe( + self.validate_table(table) + # Create a table listing the primary key columns of the base table. + self.db.execute( SQL( """ - select constraint_name, table_schema, table_name - from ups_fk_constraints - where not processed - limit 1; + drop table if exists ups_primary_key_columns cascade; + select k.constraint_name, k.column_name, k.ordinal_position + into temporary table ups_primary_key_columns + from information_schema.table_constraints as tc + inner join information_schema.key_column_usage as k + on tc.constraint_type = 'PRIMARY KEY' + and tc.constraint_name = k.constraint_name + and tc.constraint_catalog = k.constraint_catalog + and tc.constraint_schema = k.constraint_schema + and tc.table_schema = k.table_schema + and tc.table_name = k.table_name + and tc.constraint_name = k.constraint_name + where + k.table_name = {table} + and k.table_schema = {base_schema} + order by k.ordinal_position + ; """, - ), + ).format(table=Literal(table), base_schema=Literal(self.base_schema)), ) - if df.is_empty(): - break - logger.debug( - f" Checking constraint {df['constraint_name'][0]} for table {table}", - ) - db.execute( - SQL( - """ - drop table if exists ups_one_fk cascade; - select column_name, uq_schema, uq_table, uq_column - into temporary table ups_one_fk - from ups_sel_fks - where - constraint_name = {constraint_name} - and table_schema = {table_schema} - and table_name = {table_name}; - """, - ).format( - constraint_name=Literal(df["constraint_name"][0]), - table_schema=Literal(df["table_schema"][0]), - table_name=Literal(df["table_name"][0]), - ), + rows, headers, rowcount = self.db.rowdict( + "select * from ups_primary_key_columns;", ) - const_df = db.dataframe("select * from ups_one_fk;") - # Create join expressions from staging table (s) to unique table (u) - # and to staging table equivalent to unique table (su) (though we - # don't know yet if the latter exists). Also create a 'where' - # condition to ensure that all columns being matched are non-null. - # Also create a comma-separated list of the columns being checked. - fk_df = db.dataframe( + if rowcount == 0: + logger.info("Table has no primary key") + return None + # rows = next(iter(rows)) + rows = list(rows) + logger.debug(f" Checking constraint {rows[0]['constraint_name']}") + # Get a comma-delimited list of primary key columns to build SQL selection + # for duplicate keys, ordered by ordinal position. + pk_cols = SQL(",").join(Identifier(row["column_name"]) for row in rows) + self.db.execute( SQL( """ - select - string_agg('s.' || column_name || ' = u.' || uq_column, ' and ') as u_join, - string_agg('s.' || column_name || ' = su.' || uq_column, ' and ') as su_join, - string_agg('s.' || column_name || ' is not null', ' and ') as s_not_null, - string_agg('s.' || column_name, ', ') as s_checked - from - (select * from ups_one_fk) as fkcols; - """, - ), - ) - # Determine whether a staging-table equivalent of the unique table exists. - su_exists = False - if ( - db.execute( - SQL( - """select * from information_schema.tables - where table_name = {table} and table_schema = {stg_schema};""", - ).format( - table=Literal(const_df["uq_table"][0]), - stg_schema=Literal(stg_schema), - ), - ).rowcount - > 0 - ): - su_exists = True - - # Construct a query to test for missing unique values for fk columns. - query = SQL( - """ - drop view if exists ups_fk_check cascade; - create or replace temporary view ups_fk_check as - select {s_checked}, count(*) as nrows + drop view if exists ups_pk_check cascade; + create temporary view ups_pk_check as + select {pkcollist}, count(*) as nrows from {stg_schema}.{table} as s - left join {uq_schema}.{uq_table} as u on {u_join} + group by {pkcollist} + having count(*) > 1; """, - ).format( - s_checked=SQL(fk_df["s_checked"][0]), - stg_schema=Identifier(stg_schema), - table=Identifier(table), - uq_schema=Identifier(const_df["uq_schema"][0]), - uq_table=Identifier(const_df["uq_table"][0]), - u_join=SQL(fk_df["u_join"][0]), - ) - if su_exists: - query += SQL( - """ left join {stg_schema}.{uq_table} as su on {su_join}""", ).format( - stg_schema=Identifier(stg_schema), - uq_table=Identifier(const_df["uq_table"][0]), - su_join=SQL(fk_df["su_join"][0]), - ) - query += SQL(" where u.{uq_column} is null").format( - uq_column=Identifier(const_df["uq_column"][0]), + pkcollist=pk_cols, + stg_schema=Identifier(self.stg_schema), + table=Identifier(table), + ), ) - if su_exists: - query += SQL(" and su.{uq_column} is null").format( - uq_column=Identifier(const_df["uq_column"][0]), - ) - query += SQL( - """ and {s_not_null} - group by {s_checked};""", - ).format( - s_not_null=SQL(fk_df["s_not_null"][0]), - s_checked=SQL(fk_df["s_checked"][0]), + pk_errs, pk_headers, pk_rowcount = self.db.rowdict( + "select * from ups_pk_check;", ) - - db.execute(query) - fk_check_df = db.dataframe("select * from ups_fk_check;") - - if not fk_check_df.is_empty(): + if pk_rowcount > 0: logger.warning( - f" Foreign key error referencing {const_df['uq_table'][0]}", + f" Duplicate key error in columns {pk_cols.as_string(self.db.cursor())}", ) - logger.debug("") - logger.debug( - tabulate( - fk_check_df.iter_rows(), - headers=fk_check_df.columns, - tablefmt="pipe", - showindex=False, - colalign=["left"] * len(fk_check_df.columns), + pk_errs = list(pk_errs) + tot_errs, tot_headers, tot_rowcount = self.db.rowdict( + SQL( + "select count(*) as errcount, sum(nrows) as total_rows from ups_pk_check;", ), ) + tot_errs = next(iter(tot_errs)) + err_msg = f"{tot_errs['errcount']} duplicate keys ({tot_errs['total_rows']} rows) in table {self.stg_schema}.{table}" + pk_errors.append(err_msg) logger.debug("") - if interactive: + err_sql = SQL("select * from ups_pk_check;") + logger.debug(f"\n{self.show_results(err_sql)}") + logger.debug("") + if self.interactive: btn, return_value = TableUI( - "Foreign Key Error", - f"Foreign key error referencing {const_df['uq_table'][0]}", + "Duplicate key error", + err_msg, [ ("Continue", 0, ""), ("Cancel", 1, ""), ], - fk_check_df.columns, - list(fk_check_df.iter_rows()), + pk_headers, + list(pk_errs), ).activate() if btn != 0: - error_handler(["Script canceled by user."]) - - db.execute( + logger.warning("Script cancelled by user") + sys.exit(0) + return pk_errors + + def qa_all_fk(self: PgUpsert) -> None: + """Performs foreign key checks for invalid foreign key values in selected staging tables.""" + while True: + rows, headers, rowcount = self.db.rowdict( + SQL("select * from ups_toprocess;"), + ) + if rowcount == 0: + break + rows = next(iter(rows)) + fk_errors = self.qa_one_fk(table=rows["table_name"]) + if fk_errors: + self.db.execute( + SQL( + """ + update {control_table} + set fk_errors = {fk_errors} + where table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + fk_errors=Literal(",".join(fk_errors)), + table_name=Literal(rows["table_name"]), + ), + ) + # Set the 'processed' column to True in the control table. + self.db.execute( SQL( """ - update ups_fk_constraints - set fkerror_values = {fkerror_count} - where constraint_name = {constraint_name} - and table_schema = {table_schema} - and table_name = {table_name}; + update ups_proctables + set processed = True + where table_name = {table_name}; """, - ).format( - fkerror_count=Literal(fk_check_df["nrows"][0]), - constraint_name=Literal(df["constraint_name"][0]), - table_schema=Literal(df["table_schema"][0]), - table_name=Literal(df["table_name"][0]), - ), + ).format(table_name=Literal(rows["table_name"])), ) - db.execute( - SQL( - """ - update ups_fk_constraints - set processed = True - where - constraint_name = {constraint_name} - and table_schema = {table_schema} - and table_name = {table_name}; - """, - ).format( - constraint_name=Literal(df["constraint_name"][0]), - table_schema=Literal(df["table_schema"][0]), - table_name=Literal(df["table_name"][0]), - ), - ) - - err_df = db.dataframe( - SQL( - """ - select string_agg( - constraint_name || ' (' || fkerror_values || ')', ', ' - ) as fk_errors - from ups_fk_constraints - where coalesce(fkerror_values, 0) > 0; - """, - ), - ) - return err_df["fk_errors"][0] - -def qa_all_ckloop( - base_schema: str, - stg_schema: str, - control_table: str, - interactive: bool, -): - ck_errors = [] - while True: - df = db.dataframe(SQL("select * from ups_toprocess;")) - if df.is_empty(): - break - ck_qa_one( - base_schema, - stg_schema, - table=df["table_name"][0], - errors=ck_errors, - interactive=interactive, + def qa_one_fk(self: PgUpsert, table: str) -> list | None: + logger.info( + f"Conducting foreign key QA checks on table {self.stg_schema}.{table}", ) - err_df = db.dataframe( - "select * from ups_ck_error_list where ck_errors is not null", - ) - if not err_df.is_empty(): - db.execute( + self.validate_table(table) + # Create a table of *all* foreign key dependencies in this database. + # Only create it once because it may slow the QA process down. + if ( + self.db.execute( + SQL( + """select * from information_schema.tables + where table_name = {ups_foreign_key_columns};""", + ).format(ups_foreign_key_columns=Literal("ups_foreign_key_columns")), + ).rowcount + == 0 + ): + self.db.execute( SQL( """ - update {control_table} - set ck_errors = {ck_errors} - where table_name = {table}; - """, - ).format( - control_table=Identifier(control_table), - ck_errors=Literal(err_df["ck_errors"][0]), - table=Literal(df["table_name"][0]), - ), - ) - - db.execute( - SQL( - """update ups_proctables set processed = True - where table_name = {table_name};""", - ).format(table_name=Literal(df["table_name"][0])), - ) - - -def ck_qa_one( - base_schema: str, - stg_schema: str, - table: str, - errors: list, - interactive: bool, -): - logger.info(f"Conducting check constraint QA checks on table {stg_schema}.{table}") - validate_table(base_schema, stg_schema, table) - # Create a table of *all* check constraints in this database. - # Because this may be an expensive operation (in terms of time), the - # table is not re-created if it already exists. "Already exists" - # means that a table with the expected name exists. No check is - # done to ensure that this table has the correct structure. The - # goal is to create the table of all check constraints only once to - # minimize the time required if QA checks are to be run on multiple - # staging tables. - if ( - db.execute( - SQL( - """select * from information_schema.tables - where table_name = {ups_check_constraints};""", - ).format(ups_check_constraints=Literal("ups_check_constraints")), - ).rowcount - == 0 - ): - db.execute( - SQL( - """ - drop table if exists ups_check_constraints cascade; - select - nspname as table_schema, - cast(conrelid::regclass as text) as table_name, - conname as constraint_name, - pg_get_constraintdef(pg_constraint.oid) AS consrc - into temporary table ups_check_constraints - from pg_constraint - inner join pg_class on pg_constraint.conrelid = pg_class.oid - inner join pg_namespace on pg_class.relnamespace=pg_namespace.oid - where contype = 'c' and nspname = {base_schema}; - """, - ).format(base_schema=Literal(base_schema)), - ) - - # Create a temporary table of just the check constraints for the base - # table corresponding to the staging table to check. Include a - # column for the number of rows failing the check constraint, and a - # 'processed' flag to control looping. - db.execute( - SQL( - """ - drop table if exists ups_sel_cks cascade; - select - constraint_name, table_schema, table_name, consrc, - 0::integer as ckerror_values, - False as processed - into temporary table ups_sel_cks - from ups_check_constraints - where - table_schema = {base_schema} - and table_name = {table}; - """, - ).format(base_schema=Literal(base_schema), table=Literal(table)), - ) - - # Process all check constraints. - while True: - df = db.dataframe( - """ - select constraint_name, table_schema, table_name, consrc - from ups_sel_cks - where not processed - limit 1; + select + fkinf.constraint_name, + fkinf.table_schema, + fkinf.table_name, + att1.attname as column_name, + fkinf.uq_schema, + cls.relname as uq_table, + att2.attname as uq_column + into + temporary table {ups_foreign_key_columns} + from + (select + ns1.nspname as table_schema, + cls.relname as table_name, + unnest(cons.conkey) as uq_table_id, + unnest(cons.confkey) as table_id, + cons.conname as constraint_name, + ns2.nspname as uq_schema, + cons.confrelid, + cons.conrelid + from + pg_constraint as cons + inner join pg_class as cls on cls.oid = cons.conrelid + inner join pg_namespace ns1 on ns1.oid = cls.relnamespace + inner join pg_namespace ns2 on ns2.oid = cons.connamespace + where + cons.contype = 'f' + ) as fkinf + inner join pg_attribute att1 on + att1.attrelid = fkinf.conrelid and att1.attnum = fkinf.uq_table_id + inner join pg_attribute att2 on + att2.attrelid = fkinf.confrelid and att2.attnum = fkinf.table_id + inner join pg_class cls on cls.oid = fkinf.confrelid; """, - ) - if df.is_empty(): - break - logger.debug(f" Checking constraint {df['constraint_name'][0]}") - # Create a df with the check constraint sql and remove the 'CHECK' keyword - check_df = db.dataframe( + ).format(ups_foreign_key_columns=Identifier("ups_foreign_key_columns")), + ) + # Create a temporary table of just the foreign key relationships for the base + # table corresponding to the staging table to check. + self.db.execute( SQL( """ + drop table if exists ups_sel_fks cascade; select - regexp_replace(consrc, '^CHECK\\s*\\((.*)\\)$', '\\1') as check_sql - from ups_sel_cks + constraint_name, table_schema, table_name, + column_name, uq_schema, uq_table, uq_column + into + temporary table ups_sel_fks + from + ups_foreign_key_columns where - constraint_name = {constraint_name} - and table_schema = {table_schema} - and table_name = {table_name}; + table_schema = {base_schema} + and table_name = {table}; """, - ).format( - constraint_name=Literal(df["constraint_name"][0]), - table_schema=Literal(df["table_schema"][0]), - table_name=Literal(df["table_name"][0]), - ), + ).format(base_schema=Literal(self.base_schema), table=Literal(table)), ) - # Run the check_sql - db.execute( + # Create a temporary table of all unique constraint names for + # this table, with an integer column to be populated with the + # number of rows failing the foreign key check, and a 'processed' + # flag to control looping. + self.db.execute( SQL( """ - create or replace temporary view ups_ck_check_check as - select count(*) from {stg_schema}.{table} - where not ({check_sql}) - """, - ).format( - stg_schema=Identifier(stg_schema), - table=Identifier(table), - check_sql=SQL(check_df["check_sql"][0]), + drop table if exists ups_fk_constraints cascade; + select distinct + constraint_name, table_schema, table_name, + 0::integer as fkerror_values, + False as processed + into temporary table ups_fk_constraints + from ups_sel_fks; + """, ), ) - - ck_check = db.dataframe("select * from ups_ck_check_check;") - if ck_check["count"][0] > 0: - logger.warning( - f" Check constraint {df['constraint_name'][0]} has {ck_check['count'][0]} failing rows", # noqa: E501 + while True: + # Create a view to select one constraint to process. + rows, headers, rowcount = self.db.rowdict( + SQL( + """select constraint_name, table_schema, table_name + from ups_fk_constraints where not processed limit 1;""", + ), ) - - db.execute( + if rowcount == 0: + break + rows = next(iter(rows)) + logger.debug(f" Checking constraint {rows['constraint_name']}") + self.db.execute( SQL( """ - update ups_sel_cks - set ckerror_values = {ckerror_count} + drop table if exists ups_one_fk cascade; + select column_name, uq_schema, uq_table, uq_column + into temporary table ups_one_fk + from ups_sel_fks where constraint_name = {constraint_name} and table_schema = {table_schema} and table_name = {table_name}; - """, + """, ).format( - ckerror_count=Literal(ck_check["count"][0]), - constraint_name=Literal(df["constraint_name"][0]), - table_schema=Literal(df["table_schema"][0]), - table_name=Literal(df["table_name"][0]), + constraint_name=Literal(rows["constraint_name"]), + table_schema=Literal(rows["table_schema"]), + table_name=Literal(rows["table_name"]), ), ) - - db.execute( - SQL( + const_rows, const_headers, const_rowcount = self.db.rowdict( + "select * from ups_one_fk;", + ) + if const_rowcount == 0: + logger.debug(" No foreign key columns found") + break + const_rows = next(iter(const_rows)) + # Create join expressions from staging table (s) to unique table (u) + # and to staging table equivalent to unique table (su) (though we + # don't know yet if the latter exists). Also create a 'where' + # condition to ensure that all columns being matched are non-null. + # Also create a comma-separated list of the columns being checked. + fk_rows, fk_headers, fk_rowcount = self.db.rowdict( + SQL( + """ + select + string_agg('s.' || column_name || ' = u.' || uq_column, ' and ') as u_join, + string_agg('s.' || column_name || ' = su.' || uq_column, ' and ') as su_join, + string_agg('s.' || column_name || ' is not null', ' and ') as s_not_null, + string_agg('s.' || column_name, ', ') as s_checked + from + (select * from ups_one_fk) as fkcols; + """, + ), + ) + fk_rows = next(iter(fk_rows)) + # Determine whether a staging-table equivalent of the unique table exists. + su_exists = False + if ( + self.db.execute( + SQL( + """select * from information_schema.tables + where table_name = {table} and table_schema = {stg_schema};""", + ).format( + table=Literal(const_rows["uq_table"]), + stg_schema=Literal(self.stg_schema), + ), + ).rowcount + > 0 + ): + su_exists = True + # Construct a query to test for missing unique values for fk columns. + query = SQL( """ - update ups_sel_cks - set processed = True - where - constraint_name = {constraint_name} - and table_schema = {table_schema} - and table_name = {table_name}; - """, + drop view if exists ups_fk_check cascade; + create or replace temporary view ups_fk_check as + select {s_checked}, count(*) as nrows + from {stg_schema}.{table} as s + left join {uq_schema}.{uq_table} as u on {u_join} + """, ).format( - constraint_name=Literal(df["constraint_name"][0]), - table_schema=Literal(df["table_schema"][0]), - table_name=Literal(df["table_name"][0]), - ), - ) - - # Update the control table with the number of rows failing the check constraint. - db.execute( - SQL( - """ - create or replace temporary view ups_ck_error_list as - select string_agg( - constraint_name || ' (' || ckerror_values || ')', ', ' - ) as ck_errors - from ups_sel_cks - where coalesce(ckerror_values, 0) > 0; - """, - ), - ) - - -def upsert_all( - base_schema: str, - stg_schema: str, - control_table: str, - upsert_method: str, -): - validate_control(base_schema, stg_schema, control_table) - - # Get a table of all dependencies for the base schema. - db.execute( - SQL( - """ - drop table if exists ups_dependencies cascade; - create temporary table ups_dependencies as - select - tc.table_name as child, - tu.table_name as parent - from - information_schema.table_constraints as tc - inner join information_schema.constraint_table_usage as tu - on tu.constraint_name = tc.constraint_name - where - tc.constraint_type = 'FOREIGN KEY' - and tc.table_name <> tu.table_name - and tc.table_schema = {base_schema}; - """, - ).format(base_schema=Literal(base_schema)), - ) - - # Create a list of tables in the base schema ordered by dependency. - db.execute( - SQL( - """ - drop table if exists ups_ordered_tables cascade; - with recursive dep_depth as ( - select - dep.child as first_child, - dep.child, - dep.parent, - 1 as lvl - from - ups_dependencies as dep - union all - select - dd.first_child, - dep.child, - dep.parent, - dd.lvl + 1 as lvl - from - dep_depth as dd - inner join ups_dependencies as dep on dep.parent = dd.child - and dep.child <> dd.parent - and not (dep.parent = dd.first_child and dd.lvl > 2) - ) - select - table_name, - table_order - into - temporary table ups_ordered_tables - from ( - select - dd.parent as table_name, - max(lvl) as table_order - from - dep_depth as dd - group by - table_name - union - select - dd.child as table_name, - max(lvl) + 1 as level - from - dep_depth as dd - left join ups_dependencies as dp on dp.parent = dd.child - where - dp.parent is null - group by - dd.child - union - select distinct - t.table_name, - 0 as level - from - information_schema.tables as t - left join ups_dependencies as p on t.table_name=p.parent - left join ups_dependencies as c on t.table_name=c.child - where - t.table_schema = {base_schema} - and t.table_type = 'BASE TABLE' - and p.parent is null - and c.child is null - ) as all_levels; - """, - ).format(base_schema=Literal(base_schema)), - ) - - # Create a list of the selected tables with ordering information. - db.execute( - SQL( - """ - drop table if exists ups_proctables cascade; - select - ot.table_order, - tl.table_name, - tl.exclude_cols, - tl.interactive, - tl.rows_updated, - tl.rows_inserted, - False::boolean as processed - into - temporary table ups_proctables - from - {control_table} as tl - inner join ups_ordered_tables as ot on ot.table_name = tl.table_name - ; - """, - ).format(control_table=Identifier(control_table)), - ) - - while True: - # Create a view returning a single unprocessed table, in order. - proc_df = db.dataframe( - SQL( - """ - select - table_name, exclude_cols, interactive, - rows_updated, rows_inserted - from ups_proctables - where not processed - order by table_order - limit 1; - """, - ), - ) - if proc_df.is_empty(): - break - - rows_updated, rows_inserted = upsert_one( - base_schema, - stg_schema, - upsert_method, - proc_df["table_name"][0], - proc_df["exclude_cols"][0].split(",") if proc_df["exclude_cols"][0] else [], - proc_df["interactive"][0], - ) - - db.execute( + s_checked=SQL(fk_rows["s_checked"]), + stg_schema=Identifier(self.stg_schema), + table=Identifier(table), + uq_schema=Identifier(const_rows["uq_schema"]), + uq_table=Identifier(const_rows["uq_table"]), + u_join=SQL(fk_rows["u_join"]), + ) + if su_exists: + query += SQL( + """ left join {stg_schema}.{uq_table} as su on {su_join}""", + ).format( + stg_schema=Identifier(self.stg_schema), + uq_table=Identifier(const_rows["uq_table"]), + su_join=SQL(fk_rows["su_join"]), + ) + query += SQL(" where u.{uq_column} is null").format( + uq_column=Identifier(const_rows["uq_column"]), + ) + if su_exists: + query += SQL(" and su.{uq_column} is null").format( + uq_column=Identifier(const_rows["uq_column"]), + ) + query += SQL( + """ and {s_not_null} + group by {s_checked};""", + ).format( + s_not_null=SQL(fk_rows["s_not_null"]), + s_checked=SQL(fk_rows["s_checked"]), + ) + self.db.execute(query) + check_sql = SQL("select * from ups_fk_check;") + fk_check_rows, fk_check_headers, fk_check_rowcount = self.db.rowdict( + check_sql, + ) + if fk_check_rowcount > 0: + fk_check_rows = next(iter(fk_check_rows)) + logger.warning( + f" Foreign key error referencing {const_rows['uq_schema']}.{const_rows['uq_table']}", + ) + logger.debug("") + logger.debug(f"\n{self.show_results(check_sql)}") + logger.debug("") + if self.interactive: + btn, return_value = TableUI( + "Foreign key error", + f"Foreign key error referencing {const_rows['uq_schema']}.{const_rows['uq_table']}", + [ + ("Continue", 0, ""), + ("Cancel", 1, ""), + ], + fk_check_headers, + fk_check_rows, + ).activate() + if btn != 0: + logger.warning("Script cancelled by user") + sys.exit(0) + + self.db.execute( + SQL( + """ + update ups_fk_constraints + set fkerror_values = {fkerror_count} + where constraint_name = {constraint_name} + and table_schema = {table_schema} + and table_name = {table_name}; + """, + ).format( + fkerror_count=Literal(fk_check_rows["nrows"]), + constraint_name=Literal(rows["constraint_name"]), + table_schema=Literal(rows["table_schema"]), + table_name=Literal(rows["table_name"]), + ), + ) + self.db.execute( + SQL( + """ + update ups_fk_constraints + set processed = True + where + constraint_name = {constraint_name} + and table_schema = {table_schema} + and table_name = {table_name}; + """, + ).format( + constraint_name=Literal(rows["constraint_name"]), + table_schema=Literal(rows["table_schema"]), + table_name=Literal(rows["table_name"]), + ), + ) + err_rows, err_headers, err_rowcount = self.db.rowdict( SQL( """ - update ups_proctables - set rows_updated = {rows_updated}, - rows_inserted = {rows_inserted} - where table_name = {table_name}; + select string_agg( + constraint_name || ' (' || fkerror_values || ')', ', ' + ) as fk_errors + from ups_fk_constraints + where coalesce(fkerror_values, 0) > 0; """, - ).format( - rows_updated=Literal(rows_updated), - rows_inserted=Literal(rows_inserted), - table_name=Literal(proc_df["table_name"][0]), ), ) + if err_rowcount > 0: + return [err["fk_errors"] for err in list(err_rows) if err["fk_errors"]] + return None - db.execute( - SQL( - """ + def qa_all_ck(self: PgUpsert) -> None: + while True: + rows, headers, rowcount = self.db.rowdict( + SQL("select * from ups_toprocess;"), + ) + if rowcount == 0: + break + rows = next(iter(rows)) + self.qa_one_ck(table=rows["table_name"]) + err_rows, err_headers, err_rowcount = self.db.rowdict( + "select * from ups_ck_error_list;", + ) + if err_rowcount > 0: + self.db.execute( + SQL( + """ + update {control_table} + set ck_errors = {ck_errors} + where table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + ck_errors=Literal(next(iter(err_rows))["ck_errors"]), + table_name=Literal(rows["table_name"]), + ), + ) + # Set the 'processed' column to True in the control table. + self.db.execute( + SQL( + """ update ups_proctables set processed = True where table_name = {table_name}; """, - ).format(table_name=Literal(proc_df["table_name"][0])), - ) - - # Move the update/insert counts back into the control table. - db.execute( - SQL( - """ - update {control_table} as ct - set - rows_updated = pt.rows_updated, - rows_inserted = pt.rows_inserted - from - ups_proctables as pt - where - pt.table_name = ct.table_name; - """, - ).format(control_table=Identifier(control_table)), - ) - - -def upsert_one( - base_schema: str, - stg_schema: str, - upsert_method: str, - table: str, - exclude_cols: list[str], - interactive: bool = False, -): - rows_updated = 0 - rows_inserted = 0 + ).format(table_name=Literal(rows["table_name"])), + ) - logger.info(f"Performing upsert on table {base_schema}.{table}") - validate_table(base_schema, stg_schema, table) + def qa_one_ck(self: PgUpsert, table: str) -> list | None: + logger.info( + f"Conducting check constraint QA checks on table {self.stg_schema}.{table}", + ) + # Create a table of *all* check constraints in this database. + # Because this may be an expensive operation (in terms of time), the + # table is not re-created if it already exists. "Already exists" + # means that a table with the expected name exists. No check is + # done to ensure that this table has the correct structure. The + # goal is to create the table of all check constraints only once to + # minimize the time required if QA checks are to be run on multiple + # staging tables. + if ( + self.db.execute( + SQL( + """select * from information_schema.tables + where table_name = {ups_check_constraints};""", + ).format(ups_check_constraints=Literal("ups_check_constraints")), + ).rowcount + == 0 + ): + self.db.execute( + SQL( + """ + drop table if exists ups_check_constraints cascade; + select + nspname as table_schema, + cast(conrelid::regclass as text) as table_name, + conname as constraint_name, + pg_get_constraintdef(pg_constraint.oid) AS consrc + into temporary table ups_check_constraints + from pg_constraint + inner join pg_class on pg_constraint.conrelid = pg_class.oid + inner join pg_namespace on pg_class.relnamespace=pg_namespace.oid + where contype = 'c' and nspname = {base_schema}; + """, + ).format(base_schema=Literal(self.base_schema)), + ) - # Populate a (temporary) table with the names of the columns - # in the base table that are to be updated from the staging table. - # Include only those columns from staging table that are also in base table. - # db.execute( - query = SQL( - """ - drop table if exists ups_cols cascade; - select s.column_name - into temporary table ups_cols - from information_schema.columns as s - inner join information_schema.columns as b on s.column_name=b.column_name - where - s.table_schema = {stg_schema} - and s.table_name = {table} - and b.table_schema = {base_schema} - and b.table_name = {table} - """, - ).format( - stg_schema=Literal(stg_schema), - table=Literal(table), - base_schema=Literal(base_schema), - ) - if exclude_cols: - query += SQL( - """ - and s.column_name not in ({exclude_cols}) + # Create a temporary table of just the check constraints for the base + # table corresponding to the staging table to check. Include a + # column for the number of rows failing the check constraint, and a + # 'processed' flag to control looping. + self.db.execute( + SQL( + """ + drop table if exists ups_sel_cks cascade; + select + constraint_name, table_schema, table_name, consrc, + 0::integer as ckerror_values, + False as processed + into temporary table ups_sel_cks + from ups_check_constraints + where + table_schema = {base_schema} + and table_name = {table}; """, - ).format( - exclude_cols=SQL(",").join(Literal(col) for col in exclude_cols), + ).format(base_schema=Literal(self.base_schema), table=Literal(table)), ) - query += SQL(" order by s.ordinal_position;") - db.execute(query) - - # Populate a (temporary) table with the names of the primary key - # columns of the base table. - db.execute( - SQL( - """ - drop table if exists ups_pks cascade; - select k.column_name - into temporary table ups_pks - from information_schema.table_constraints as tc - inner join information_schema.key_column_usage as k - on tc.constraint_type = 'PRIMARY KEY' - and tc.constraint_name = k.constraint_name - and tc.constraint_catalog = k.constraint_catalog - and tc.constraint_schema = k.constraint_schema - and tc.table_schema = k.table_schema - and tc.table_name = k.table_name - and tc.constraint_name = k.constraint_name - where - k.table_name = {table} - and k.table_schema = {base_schema} - order by k.ordinal_position; - """, - ).format(table=Literal(table), base_schema=Literal(base_schema)), - ) - # Get all base table columns that are to be updated into a comma-delimited list. - all_col_list = db.dataframe( - SQL( - """ - select string_agg(column_name, ', ') as cols from ups_cols;""", - ), - )["cols"][0] - - # Get all base table columns that are to be updated into a - # comma-delimited list with a "b." prefix. - base_col_list = db.dataframe( - SQL( - """ - select string_agg('b.' || column_name, ', ') as cols - from ups_cols;""", - ), - )["cols"][0] - - # Get all staging table column names for columns that are to be updated - # into a comma-delimited list with an "s." prefix. - stg_col_list = db.dataframe( - SQL( - """ - select string_agg('s.' || column_name, ', ') as cols - from ups_cols;""", - ), - )["cols"][0] - - # Get the primary key columns in a comma-delimited list. - pk_col_list = db.dataframe( - SQL( - """ - select string_agg(column_name, ', ') as cols - from ups_pks;""", - ), - )["cols"][0] - - # Create a join expression for key columns of the base (b) and - # staging (s) tables. - join_expr = db.dataframe( - SQL( - """ - select - string_agg('b.' || column_name || ' = s.' || column_name, ' and ') as expr - from - ups_pks; - """, - ), - )["expr"][0] - - # Create a FROM clause for an inner join between base and staging - # tables on the primary key column(s). - from_clause = SQL( - """FROM {base_schema}.{table} as b - INNER JOIN {stg_schema}.{table} as s ON {join_expr}""", - ).format( - base_schema=Identifier(base_schema), - table=Identifier(table), - stg_schema=Identifier(stg_schema), - join_expr=SQL(join_expr), - ) - - # Create SELECT queries to pull all columns with matching keys from both - # base and staging tables. - db.execute( - SQL( - """ - drop view if exists ups_basematches cascade; - create temporary view ups_basematches as select {base_col_list} {from_clause}; + # Process all check constraints. + while True: + rows, headers, rowcount = self.db.rowdict( + SQL( + """select constraint_name, table_schema, table_name, consrc + from ups_sel_cks where not processed limit 1;""", + ), + ) + if rowcount == 0: + break + rows = next(iter(rows)) + logger.debug(f" Checking constraint {rows['constraint_name']}") + # Remove the 'CHECK' keyword from the constraint definition. + const_rows, const_headers, const_rowcount = self.db.rowdict( + SQL( + """ + select + regexp_replace(consrc, '^CHECK\\s*\\((.*)\\)$', '\\1') as check_sql + from ups_sel_cks + where + constraint_name = {constraint_name} + and table_schema = {table_schema} + and table_name = {table_name}; + """, + ).format( + constraint_name=Literal(rows["constraint_name"]), + table_schema=Literal(rows["table_schema"]), + table_name=Literal(rows["table_name"]), + ), + ) + const_rows = next(iter(const_rows)) + # Run the check_sql + self.db.execute( + SQL( + """ + create or replace temporary view ups_ck_check_check as + select count(*) from {stg_schema}.{table} + where not ({check_sql}) + """, + ).format( + stg_schema=Identifier(self.stg_schema), + table=Identifier(table), + check_sql=SQL(const_rows["check_sql"]), + ), + ) - drop view if exists ups_stgmatches cascade; - create temporary view ups_stgmatches as select {stg_col_list} {from_clause}; - """, - ).format( - base_col_list=SQL(base_col_list), - stg_col_list=SQL(stg_col_list), - from_clause=from_clause, - ), - ) - # Get non-key columns to be updated - db.execute( - SQL( - """ - drop view if exists ups_nk cascade; - create temporary view ups_nk as - select column_name from ups_cols - except - select column_name from ups_pks; - """, - ), - ) - # Prompt user to examine matching data and commit, don't commit, or quit. - - do_updates = False - update_stmt = None - # if not stg_df.is_empty() and not nk_df.is_empty(): - if upsert_method in ("upsert", "update"): - stg_curs = db.execute("select * from ups_stgmatches;") - stg_cols = [col.name for col in stg_curs.description] - stg_rowcount = stg_curs.rowcount - stg_data = stg_curs.fetchall() - nk_curs = db.execute("select * from ups_nk;") - # nk_cols = [col.name for col in nk_curs.description] - nk_rowcount = nk_curs.rowcount - # nk_data = nk_curs.fetchall() - if stg_rowcount > 0 and nk_rowcount > 0: - base_curs = db.execute("select * from ups_basematches;") - base_cols = [col.name for col in base_curs.description] - # base_rowcount = base_curs.rowcount - base_data = base_curs.fetchall() - - if interactive: - btn, return_value = CompareUI( - "Compare Tables", - f"Do you want to make these changes? For table {table}, new data are shown in the top table; existing data are shown in the bottom table.", # noqa: E501 - [ - ("Continue", 0, ""), - ("Skip", 1, ""), - ("Cancel", 2, ""), - ], - stg_cols, - stg_data, - base_cols, - base_data, - pk_col_list.split(", "), - sidebyside=False, - ).activate() - else: - btn = 0 - if btn == 2: - error_handler(["Upsert cancelled"]) - if btn == 0: - do_updates = True - # Create an assignment expression to update non-key columns of the - # base table (un-aliased) from columns of the staging table (as s). - ups_expr = db.dataframe( + ck_check_rows, ck_check_headers, ck_check_rowcount = self.db.rowdict( + "select * from ups_ck_check_check where count > 0;", + ) + if ck_check_rowcount > 0: + ck_check_rows = next(iter(ck_check_rows)) + logger.warning( + f" Check constraint {rows['constraint_name']} has {ck_check_rowcount} failing rows", + ) + self.db.execute( SQL( """ - select string_agg( - column_name || ' = s.' || column_name, ', ' - ) as col - from ups_nk; + update ups_sel_cks + set ckerror_values = {ckerror_count} + where + constraint_name = {constraint_name} + and table_schema = {table_schema} + and table_name = {table_name}; """, + ).format( + ckerror_count=Literal(ck_check_rows["count"]), + constraint_name=Literal(rows["constraint_name"]), + table_schema=Literal(rows["table_schema"]), + table_name=Literal(rows["table_name"]), ), - )["col"][0] - # Create an UPDATE statement to update the base table with - # non-key columns from the staging table. - # No semicolon terminating generated SQL. - update_stmt = SQL( - """ - UPDATE {base_schema}.{table} as b - SET {ups_expr} - FROM {stg_schema}.{table} as s WHERE {join_expr} - """, - ).format( - base_schema=Identifier(base_schema), - table=Identifier(table), - stg_schema=Identifier(stg_schema), - ups_expr=SQL(ups_expr), - join_expr=SQL(join_expr), ) - else: - logger.debug(" No data to update") - - do_inserts = False - insert_stmt = None - if upsert_method in ("upsert", "insert"): - # Create a select statement to find all rows of the staging table - # that are not in the base table. - db.execute( - SQL( - """ - drop view if exists ups_newrows cascade; - create temporary view ups_newrows as with newpks as ( - select {pk_col_list} - from {stg_schema}.{table} - except - select {pk_col_list} - from {base_schema}.{table} - ) - select s.* - from {stg_schema}.{table} as s - inner join newpks using ({pk_col_list}); - """, - ).format( - stg_schema=Identifier(stg_schema), - table=Identifier(table), - pk_col_list=SQL(pk_col_list), - base_schema=Identifier(base_schema), - ), - ) - # Prompt user to examine new data and continue or quit. - new_curs = db.execute("select * from ups_newrows;") - new_cols = [col.name for col in new_curs.description] - new_rowcount = new_curs.rowcount - new_data = new_curs.fetchall() - - # if not new_df.is_empty(): - if new_rowcount > 0: - if interactive: - btn, return_value = TableUI( - "New Data", - f"Do you want to add these new data to the {base_schema}.{table} table?", # noqa: E501 - [ - ("Continue", 0, ""), - ("Skip", 1, ""), - ("Cancel", 2, ""), - ], - # new_df.columns, - new_cols, - # list(new_df.iter_rows()), - new_data, - ).activate() - else: - btn = 0 - if btn == 2: - error_handler(["Upsert cancelled"]) - if btn == 0: - do_inserts = True - # Create an insert statement. No semicolon terminating generated SQL. - insert_stmt = SQL( + self.db.execute( + SQL( """ - INSERT INTO {base_schema}.{table} ({all_col_list}) - SELECT {all_col_list} FROM ups_newrows + update ups_sel_cks + set processed = True + where + constraint_name = {constraint_name} + and table_schema = {table_schema} + and table_name = {table_name}; """, ).format( - base_schema=Identifier(base_schema), - table=Identifier(table), - all_col_list=SQL(all_col_list), - ) - else: - logger.debug(" No new data to insert") - - # Run the update and insert statements. - if do_updates and update_stmt and upsert_method in ("upsert", "update"): - logger.info(f" Updating {base_schema}.{table}") - logger.debug(f" UPDATE statement for {base_schema}.{table}") - logger.debug(f"{update_stmt.as_string(db.conn)}") - db.execute(update_stmt) - rows_updated = stg_rowcount - logger.info(f" {rows_updated} rows updated") - if do_inserts and insert_stmt and upsert_method in ("upsert", "insert"): - logger.info(f" Adding data to {base_schema}.{table}") - logger.debug(f" INSERT statement for {base_schema}.{table}") - logger.debug(f"{insert_stmt.as_string(db.conn)}") - db.execute(insert_stmt) - rows_inserted = new_rowcount - logger.info(f" {rows_inserted} rows inserted") - return rows_updated, rows_inserted - - -def error_handler(errors: list[str]): - """Log errors and exit.""" - for error in errors: - logger.error(error) - if errors: - db.rollback() - sys.exit(1) - - -def ellapsed_time(start_time: datetime): - """Returns a string representing the ellapsed time since the start time.""" - dt = (datetime.now() - start_time).total_seconds() - if dt < 60: - return f"{round((datetime.now() - start_time).total_seconds(), 3)} seconds" - if dt < 3600: - return f"{int(dt // 60)} minutes, {round(dt % 60, 3)} seconds" - return f"{int(dt // 3600)} hours, {int((dt % 3600)) // 60} minutes, {round(dt % 60, 3)} seconds" # noqa: E501 UP034 - - -def clparser() -> argparse.ArgumentParser: - """Command line interface for the upsert function.""" - parser = argparse.ArgumentParser( - description=description_short, - epilog=description_long, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument( - "--version", - action="version", - version=f"%(prog)s {__version__}", - ) - parser.add_argument( - "-q", - "--quiet", - action="store_true", - help="suppress all console output", - ) - parser.add_argument( - "-d", - "--debug", - action="store_true", - help="display debug output", - ) - parser.add_argument( - "-l", - "--log", - metavar="LOGFILE", - help="write log to LOGFILE", - ) - parser.add_argument( - "-e", - "--exclude", - metavar="EXCLUDE_COLUMNS", - help="comma-separated list of columns to exclude from null checks", - ) - parser.add_argument( - "-n", - "--null", - metavar="NULL_COLUMNS", - help="comma-separated list of columns to exclude from null checks", - ) - parser.add_argument( - "-c", - "--commit", - action="store_true", - help="commit changes to database", - ) - parser.add_argument( - "-i", - "--interactive", - action="store_true", - help="display interactive GUI of important table information", - ) - parser.add_argument( - "-m", - "--method", - metavar="METHOD", - choices=["upsert", "update", "insert"], - help="method to use for upsert", - ) - parser.add_argument( - "host", - metavar="HOST", - help="database host", - ) - parser.add_argument( - "database", - metavar="DATABASE", - help="database name", - ) - parser.add_argument( - "user", - metavar="USER", - help="database user", - ) - parser.add_argument( - "stg_schema", - metavar="STAGING_SCHEMA", - help="staging schema name", - ) - parser.add_argument( - "base_schema", - metavar="BASE_SCHEMA", - help="base schema name", - ) - parser.add_argument( - "tables", - metavar="TABLE", - nargs="+", - help="table name(s)", - ) - return parser - - -def upsert( - host: str, - database: str, - user: str, - tables: list[str], - stg_schema: str, - base_schema: str, - upsert_method: str = "upsert", - commit: bool = False, - interactive: bool = False, - exclude_cols: list[str] | None = None, - exclude_null_check_columns: list[str] | None = None, - **kwargs, -): - """Upsert staging tables to base tables.""" - if exclude_null_check_columns is None: - exclude_null_check_columns = [] - if exclude_cols is None: - exclude_cols = [] - global db - global errors - global control_table - global timer - - errors = [] - control_table = "ups_control" - timer = datetime.now() - logger.debug(f"Starting upsert at {timer.strftime('%Y-%m-%d %H:%M:%S')}") - - db = PostgresDB( - host=host, - database=database, - user=user, - passwd=kwargs.get("passwd", None), - ) - logger.debug(f"Connected to {db}") - - validate_schemas(base_schema, stg_schema) - for table in tables: - validate_table(base_schema, stg_schema, table) - - logger.info(f"Upserting to {base_schema} from {stg_schema}") - if interactive: - btn, return_value = TableUI( - "Upsert Tables", - "Tables selected for upsert", - [ - ("Continue", 0, ""), - ("Cancel", 1, ""), - ], - ["Table"], - [[table] for table in tables], - ).activate() - if btn != 0: - error_handler(["Script canceled by user."]) - else: - logger.info("Tables selected for upsert:") - for table in tables: - logger.info(f" {table}") - - # Initialize the control table - logger.debug("Initializing control table") - staged_to_load(control_table, tables) - - # Update the control table with the list of columns to exclude from null checks - if exclude_cols: - db.execute( - SQL( - """ - update {control_table} - set exclude_cols = {exclude_cols}; - """, - ).format( - control_table=Identifier(control_table), - exclude_cols=Literal(",".join(exclude_cols)), - ), - ) - if exclude_null_check_columns: - db.execute( - SQL( - """ - update {control_table} - set exclude_null_checks = {exclude_null_check_columns}; - """, - ).format( - control_table=Identifier(control_table), - exclude_null_check_columns=Literal( - ",".join(exclude_null_check_columns), + constraint_name=Literal(rows["constraint_name"]), + table_schema=Literal(rows["table_schema"]), + table_name=Literal(rows["table_name"]), ), - ), - ) - if interactive: - db.execute( + ) + + # Update the control table with the number of rows failing the check constraint. + self.db.execute( SQL( """ - update {control_table} - set interactive = {interactive}; - """, - ).format( - control_table=Identifier(control_table), - interactive=Literal(interactive), - ), - ) - - # Run not-null, primary key, and foreign key QA checks on the staging tables - load_staging(base_schema, stg_schema, control_table) - - ctrl_df = db.dataframe( - SQL( - """ - select * from {control_table} - where - null_errors is not null - or pk_errors is not null - or fk_errors is not null - or ck_errors is not null; + create or replace temporary view ups_ck_error_list as + select string_agg( + constraint_name || ' (' || ckerror_values || ')', ', ' + ) as ck_errors + from ups_sel_cks + where coalesce(ckerror_values, 0) > 0; """, - ).format(control_table=Identifier(control_table)), - ) - - qa_pass = False - # if errors in control table - if not ctrl_df.is_empty(): - logger.debug("QA Errors:") - logger.debug( - tabulate( - ctrl_df.iter_rows(), - headers=ctrl_df.columns, - tablefmt="pipe", - showindex=False, - colalign=["left"] * len(ctrl_df.columns), ), ) - logger.debug("") - if interactive: - btn, return_value = TableUI( - "QA Errors", - "Below is a summary of errors.", - [ - ("Continue", 0, ""), - ("Cancel", 1, ""), - ], - ctrl_df.columns, - list(ctrl_df.iter_rows()), - ).activate() - error_handler(["QA checks failed. Aborting upsert."]) - else: - qa_pass = True - logger.info("===QA checks passed. Starting upsert===") - if qa_pass: - upsert_all(base_schema, stg_schema, control_table, upsert_method) - - final_ctrl_df = db.dataframe( - SQL("select * from {control_table};").format( - control_table=Identifier(control_table), - ), - ) - - if interactive: - btn, return_value = TableUI( - "Upsert Summary", - "Below is a summary of changes. Do you want to commit these changes? ", - [ - ("Continue", 0, ""), - ("Cancel", 1, ""), - ], - final_ctrl_df.columns, - list(final_ctrl_df.iter_rows()), - ).activate() - else: - btn = 0 + def upsert_all(self: PgUpsert) -> None: + if not self.qa_passed: + self.qa_all() + self.validate_control() + logger.info("===Starting upsert procedures===") - logger.info("") - if btn == 0: - if final_ctrl_df.filter( - (pl.col("rows_updated") > 0) | (pl.col("rows_inserted") > 0), - ).is_empty(): - logger.info("No changes to commit") - db.rollback() - else: - if commit: - logger.info("Changes committed") - db.commit() - else: - logger.info( - f"Commit set to {str(commit).upper()}, rolling back changes", - ) - db.rollback() - else: - logger.info("Rolling back changes") - db.rollback() - - logger.debug(f"Upsert completed in {ellapsed_time(timer)}") - - -def main() -> None: +def cli() -> None: """Main command line entrypoint for the upsert function.""" args = clparser().parse_args() - logging.basicConfig( - level=logging.INFO if not args.debug else logging.DEBUG, - format="%(message)s", - handlers=[ - logging.StreamHandler() if not args.quiet else logging.NullHandler(), - logging.FileHandler(Path(args.log)) if args.log else logging.NullHandler(), - ], - ) - upsert( + if args.log and args.log.exists(): + args.log.unlink() + if not args.quiet: + logger.addHandler(logging.StreamHandler()) + if args.log: + logger.addHandler(logging.FileHandler(args.log)) + if args.debug: + logger.setLevel(logging.DEBUG) + formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(lineno)d - %(message)s", + ) + for handler in logger.handlers: + handler.setFormatter(formatter) + PgUpsert( host=args.host, + port=args.port, database=args.database, user=args.user, tables=args.tables, stg_schema=args.stg_schema, base_schema=args.base_schema, - commit=args.commit, - upsert_method=args.method, + do_commit=args.do_commit, + upsert_method=args.upsert_method, interactive=args.interactive, exclude_cols=args.exclude.split(",") if args.exclude else None, exclude_null_check_columns=args.null.split(",") if args.null else None, - ) + ).run() if __name__ == "__main__": - main() + cli() diff --git a/pyproject.toml b/pyproject.toml index 6aa2dd4..6c033b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ exclude = [ "*cache*", ] # The line length to use when enforcing long-lines violations (like E501). -line-length = 88 +line-length = 120 # Assume Python 3.11. target-version = "py311" # Whether to automatically exclude files that are ignored by .ignore, .gitignore, .git/info/exclude, and global gitignore files. @@ -115,4 +115,4 @@ commit_args = "--no-verify" tag = true [[tool.bumpversion.files]] -filename = "pg_upsert/pg_upsert.py" +filename = "pg_upsert/__init__.py" diff --git a/requirements.txt b/requirements.txt index ac49147..40b3016 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,18 +1,39 @@ +accessible-pygments==0.0.5 +alabaster==0.7.16 annotated-types==0.7.0 +Babel==2.15.0 +backports.tarfile==1.2.0 +beautifulsoup4==4.12.3 bracex==2.4 build==1.2.1 bump-my-version==0.21.1 +certifi==2024.7.4 cfgv==3.4.0 +charset-normalizer==3.3.2 click==8.1.7 coverage==7.6.0 distlib==0.3.8 +docutils==0.21.2 filelock==3.14.0 identify==2.5.36 +idna==3.7 +imagesize==1.4.1 +importlib_metadata==8.0.0 iniconfig==2.0.0 +jaraco.classes==3.4.0 +jaraco.context==5.3.0 +jaraco.functools==4.0.1 +Jinja2==3.1.4 +keyring==25.2.1 markdown-it-py==3.0.0 +MarkupSafe==2.1.5 mdurl==0.1.2 +more-itertools==10.3.0 +nh3==0.2.18 nodeenv==1.8.0 packaging==24.1 +pg_upsert @ file:///Users/cgrant/GitHub/geocoug/pg_upsert +pkginfo==1.10.0 platformdirs==4.2.2 pluggy==1.5.0 polars==1.2.0 @@ -22,6 +43,7 @@ psycopg2-binary==2.9.9 pydantic==2.7.3 pydantic-settings==2.3.1 pydantic_core==2.18.4 +pydata-sphinx-theme==0.15.4 Pygments==2.18.0 pyproject_hooks==1.1.0 pytest==8.2.2 @@ -29,12 +51,29 @@ pytest-cov==5.0.0 python-dotenv==1.0.1 PyYAML==6.0.1 questionary==2.0.1 +readme_renderer==44.0 +requests==2.32.3 +requests-toolbelt==1.0.0 +rfc3986==2.0.0 rich==13.7.1 rich-click==1.8.3 ruff==0.5.2 +snowballstemmer==2.2.0 +soupsieve==2.5 +Sphinx==7.4.6 +sphinx-book-theme==1.1.3 +sphinxcontrib-applehelp==1.0.8 +sphinxcontrib-devhelp==1.0.6 +sphinxcontrib-htmlhelp==2.0.5 +sphinxcontrib-jsmath==1.0.1 +sphinxcontrib-qthelp==1.0.7 +sphinxcontrib-serializinghtml==1.1.10 tabulate==0.9.0 tomlkit==0.12.5 +twine==5.1.1 typing_extensions==4.12.2 +urllib3==2.2.2 virtualenv==20.26.2 wcmatch==8.5.2 wcwidth==0.2.13 +zipp==3.19.2 diff --git a/tests/data.sql b/tests/data.sql index 3b5825f..8643381 100644 --- a/tests/data.sql +++ b/tests/data.sql @@ -10,7 +10,7 @@ create table public.books ( book_title varchar(200) not null, genre varchar(100) not null, notes text, - foreign key (genre) references genres(genre) + foreign key (genre) references public.genres(genre) ); drop table if exists public.authors cascade; @@ -27,8 +27,8 @@ drop table if exists public.book_authors cascade; create table public.book_authors ( book_id varchar(100) not null, author_id varchar(100) not null, - foreign key (author_id) references authors(author_id), - foreign key (book_id) references books(book_id), + foreign key (author_id) references public.authors(author_id), + foreign key (book_id) references public.books(book_id), constraint pk_book_authors primary key (book_id, author_id) );