From 9d3c34317bcb75d74d7edeac22fad66ca7e0d2d7 Mon Sep 17 00:00:00 2001 From: Caleb Grant Date: Fri, 19 Jul 2024 16:06:39 -0700 Subject: [PATCH] continue refactoring, + docs --- Makefile | 1 - README.md | 19 + docs/conf.py | 2 +- docs/index.rst | 22 +- docs/modules.rst | 7 - docs/pg_upsert.rst | 19 +- example.py | 103 +++ pg_upsert/__init__.py | 14 +- pg_upsert/_version.py | 6 + pg_upsert/pg_upsert.py | 1535 ++++++++++++++++++++++++++++++---------- pyproject.toml | 8 +- requirements.txt | 5 +- 12 files changed, 1301 insertions(+), 440 deletions(-) delete mode 100644 docs/modules.rst create mode 100644 example.py diff --git a/Makefile b/Makefile index aad9b5b..487767a 100644 --- a/Makefile +++ b/Makefile @@ -73,7 +73,6 @@ build-dist: $(VENV)/bin/activate ## Generate distrubition packages build-docs: ## Generate documentation @printf "Building documentation\n" - @$(SPHINXAPIDOC) -f -o "$(SOURCEDIR)" pg_upsert @$(SPHINXBUILD) -M html "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) publish: $(VENV)/bin/activate ## Publish to PyPI diff --git a/README.md b/README.md index 615a5ac..93def82 100644 --- a/README.md +++ b/README.md @@ -398,3 +398,22 @@ POSTGRES_PASSWORD= ``` Now you can run the tests using `make test`. + +## Notes + +- The user can modify the control table to set interactive specific to each table +- The user can modify the control table to set the upsert method specific to each table +- The user can modify the control table to set the exclude columns specific to each table +- The user can modify the control table to set the exclude null check columns specific to each table +- In upsert_one check that the table has a primary key before proceeding +- Replace all sys.exit() with a graceful exit that closes db connection and rolls back changes + +upsert_all(): + +- What would happen if the user runs this method without running the QA checks first? +- What would happen if the user modified the qa_passed attribute to True without running the QA checks? 
+- What would happen if the user modified the control table to indicate that QA checks passed when they did not? + +TODO: +- Modify the show() function to actually show a query either via GUI or console +- Fix upsert_one to return self diff --git a/docs/conf.py b/docs/conf.py index 356bb67..9f2465e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -42,7 +42,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. -html_theme = "sphinx_book_theme" +html_theme = "sphinx_rtd_theme" html_static_path = ["_static"] diff --git a/docs/index.rst b/docs/index.rst index 9d423b3..cdc7299 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,26 +6,12 @@ pg_upsert documentation ======================= -Release v\ |version|. - **pg_upsert** is a Python package that runs not-NULL, Primary Key, Foreign Key, and Check Constraint checks on PostgreSQL staging tables then update and insert (upsert) data from staging tables to base tables. -The API Documentation / Guide ------------------------------ - -If you are looking for information on a specific function, class, or method, -this part of the documentation is for you. +Module Contents +--------------- .. toctree:: - :maxdepth: 2 - :caption: Contents: - - modules - - -Indices and tables -================== + :maxdepth: 4 -* :ref:`genindex` -* :ref:`modindex` -* :ref:`search` + pg_upsert diff --git a/docs/modules.rst b/docs/modules.rst deleted file mode 100644 index fee14af..0000000 --- a/docs/modules.rst +++ /dev/null @@ -1,7 +0,0 @@ -pg_upsert -========= - -.. toctree:: - :maxdepth: 4 - - pg_upsert diff --git a/docs/pg_upsert.rst b/docs/pg_upsert.rst index 3f77a58..d0830ae 100644 --- a/docs/pg_upsert.rst +++ b/docs/pg_upsert.rst @@ -1,19 +1,10 @@ -pg\_upsert package -================== +.. _pg_upsert: -Submodules ----------- +pg\_upsert +========== -pg\_upsert.pg\_upsert module ----------------------------- - -..
automodule:: pg_upsert.pg_upsert - :members: - :undoc-members: - :show-inheritance: - -Module contents ---------------- +All of pg_upsert's functionality can be accessed by the :class:`PgUpsert` object, which +includes all the methods and attributes mentioned in the sections below. .. automodule:: pg_upsert :members: diff --git a/example.py b/example.py new file mode 100644 index 0000000..7af3694 --- /dev/null +++ b/example.py @@ -0,0 +1,103 @@ +import logging + +from pg_upsert import PgUpsert + +logger = logging.getLogger("pg_upsert") +logger.setLevel(logging.INFO) +handlers = [ + logging.FileHandler("pg_upsert.log"), + logging.StreamHandler(), +] +if logger.level == logging.DEBUG: + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(lineno)d: %(message)s" + ) +else: + formatter = logging.Formatter("%(message)s") +for handler in handlers: + handler.setFormatter(formatter) + logger.addHandler(handler) + +# Full example of instantiating the class and running the upserts +upsert = PgUpsert( + host="localhost", + port=5432, + database="dev", + user="docker", + passwd="docker", + tables=("genres", "books", "authors", "book_authors"), + stg_schema="staging", + base_schema="public", + do_commit=False, + upsert_method="insert", + interactive=False, + exclude_cols=None, + exclude_null_check_columns=None, + control_table="ups_control", +).run() + + +# Minimal example of instantiating the class and running the upserts +# upsert = PgUpsert( +# host="localhost", +# port=5432, +# database="dev", +# user="docker", +# passwd="docker", +# tables=("genres", "books", "authors", "book_authors"), +# stg_schema="staging", +# base_schema="public", +# ) + +# upsert.run() + +# # Modify the control table then run upsert on the one table +# upsert.db.execute( +# f"update {upsert.control_table} set exclude_cols = 'first_name,last_name', interactive=true where table_name = 'authors';" +# ) +# upsert.upsert_one(table="authors").commit() + + +# # Run upsert on one
table +# upsert.upsert_one(table="authors").commit() + + +# # Run a specific set of qa checks on one table +# # Null checks +# upsert.qa_one_null("authors") +# # Primary key checks +# upsert.qa_one_pk("authors") +# # Foreign key checks +# upsert.qa_one_fk("authors") +# # Check constraint checks +# upsert.qa_one_ck("authors") + + +# # Run everything with defaults +# upsert.run() + + +# # Run only QA checks +# upsert.qa_all() + + +# # Run only upserts and commit changes (if do_commit=True) +# upsert.upsert_all().commit() + + +# # Run only upserts and do not commit changes +# upsert.upsert_all() + + +# # Modify the control table on a table-by-table basis +# # The control table is initialized when the class is instantiated +# logger.info(upsert.show(f"select * from {upsert.control_table}")) +# logger.info("") +# # Modify the exclude_cols column for the authors table and set interactive to true. The exclude_cols and exclude_null_checks values should be a comma-separated string. +# upsert.db.execute( +# f"update {upsert.control_table} set exclude_cols = 'first_name,last_name', interactive=true where table_name = 'authors';" +# ) +# logger.info(upsert.show(f"select * from {upsert.control_table}")) + + +del upsert diff --git a/pg_upsert/__init__.py b/pg_upsert/__init__.py index c6f4e49..9984284 100644 --- a/pg_upsert/__init__.py +++ b/pg_upsert/__init__.py @@ -1,4 +1,12 @@ -from ._version import __version__ -from .pg_upsert import PgUpsert, PostgresDB +from ._version import ( + __author__, + __author_email__, + __description__, + __license__, + __title__, + __url__, + __version__, +) +from .pg_upsert import PgUpsert -__all__ = ["PgUpsert", "PostgresDB"] +__all__ = ["PgUpsert"] diff --git a/pg_upsert/_version.py b/pg_upsert/_version.py index c72e379..7f36af0 100644 --- a/pg_upsert/_version.py +++ b/pg_upsert/_version.py @@ -1 +1,7 @@ +__title__ = "pg_upsert" +__author__ = "Caleb Grant" +__url__ = "https://github.com/geocoug/pg_upsert" +__author_email__ = 
"grantcaleb22@gmail.com" +__license__ = "GNU GPLv3" __version__ = "1.1.4" +__description__ = "Run not-NULL, Primary Key, Foreign Key, and Check Constraint checks on staging tables then update and insert (upsert) data from staging tables to base tables." # noqa: E501 diff --git a/pg_upsert/pg_upsert.py b/pg_upsert/pg_upsert.py index c120591..757ef7e 100644 --- a/pg_upsert/pg_upsert.py +++ b/pg_upsert/pg_upsert.py @@ -18,9 +18,7 @@ from psycopg2.sql import SQL, Composable, Identifier, Literal from tabulate import tabulate -from ._version import __version__ - -description = "Run not-NULL, Primary Key, Foreign Key, and Check Constraint checks on staging tables then update and insert (upsert) data from staging tables to base tables." +from ._version import __description__, __version__ logging.basicConfig( level=logging.INFO, @@ -29,8 +27,6 @@ ) logger = logging.getLogger(__name__) -# Get the __version__ from the __init__.py file. - class PostgresDB: """Base database object.""" @@ -59,7 +55,7 @@ def __init__( def __repr__(self: PostgresDB) -> str: return ( - f"{self.__class__.__name__}(host={self.host}, port={self.port}, database={self.database}, user={self.user})" # noqa: E501 + f"{self.__class__.__name__}(host={self.host}, port={self.port}, database={self.database}, user={self.user})" ) def __del__(self: PostgresDB) -> None: @@ -523,243 +519,88 @@ def click(self: ClickSet, *args): self.ui_obj.win.destroy() -def treeview_table( - parent: ttk.Frame, - rowset: list | tuple, - column_headers: list | tuple, - select_mode="none", -): - """Creates a TreeView table containing the specified data, with scrollbars and - status bar in an enclosing frame. - This does not grid the table frame in its parent widget. Returns a tuple - of 0: the frame containing the table, and 1: the table widget itself. 
+class PgUpsert: """ - nrows = range(len(rowset)) - ncols = range(len(column_headers)) - hdrwidths = [len(column_headers[j]) for j in ncols] - if len(rowset) > 0: - datawidthtbl = [ - [ - len( - (rowset[i][j] if isinstance(rowset[i][j], str) else str(rowset[i][j])), - ) - for i in nrows - ] - for j in ncols - ] - datawidths = [max(cwidths) for cwidths in datawidthtbl] - else: - datawidths = hdrwidths - colwidths = [max(hdrwidths[i], datawidths[i]) for i in ncols] - # Set the font. - ff = tkfont.nametofont("TkFixedFont") - tblstyle = ttk.Style() - tblstyle.configure("tblstyle", font=ff) - charpixels = int(1.3 * ff.measure("0")) - tableframe = ttk.Frame(master=parent, padding="3 3 3 3") - statusframe = ttk.Frame(master=tableframe) - # Create and configure the Treeview table widget - tv_widget = ttk.Treeview( - tableframe, - columns=column_headers, - selectmode=select_mode, - show="headings", - ) - tv_widget.configure()["style"] = tblstyle - ysb = ttk.Scrollbar(tableframe, orient="vertical", command=tv_widget.yview) - xsb = ttk.Scrollbar(tableframe, orient="horizontal", command=tv_widget.xview) - tv_widget.configure(yscrollcommand=ysb.set, xscrollcommand=xsb.set) - # Status bar - statusbar = ttk.Label( - statusframe, - text=" %d rows" % len(rowset), - relief=tk.RIDGE, - anchor=tk.W, - ) - tableframe.statuslabel = statusbar - # Fill the Treeview table widget with data - set_tv_headers(tv_widget, column_headers, colwidths, charpixels) - fill_tv_table(tv_widget, rowset, statusbar) - # Place the table - tv_widget.grid(column=0, row=0, sticky=tk.NSEW) - ysb.grid(column=1, row=0, sticky=tk.NS) - xsb.grid(column=0, row=1, sticky=tk.EW) - statusframe.grid(column=0, row=3, sticky=tk.EW) - tableframe.columnconfigure(0, weight=1) - tableframe.rowconfigure(0, weight=1) - # Place the status bar - statusbar.pack(side=tk.BOTTOM, fill=tk.X) - # Allow resizing of the table - tableframe.columnconfigure(0, weight=1) - tableframe.rowconfigure(0, weight=1) - # - return tableframe, 
tv_widget - - -def set_tv_headers( - tvtable: ttk.Treeview, - column_headers: list, - colwidths: list, - charpixels: int, -): - """Set the headers and column widths for a Treeview table widget.""" - pixwidths = [charpixels * col for col in colwidths] - for i in range(len(column_headers)): - hdr = column_headers[i] - tvtable.column(hdr, width=pixwidths[i]) - tvtable.heading( - hdr, - text=hdr, - command=lambda _col=hdr: treeview_sort_column(tvtable, _col, False), + Perform one or all of the following operations on a set of PostgreSQL tables: + + - Perform QA checks on data in a staging table or set of staging tables. QA checks include not-null, primary key, foreign key, and check constraint checks. + - Perform updates and inserts (upserts) on a base table or set of base tables from the staging table(s) of the same name. + + PgUpsert utilizes temporary tables and views inside the PostgreSQL database to dynamically + generate SQL for QA checks and upserts. All temporary objects are initialized with the `ups_` prefix. + + The upsert process is transactional. If any part of the process fails, the transaction will be rolled back. + Committing changes to the database is optional and can be controlled with the `do_commit` flag. + + To avoid SQL injection, all SQL statements are generated using the `psycopg2.sql`_ module. + + :param host: Name of the PostgreSQL host. + :type host: str + :param database: Name of the PostgreSQL database. + :type database: str + :param user: Name of the PostgreSQL user. This user must have the necessary permissions to + connect to the database, query the information_schema, create temporary objects, + select from the staging tables, and update and insert into the base tables. + No checking is done to verify these permissions. + :type user: str + :param port: PostgreSQL database port, defaults to 5432. + :type port: int, optional + :param passwd: Password for the PostgreSQL user. If None, the user will be prompted to enter + the password. 
Defaults to None. + :type passwd: None or str, optional + :param tables: List of table names to perform QA checks on and upsert. Defaults to (). + :type tables: list or tuple or None, optional + :param stg_schema: Name of the staging schema where tables are located which will be used for + QA checks and upserts. Tables in the staging schema must have the same name + as the tables in the base schema that they will be upserted to. Defaults to None. + :type stg_schema: str or None, optional + :param base_schema: Name of the base schema where tables are located which will be updated or + inserted into. Defaults to None. + :type base_schema: str or None, optional + :param do_commit: If True, changes will be committed to the database once the upsert process + is complete. If False, changes will be rolled back. Defaults to False. + :type do_commit: bool, optional + :param interactive: If True, the user will be prompted with multiple dialogs to confirm various + steps during the upsert process. If False, the upsert process will run + without user intervention. Defaults to False. + :type interactive: bool, optional + :param upsert_method: The method to use for upserting data. Must be one of "upsert", "update", + or "insert". Defaults to "upsert". + :type upsert_method: str, optional + :param exclude_cols: List of column names to exclude from the upsert process. These columns will + not be updated or inserted to, however, they will still be checked during + the QA process. + :type exclude_cols: list or tuple or None, optional + :param exclude_null_check_columns: List of column names to exclude from the not-null check during + the QA process. Defaults to (). + :type exclude_null_check_columns: list or tuple or None, optional + :param control_table: Name of the temporary control table that will be used to track changes + during the upsert process. Defaults to "ups_control". + :type control_table: str, optional + + :example: + + .. 
code-block:: python + + from pg_upsert import PgUpsert + + PgUpsert( + host="localhost", + port=5432, + database="postgres", + user="username", + tables=("genres", "books", "authors", "book_authors"), + stg_schema="staging", + base_schema="public", + do_commit=False, + upsert_method="upsert", + interactive=False, + exclude_cols=("rev_user", "rev_time", "created_at", "updated_at"), + exclude_null_check_columns=("rev_user", "rev_time", "created_at", "updated_at", "alias"), ) - -def treeview_sort_column(tv: ttk.Treeview, col: str, reverse: bool): - """Sort a column in a Treeview table widget. - - From https://stackoverflow.com/questions/1966929/tk-treeview-column-sort#1967793 + .. _psycopg2.sql: https://www.psycopg.org/docs/sql.html """ - colvals = [(tv.set(k, col), k) for k in tv.get_children()] - colvals.sort(reverse=reverse) - # Rearrange items in sorted positions - for index, (_val, k) in enumerate(colvals): - tv.move(k, "", index) - # Reverse sort next time - tv.heading(col, command=lambda: treeview_sort_column(tv, col, not reverse)) - - -def fill_tv_table(tvtable: ttk.Treeview, rowset: list | tuple, status_label=None): - """Fill a Treeview table widget with data.""" - for i, row in enumerate(rowset): - enc_row = [c if c is not None else "" for c in row] - tvtable.insert(parent="", index="end", iid=str(i), values=enc_row) - if status_label is not None: - status_label.config(text=" %d rows" % len(rowset)) - - -# def error_handler(errors: list[str]): -# """Log errors and exit.""" -# for error in errors: -# logger.error(error) -# if errors: -# db.rollback() -# sys.exit(1) - - -def ellapsed_time(start_time: datetime): - """Returns a string representing the ellapsed time since the start time.""" - dt = (datetime.now() - start_time).total_seconds() - if dt < 60: - return f"{round((datetime.now() - start_time).total_seconds(), 3)} seconds" - if dt < 3600: - return f"{int(dt // 60)} minutes, {round(dt % 60, 3)} seconds" - return f"{int(dt // 3600)} hours, {int((dt % 
3600)) // 60} minutes, {round(dt % 60, 3)} seconds" # noqa: UP034 - - -def clparser() -> argparse.ArgumentParser: - """Command line interface for the upsert function.""" - parser = argparse.ArgumentParser( - description=description, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument( - "--version", - action="version", - version=f"%(prog)s {__version__}", - ) - parser.add_argument( - "-q", - "--quiet", - action="store_true", - help="suppress all console output", - ) - parser.add_argument( - "-d", - "--debug", - action="store_true", - help="display debug output", - ) - parser.add_argument( - "-l", - "--log", - type=Path, - metavar="LOGFILE", - help="write log to LOGFILE", - ) - parser.add_argument( - "-e", - "--exclude", - metavar="EXCLUDE_COLUMNS", - help="comma-separated list of columns to exclude from null checks", - ) - parser.add_argument( - "-n", - "--null", - metavar="NULL_COLUMNS", - help="comma-separated list of columns to exclude from null checks", - ) - parser.add_argument( - "-c", - "--do-commit", - action="store_true", - help="commit changes to database", - ) - parser.add_argument( - "-i", - "--interactive", - action="store_true", - help="display interactive GUI of important table information", - ) - parser.add_argument( - "-m", - "--upsert-method", - metavar="UPSERT_METHOD", - default="upsert", - choices=["upsert", "update", "insert"], - help="method to use for upsert", - ) - parser.add_argument( - "host", - metavar="HOST", - help="database host", - ) - parser.add_argument( - "port", - metavar="PORT", - type=int, - help="database port", - ) - parser.add_argument( - "database", - metavar="DATABASE", - help="database name", - ) - parser.add_argument( - "user", - metavar="USER", - help="database user", - ) - parser.add_argument( - "stg_schema", - metavar="STAGING_SCHEMA", - help="staging schema name", - ) - parser.add_argument( - "base_schema", - metavar="BASE_SCHEMA", - help="base schema name", - ) - parser.add_argument( - 
"tables", - metavar="TABLE", - nargs="+", - help="table name(s)", - ) - return parser - - -class PgUpsert: - UPSERT_METHODS = ("upsert", "update", "insert") def __init__( self, @@ -776,16 +617,19 @@ def __init__( upsert_method: str = "upsert", exclude_cols: list | tuple | None = (), exclude_null_check_columns: list | tuple | None = (), - **kwargs, + control_table: str = "ups_control", ): - if upsert_method not in self.UPSERT_METHODS: + if upsert_method not in self._upsert_methods(): raise ValueError( - f"Invalid upsert method: {upsert_method}. Must be one of {self.UPSERT_METHODS}", + f"Invalid upsert method: {upsert_method}. Must be one of {self._upsert_methods()}", ) - if not base_schema: - raise ValueError("No base schema specified") - if not stg_schema: - raise ValueError("No staging schema specified") + if not base_schema or not stg_schema: + if not base_schema and not stg_schema: + raise ValueError("No base or staging schema specified") + if not base_schema: + raise ValueError("No base schema specified") + if not stg_schema: + raise ValueError("No staging schema specified") if not tables: raise ValueError("No tables specified") if stg_schema == base_schema: @@ -808,16 +652,22 @@ def __init__( self.upsert_method = upsert_method self.exclude_cols = exclude_cols self.exclude_null_check_columns = exclude_null_check_columns - self.control_table = kwargs.get("control_table", "ups_control") + self.control_table = control_table self.qa_passed = False + self._validate_schemas() + for table in self.tables: + self._validate_table(table) + self._init_ups_control() + + @staticmethod + def _upsert_methods() -> tuple[str, str, str]: + """Return a tuple of valid upsert methods.""" + return ("upsert", "update", "insert") def __repr__(self): return f"{self.__class__.__name__}(db={self.db!r}, tables={self.tables}, stg_schema={self.stg_schema}, base_schema={self.base_schema}, do_commit={self.do_commit}, interactive={self.interactive}, upsert_method={self.upsert_method}, 
exclude_cols={self.exclude_cols}, exclude_null_check_columns={self.exclude_null_check_columns})" # noqa: E501 - def __del__(self): - self.db.close() - - def show_results(self, sql: str | Composable) -> None | str: + def _show(self, sql: str | Composable) -> None | str: """Display the results of a query in a table format. If the interactive flag is set, the results will be displayed in a Tkinter window. Otherwise, the results will be displayed in the console using the tabulate module.""" @@ -825,48 +675,10 @@ def show_results(self, sql: str | Composable) -> None | str: if rowcount == 0: logger.info("No results found") return None - return f"{tabulate(rows, headers='keys', tablefmt='pipe', showindex=False)}" + return f"{tabulate(rows, headers='keys', tablefmt='github', showindex=False)}" - def run(self) -> None: - """Run the upsert process.""" - self.validate_schemas() - for table in self.tables: - self.validate_table(table) - logger.info(f"Upserting to {self.base_schema} from {self.stg_schema}") - if self.interactive: - logger.debug("Tables selected for upsert:") - for table in self.tables: - logger.debug(f" {table}") - btn, return_value = TableUI( - "Upsert Tables", - "Tables selected for upsert", - [ - ("Continue", 0, ""), - ("Cancel", 1, ""), - ], - ["Table"], - [[table] for table in self.tables], - ).activate() - if btn != 0: - logger.info("Upsert cancelled") - return - else: - logger.info("Tables selected for upsert:") - for table in self.tables: - logger.info(f" {table}") - self.init_ups_control() - self.qa_all() - if self.qa_passed: - self.upsert_all() - - self.db.close() - - def validate_schemas(self: PgUpsert) -> None: - """Validate that the base and staging schemas exist. - - Raises: - ValueError: If either schema does not exist. 
- """ + def _validate_schemas(self: PgUpsert) -> None: + """Validate that the base and staging schemas exist.""" logger.debug(f"Validating schemas {self.base_schema} and {self.stg_schema}") sql = SQL( """ @@ -902,17 +714,14 @@ def validate_schemas(self: PgUpsert) -> None: f"Invalid schema(s): {next(iter(self.db.rowdict(sql)[0]))['schema_string']}", ) - def validate_table(self, table: str) -> None: + def _validate_table(self, table: str) -> None: """Utility script to validate one table in both base and staging schema. Halts script processing if any either of the schemas are non-existent, or if either of the tables are not present within those schemas pass. - Args: - table (str): The table name to validate. - - Raises: - ValueError: If the table does not exist in either the base or staging schema. + :param table: The table to validate. + :type table: str """ logger.debug( f"Validating table {table} exists in {self.base_schema} and {self.stg_schema} schemas", @@ -950,10 +759,33 @@ def validate_table(self, table: str) -> None: f"Invalid table(s): {next(iter(self.db.rowdict(sql)[0]))['schema_table']}", ) - def validate_control(self: PgUpsert) -> None: - """Validate contents of control table against base and staging schema.""" + def _validate_control(self: PgUpsert) -> None: + """Validate contents of control table against base and staging schema. + + :objects created: + + - `ups_validate_control`: Temporary table containing the results of the validation. + - `ups_ctrl_invl_table`: Temporary table containing the names of invalid tables. 
+ """ logger.debug("Validating control table") - self.validate_schemas() + self._validate_schemas() + # Check if the control table exists + if ( + self.db.execute( + SQL( + """ + select 1 + from information_schema.tables + where table_name = {control_table} + """, + ).format( + base_schema=Literal(self.base_schema), + control_table=Literal(self.control_table), + ), + ).rowcount + == 0 + ): + self._init_ups_control() sql = SQL( """ drop table if exists ups_validate_control cascade; @@ -1006,9 +838,13 @@ def validate_control(self: PgUpsert) -> None: for row in rows: logger.error(f" {row['schema_table']}") - def init_ups_control(self: PgUpsert) -> None: + def _init_ups_control(self: PgUpsert) -> None: """Creates a table having the structure that is used to drive the upsert operation on multiple staging tables. + + :objects created: + + - `ups_control`: Temporary table containing the control data. """ logger.debug("Initializing upsert control table") sql = SQL( @@ -1036,7 +872,7 @@ def init_ups_control(self: PgUpsert) -> None: tables=Literal(",".join(self.tables)), ) self.db.execute(sql) - # Update the control table with the list of columns to exclude from null checks + # Update the control table with the list of columns to exclude from being updated or inserted to. if self.exclude_cols and len(self.exclude_cols) > 0: self.db.execute( SQL( @@ -1049,6 +885,7 @@ def init_ups_control(self: PgUpsert) -> None: exclude_cols=Literal(",".join(self.exclude_cols)), ), ) + # Update the control table with the list of columns to exclude from null checks. 
if self.exclude_null_check_columns and len(self.exclude_null_check_columns) > 0: self.db.execute( SQL( @@ -1075,16 +912,14 @@ def init_ups_control(self: PgUpsert) -> None: interactive=Literal(self.interactive), ), ) - rows, headers, rowcount = self.db.rowdict( - SQL("select * from {control_table}").format( - control_table=Identifier(self.control_table), - ), + debug_sql = SQL("select * from {control_table}").format( + control_table=Identifier(self.control_table), ) logger.debug( - f"Control table after being initialized:\n{tabulate(rows, headers='keys', tablefmt='pipe', showindex=False)}", + f"Control table after being initialized:\n{self._show(debug_sql)}", ) - def qa_all(self: PgUpsert) -> None: + def qa_all(self: PgUpsert) -> PgUpsert: """Performs QA checks for nulls in non-null columns, for duplicated primary key values, for invalid foreign keys, and invalid check constraints in a set of staging tables to be loaded into base tables. @@ -1103,7 +938,20 @@ def qa_all(self: PgUpsert) -> None: base table that are also in the staging table are updated. The update operation does not test to see if column contents are different, and so does not update only those values that are different. + + This method runs :class:`PgUpsert` methods in the following order: + + 1. :meth:`PgUpsert.qa_all_null` + 2. :meth:`PgUpsert.qa_all_pk` + 3. :meth:`PgUpsert.qa_all_fk` + 4. :meth:`PgUpsert.qa_all_ck` + + :objects created: + + - `ups_proctables`: Temporary table containing the list of tables to process. + - `ups_toprocess`: Temporary view returning a single unprocessed table. """ + self._validate_control() # Clear the columns of return values from the control table, # in case this control table has been used previously. 
self.db.execute( @@ -1168,7 +1016,7 @@ def qa_all(self: PgUpsert) -> None: control_table=Identifier(self.control_table), ) if not self.interactive: - logger.debug(f"\n{self.show_results(ctrl)}") + logger.debug(f"\n{self._show(ctrl)}") # Reset the loop control flag in the control table. self.db.execute(SQL("update ups_proctables set processed = False;")) @@ -1187,7 +1035,7 @@ def qa_all(self: PgUpsert) -> None: control_table=Identifier(self.control_table), ) logger.debug("QA checks failed") - logger.debug(f"\n{self.show_results(ctrl)}") + logger.debug(f"\n{self._show(ctrl)}") logger.debug("") if self.interactive: btn, return_value = TableUI( @@ -1198,15 +1046,16 @@ def qa_all(self: PgUpsert) -> None: ("Cancel", 1, ""), ], headers, - list(rows), + [[row[header] for header in headers] for row in rows], ).activate() else: logger.error("===QA checks failed. Below is a summary of the errors===") - logger.error(self.show_results(ctrl)) - return + logger.error(self._show(ctrl)) + return self self.qa_passed = True + return self - def qa_all_null(self: PgUpsert) -> None: + def qa_all_null(self: PgUpsert) -> PgUpsert: """Performs null checks for non-null columns in selected staging tables.""" while True: rows, headers, rowcount = self.db.rowdict( @@ -1216,24 +1065,6 @@ def qa_all_null(self: PgUpsert) -> None: break rows = next(iter(rows)) self.qa_one_null(table=rows["table_name"]) - # Query the ups_null_error_list control table for the null errors. - err_rows, err_headers, err_rowcount = self.db.rowdict( - SQL("select * from ups_null_error_list;"), - ) - if err_rowcount > 0: - self.db.execute( - SQL( - """ - update {control_table} - set null_errors = {null_errors} - where table_name = {table_name}; - """, - ).format( - control_table=Identifier(self.control_table), - null_errors=Literal(next(iter(err_rows))["null_errors"]), - table_name=Literal(rows["table_name"]), - ), - ) # Set the 'processed' column to True in the control table. 
self.db.execute( SQL( @@ -1244,13 +1075,24 @@ def qa_all_null(self: PgUpsert) -> None: """, ).format(table_name=Literal(rows["table_name"])), ) + return self + + def qa_one_null(self: PgUpsert, table: str) -> PgUpsert: + """Performs null checks for non-null columns in a single staging table. - def qa_one_null(self: PgUpsert, table: str) -> None: - """Performs null checks for non-null columns in a single staging table.""" + :param table: The name of the staging table to check for null values. + :type table: str + + :objects created: + + - `ups_nonnull_cols`: Temporary table containing the non-null columns of the base table. + - `ups_qa_nonnull_col`: Temporary view containing the number of rows with nulls in the staging table. + - `ups_null_error_list`: Temporary view containing the list of null errors. + """ logger.info( - f"Checking for NULLs in non-NULL columns in table {self.stg_schema}.{table}", + f"Conducting not-null QA checks on table {self.stg_schema}.{table}", ) - self.validate_table(table) + self._validate_table(table) # Create a table listing the columns of the base table that must # be non-null and that do not have a default expression. # Include a column for the number of rows with nulls in the staging table. @@ -1350,8 +1192,27 @@ def qa_one_null(self: PgUpsert, table: str) -> None: where coalesce(null_rows, 0) > 0; """, ) + # Query the ups_null_error_list control table for the null errors. 
+ err_rows, err_headers, err_rowcount = self.db.rowdict( + SQL("select * from ups_null_error_list;"), + ) + if err_rowcount > 0: + self.db.execute( + SQL( + """ + update {control_table} + set null_errors = {null_errors} + where table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + null_errors=Literal(next(iter(err_rows))["null_errors"]), + table_name=Literal(table), + ), + ) + return self - def qa_all_pk(self: PgUpsert) -> None: + def qa_all_pk(self: PgUpsert) -> PgUpsert: """Performs primary key checks for duplicated primary key values in selected staging tables.""" while True: rows, headers, rowcount = self.db.rowdict( @@ -1360,21 +1221,7 @@ def qa_all_pk(self: PgUpsert) -> None: if rowcount == 0: break rows = next(iter(rows)) - pk_errors = self.qa_one_pk(table=rows["table_name"]) - if pk_errors: - self.db.execute( - SQL( - """ - update {control_table} - set pk_errors = {pk_errors} - where table_name = {table_name}; - """, - ).format( - control_table=Identifier(self.control_table), - pk_errors=Literal(",".join(pk_errors)), - table_name=Literal(rows["table_name"]), - ), - ) + self.qa_one_pk(table=rows["table_name"]) # Set the 'processed' column to True in the control table. self.db.execute( SQL( @@ -1385,14 +1232,24 @@ def qa_all_pk(self: PgUpsert) -> None: """, ).format(table_name=Literal(rows["table_name"])), ) + return self + + def qa_one_pk(self: PgUpsert, table: str) -> PgUpsert: + """Performs primary key checks for duplicated primary key values in a single staging table. + + :param table: The name of the staging table to check for duplicate primary key values. + :type table: str - def qa_one_pk(self: PgUpsert, table: str) -> list | None: - """Performs primary key checks for duplicated primary key values in a single staging table.""" + :objects created: + + - `ups_primary_key_columns`: Temporary table containing the primary key columns of the base table. 
+ - `ups_pk_check`: Temporary view containing the duplicate primary key values. + """ pk_errors = [] logger.info( - f"Checking for duplicated primary key values in table {self.stg_schema}.{table}", + f"Conducting primary key QA checks on table {self.stg_schema}.{table}", ) - self.validate_table(table) + self._validate_table(table) # Create a table listing the primary key columns of the base table. self.db.execute( SQL( @@ -1463,7 +1320,7 @@ def qa_one_pk(self: PgUpsert, table: str) -> list | None: pk_errors.append(err_msg) logger.debug("") err_sql = SQL("select * from ups_pk_check;") - logger.debug(f"\n{self.show_results(err_sql)}") + logger.debug(f"\n{self._show(err_sql)}") logger.debug("") if self.interactive: btn, return_value = TableUI( @@ -1474,14 +1331,28 @@ def qa_one_pk(self: PgUpsert, table: str) -> list | None: ("Cancel", 1, ""), ], pk_headers, - list(pk_errs), + [[row[header] for header in pk_headers] for row in pk_errs], ).activate() if btn != 0: logger.warning("Script cancelled by user") sys.exit(0) - return pk_errors + if len(pk_errors) > 0: + self.db.execute( + SQL( + """ + update {control_table} + set pk_errors = {pk_errors} + where table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + pk_errors=Literal(",".join(pk_errors)), + table_name=Literal(table), + ), + ) + return self - def qa_all_fk(self: PgUpsert) -> None: + def qa_all_fk(self: PgUpsert) -> PgUpsert: """Performs foreign key checks for invalid foreign key values in selected staging tables.""" while True: rows, headers, rowcount = self.db.rowdict( @@ -1490,21 +1361,7 @@ def qa_all_fk(self: PgUpsert) -> None: if rowcount == 0: break rows = next(iter(rows)) - fk_errors = self.qa_one_fk(table=rows["table_name"]) - if fk_errors: - self.db.execute( - SQL( - """ - update {control_table} - set fk_errors = {fk_errors} - where table_name = {table_name}; - """, - ).format( - control_table=Identifier(self.control_table), - 
fk_errors=Literal(",".join(fk_errors)), - table_name=Literal(rows["table_name"]), - ), - ) + self.qa_one_fk(table=rows["table_name"]) # Set the 'processed' column to True in the control table. self.db.execute( SQL( @@ -1515,12 +1372,26 @@ def qa_all_fk(self: PgUpsert) -> None: """, ).format(table_name=Literal(rows["table_name"])), ) + return self + + def qa_one_fk(self: PgUpsert, table: str) -> PgUpsert: + """Performs foreign key checks for invalid foreign key values in a single staging table. - def qa_one_fk(self: PgUpsert, table: str) -> list | None: + :param table: The name of the staging table to check for invalid foreign key values. + :type table: str + + :objects created: + + - `ups_foreign_key_columns`: Temporary table containing the foreign key columns of the base table. + - `ups_sel_fks`: Temporary table containing the foreign key relationships for the base table. + - `ups_fk_constraints`: Temporary table containing the unique constraint names for the table. + - `ups_one_fk`: Temporary table containing the foreign key relationships for the base table. + - `ups_fk_check`: Temporary view containing the invalid foreign key values. + """ logger.info( f"Conducting foreign key QA checks on table {self.stg_schema}.{table}", ) - self.validate_table(table) + self._validate_table(table) # Create a table of *all* foreign key dependencies in this database. # Only create it once because it may slow the QA process down. 
if ( @@ -1728,7 +1599,7 @@ def qa_one_fk(self: PgUpsert, table: str) -> list | None: f" Foreign key error referencing {const_rows['uq_schema']}.{const_rows['uq_table']}", ) logger.debug("") - logger.debug(f"\n{self.show_results(check_sql)}") + logger.debug(f"\n{self._show(check_sql)}") logger.debug("") if self.interactive: btn, return_value = TableUI( @@ -1739,7 +1610,7 @@ def qa_one_fk(self: PgUpsert, table: str) -> list | None: ("Cancel", 1, ""), ], fk_check_headers, - fk_check_rows, + [[row[header] for header in fk_check_headers] for row in [fk_check_rows]], ).activate() if btn != 0: logger.warning("Script cancelled by user") @@ -1789,10 +1660,39 @@ def qa_one_fk(self: PgUpsert, table: str) -> list | None: ), ) if err_rowcount > 0: - return [err["fk_errors"] for err in list(err_rows) if err["fk_errors"]] - return None + err_rows = list(err_rows) + # If any 'fk_errors' key is not None in the list of dictionaries, + # update the control table with the list of foreign key errors. + if any(err["fk_errors"] for err in err_rows): + self.db.execute( + SQL( + """ + update {control_table} + set fk_errors = {fk_errors} + where table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + fk_errors=Literal( + ",".join( + [err["fk_errors"] for err in err_rows if err["fk_errors"]], + ), + ), + table_name=Literal(table), + ), + ) + return self + + def qa_all_ck(self: PgUpsert) -> PgUpsert: + """Performs check constraint checks for invalid check constraint values in selected staging tables. + + :objects created: - def qa_all_ck(self: PgUpsert) -> None: + - `ups_check_constraints`: Temporary table containing the check constraints of the base table. + - `ups_sel_cks`: Temporary table containing the check constraints for the base table. + - `ups_ck_check_check`: Temporary view containing the check constraint values. + - `ups_ck_error_list`: Temporary table containing the list of check constraint errors. 
+ """ while True: rows, headers, rowcount = self.db.rowdict( SQL("select * from ups_toprocess;"), @@ -1828,8 +1728,20 @@ def qa_all_ck(self: PgUpsert) -> None: """, ).format(table_name=Literal(rows["table_name"])), ) + return self + + def qa_one_ck(self: PgUpsert, table: str) -> PgUpsert: + """Performs check constraint checks for invalid check constraint values in a single staging table. + + :param table: The name of the staging table to check for invalid check constraint values. + :type table: str + + :objects created: - def qa_one_ck(self: PgUpsert, table: str) -> list | None: + - `ups_sel_cks`: Temporary table containing the check constraints for the base table. + - `ups_ck_check_check`: Temporary view containing the check constraint values. + - `ups_ck_error_list`: Temporary table containing the list of check constraint errors. + """ logger.info( f"Conducting check constraint QA checks on table {self.stg_schema}.{table}", ) @@ -1990,12 +1902,859 @@ def qa_one_ck(self: PgUpsert, table: str) -> list | None: """, ), ) + return self + + def upsert_all(self: PgUpsert) -> PgUpsert: + """Performs upsert operations on all selected tables in the base schema. - def upsert_all(self: PgUpsert) -> None: + :objects created: + + - `ups_dependencies`: Temporary table containing the dependencies of the base schema. + - `ups_ordered_tables`: Temporary table containing the selected tables ordered by dependency. + - `ups_proctables`: Temporary table containing the selected tables with ordering information. + """ + self._validate_control() if not self.qa_passed: - self.qa_all() - self.validate_control() - logger.info("===Starting upsert procedures===") + logger.warning( + "QA checks have not been run or have failed. Continuing anyway.", + ) + logger.info(f"===Starting upsert procedures (COMMIT={self.do_commit})===") + # Get a table of all dependencies for the base schema. 
+ self.db.execute( + SQL( + """ + drop table if exists ups_dependencies cascade; + create temporary table ups_dependencies as + select + tc.table_name as child, + tu.table_name as parent + from + information_schema.table_constraints as tc + inner join information_schema.constraint_table_usage as tu + on tu.constraint_name = tc.constraint_name + where + tc.constraint_type = 'FOREIGN KEY' + and tc.table_name <> tu.table_name + and tc.table_schema = {base_schema}; + """, + ).format(base_schema=Literal(self.base_schema)), + ) + # Create a list of tables in the base schema ordered by dependency. + self.db.execute( + SQL( + """ + drop table if exists ups_ordered_tables cascade; + with recursive dep_depth as ( + select + dep.child as first_child, + dep.child, + dep.parent, + 1 as lvl + from + ups_dependencies as dep + union all + select + dd.first_child, + dep.child, + dep.parent, + dd.lvl + 1 as lvl + from + dep_depth as dd + inner join ups_dependencies as dep on dep.parent = dd.child + and dep.child <> dd.parent + and not (dep.parent = dd.first_child and dd.lvl > 2) + ) + select + table_name, + table_order + into + temporary table ups_ordered_tables + from ( + select + dd.parent as table_name, + max(lvl) as table_order + from + dep_depth as dd + group by + table_name + union + select + dd.child as table_name, + max(lvl) + 1 as level + from + dep_depth as dd + left join ups_dependencies as dp on dp.parent = dd.child + where + dp.parent is null + group by + dd.child + union + select distinct + t.table_name, + 0 as level + from + information_schema.tables as t + left join ups_dependencies as p on t.table_name=p.parent + left join ups_dependencies as c on t.table_name=c.child + where + t.table_schema = {base_schema} + and t.table_type = 'BASE TABLE' + and p.parent is null + and c.child is null + ) as all_levels; + """, + ).format(base_schema=Literal(self.base_schema)), + ) + # Create a list of the selected tables with ordering information. 
+ self.db.execute( + SQL( + """ + drop table if exists ups_proctables cascade; + select + ot.table_order, + tl.table_name, + tl.exclude_cols, + tl.interactive, + False::boolean as processed + into + temporary table ups_proctables + from + {control_table} as tl + inner join ups_ordered_tables as ot on ot.table_name = tl.table_name + ; + """, + ).format(control_table=Identifier(self.control_table)), + ) + while True: + # Create a view returning a single unprocessed table, in order. + proc_rows, proc_headers, proc_rowcount = self.db.rowdict( + SQL( + """ + select + table_name, exclude_cols, interactive + from ups_proctables + where not processed + order by table_order + limit 1; + """, + ), + ) + if proc_rowcount == 0: + break + proc_rows = next(iter(proc_rows)) + self.upsert_one(proc_rows["table_name"]) + self.db.execute( + SQL( + """ + update ups_proctables + set processed = True + where table_name = {table_name}; + """, + ).format(table_name=Literal(proc_rows["table_name"])), + ) + return self + + def upsert_one(self: PgUpsert, table: str) -> PgUpsert: + """Performs an upsert operation on a single table. + + :param table: The name of the table to upsert. + :type table: str + + :objects created: + + - `ups_cols`: Temporary table containing the columns to be updated. + - `ups_pks`: Temporary table containing the primary key columns. + - `ups_fk_check`: Temporary view containing the foreign key check. + - `ups_toprocess`: Temporary table containing the tables to be processed. 
+ """ + rows_updated = 0 + rows_inserted = 0 + logger.info(f"Performing upsert on table {self.base_schema}.{table}") + self._validate_table(table) + + spec_rows, spec_headers, spec_rowcount = self.db.rowdict( + SQL( + """ + select table_name, exclude_cols, interactive + from {control_table} + where table_name = {table}; + """, + ).format( + control_table=Identifier(self.control_table), + table=Literal(table), + ), + ) + if spec_rowcount == 0: + logger.warning(f"Table {table} not found in control table") + return self + spec_rows = next(iter(spec_rows)) + # Populate a (temporary) table with the names of the columns + # in the base table that are to be updated from the staging table. + # Include only those columns from staging table that are also in base table. + query = SQL( + """ + drop table if exists ups_cols cascade; + select s.column_name + into temporary table ups_cols + from information_schema.columns as s + inner join information_schema.columns as b on s.column_name=b.column_name + where + s.table_schema = {stg_schema} + and s.table_name = {table} + and b.table_schema = {base_schema} + and b.table_name = {table} + """, + ).format( + stg_schema=Literal(self.stg_schema), + table=Literal(table), + base_schema=Literal(self.base_schema), + ) + if spec_rows["exclude_cols"]: + query += SQL( + """ + and s.column_name not in ({exclude_cols}) + """, + ).format( + exclude_cols=SQL(",").join( + Literal(col) for col in spec_rows["exclude_cols"] if spec_rows["exclude_cols"] + ), + ) + query += SQL(" order by s.ordinal_position;") + self.db.execute(query) + # Populate a (temporary) table with the names of the primary key + # columns of the base table. 
+ self.db.execute( + SQL( + """ + drop table if exists ups_pks cascade; + select k.column_name + into temporary table ups_pks + from information_schema.table_constraints as tc + inner join information_schema.key_column_usage as k + on tc.constraint_type = 'PRIMARY KEY' + and tc.constraint_name = k.constraint_name + and tc.constraint_catalog = k.constraint_catalog + and tc.constraint_schema = k.constraint_schema + and tc.table_schema = k.table_schema + and tc.table_name = k.table_name + and tc.constraint_name = k.constraint_name + where + k.table_name = {table} + and k.table_schema = {base_schema} + order by k.ordinal_position; + """, + ).format(table=Literal(table), base_schema=Literal(self.base_schema)), + ) + # Get all base table columns that are to be updated into a comma-delimited list. + all_col_list = self.db.execute( + SQL( + """ + select string_agg(column_name, ', ') as cols from ups_cols;""", + ), + ).fetchone() + if not all_col_list: + logger.warning("No columns found in base table") + return self + all_col_list = next(iter(all_col_list)) + # Get all base table columns that are to be updated into a + # comma-delimited list with a "b." prefix. + base_col_list = self.db.execute( + SQL( + """ + select string_agg('b.' || column_name, ', ') as cols + from ups_cols;""", + ), + ).fetchone() + if not base_col_list: + logger.warning("No columns found in base table") + return self + base_col_list = next(iter(base_col_list)) + # Get all staging table column names for columns that are to be updated + # into a comma-delimited list with an "s." prefix. + stg_col_list = self.db.execute( + SQL( + """ + select string_agg('s.' || column_name, ', ') as cols + from ups_cols;""", + ), + ).fetchone() + if not stg_col_list: + logger.warning("No columns found in staging table") + return self + stg_col_list = next(iter(stg_col_list)) + # Get the primary key columns in a comma-delimited list. 
+ pk_col_list = self.db.execute( + SQL( + """ + select string_agg(column_name, ', ') as cols + from ups_pks;""", + ), + ).fetchone() + if not pk_col_list: + logger.warning("Base table has no primary key") + return self + pk_col_list = next(iter(pk_col_list)) + # Create a join expression for key columns of the base (b) and + # staging (s) tables. + join_expr = self.db.execute( + SQL( + """ + select + string_agg('b.' || column_name || ' = s.' || column_name, ' and ') as expr + from + ups_pks; + """, + ), + ).fetchone() + if not join_expr: + logger.warning("Base table has no primary key") + return self + # Create a FROM clause for an inner join between base and staging + # tables on the primary key column(s). + from_clause = SQL( + """FROM {base_schema}.{table} as b + INNER JOIN {stg_schema}.{table} as s ON {join_expr}""", + ).format( + base_schema=Identifier(self.base_schema), + table=Identifier(table), + stg_schema=Identifier(self.stg_schema), + join_expr=SQL(join_expr[0]), + ) + # Create SELECT queries to pull all columns with matching keys from both + # base and staging tables. 
+ self.db.execute( + SQL( + """ + drop view if exists ups_basematches cascade; + create temporary view ups_basematches as select {base_col_list} {from_clause}; + + drop view if exists ups_stgmatches cascade; + create temporary view ups_stgmatches as select {stg_col_list} {from_clause}; + """, + ).format( + base_col_list=SQL(base_col_list), + stg_col_list=SQL(stg_col_list), + from_clause=from_clause, + ), + ) + # Get non-key columns to be updated + self.db.execute( + SQL( + """ + drop view if exists ups_nk cascade; + create temporary view ups_nk as + select column_name from ups_cols + except + select column_name from ups_pks; + """, + ), + ) + do_updates = False + update_stmt = None + # Prepare updates + if self.upsert_method in ("upsert", "update"): + stg_curs = self.db.execute("select * from ups_stgmatches;") + if stg_curs.rowcount == 0: + logger.debug( + " No rows in staging table matching primary key in base table", + ) + stg_cols = [col.name for col in stg_curs.description] + stg_rowcount = stg_curs.rowcount + stg_data = stg_curs.fetchall() + nk_curs = self.db.execute("select * from ups_nk;") + nk_rowcount = nk_curs.rowcount + if stg_rowcount > 0 and nk_rowcount > 0: + base_curs = self.db.execute("select * from ups_basematches;") + if base_curs.rowcount == 0: + logger.debug( + " No rows in base table matching primary key in staging table", + ) + return self + base_cols = [col.name for col in base_curs.description] + base_data = base_curs.fetchall() + if spec_rows["interactive"]: + btn, return_value = CompareUI( + "Compare Tables", + f"Do you want to make these changes? 
For table {table}, new data are shown in the top table; existing data are shown in the bottom table.", # noqa: E501 + [ + ("Continue", 0, ""), + ("Skip", 1, ""), + ("Cancel", 2, ""), + ], + stg_cols, + stg_data, + base_cols, + base_data, + pk_col_list.split(", "), + sidebyside=False, + ).activate() + else: + btn = 0 + if btn == 2: + logger.warning("Script cancelled by user") + sys.exit(0) + if btn == 0: + do_updates = True + # Create an assignment expression to update non-key columns of the + # base table (un-aliased) from columns of the staging table (as s). + ups_expr = self.db.execute( + SQL( + """ + select string_agg( + column_name || ' = s.' || column_name, ', ' + ) as col + from ups_nk; + """, + ), + ).fetchone() + if not ups_expr: + logger.warning("Unexpected error in upsert_one") + return self + # Create an UPDATE statement to update the base table with + # non-key columns from the staging table. + # No semicolon terminating generated SQL. + update_stmt = SQL( + """ + UPDATE {base_schema}.{table} as b + SET {ups_expr} + FROM {stg_schema}.{table} as s WHERE {join_expr} + """, + ).format( + base_schema=Identifier(self.base_schema), + table=Identifier(table), + stg_schema=Identifier(self.stg_schema), + ups_expr=SQL(ups_expr[0]), + join_expr=SQL(join_expr[0]), + ) + else: + logger.info(" No rows to update") + + # Prepare the inserts. + do_inserts = False + insert_stmt = None + if self.upsert_method in ("upsert", "insert"): + # Create a select statement to find all rows of the staging table + # that are not in the base table. 
+ self.db.execute( + SQL( + """ + drop view if exists ups_newrows cascade; + create temporary view ups_newrows as with newpks as ( + select {pk_col_list} + from {stg_schema}.{table} + except + select {pk_col_list} + from {base_schema}.{table} + ) + select s.* + from {stg_schema}.{table} as s + inner join newpks using ({pk_col_list}); + """, + ).format( + stg_schema=Identifier(self.stg_schema), + table=Identifier(table), + pk_col_list=SQL(pk_col_list), + base_schema=Identifier(self.base_schema), + ), + ) + # Prompt user to examine new data and continue or quit. + new_curs = self.db.execute("select * from ups_newrows;") + new_cols = [col.name for col in new_curs.description] + new_rowcount = new_curs.rowcount + new_data = new_curs.fetchall() + if new_rowcount > 0: + if spec_rows["interactive"]: + btn, return_value = TableUI( + "New Data", + f"Do you want to add these new data to the {self.base_schema}.{table} table?", + [ + ("Continue", 0, ""), + ("Skip", 1, ""), + ("Cancel", 2, ""), + ], + new_cols, + new_data, + ).activate() + else: + btn = 0 + if btn == 2: + logger.warning("Script cancelled by user") + sys.exit(0) + if btn == 0: + do_inserts = True + # Create an insert statement. No semicolon terminating generated SQL. + insert_stmt = SQL( + """ + INSERT INTO {base_schema}.{table} ({all_col_list}) + SELECT {all_col_list} FROM ups_newrows + """, + ).format( + base_schema=Identifier(self.base_schema), + table=Identifier(table), + all_col_list=SQL(all_col_list), + ) + else: + logger.info(" No new data to insert") + # Run the update and insert statements. 
+ if do_updates and update_stmt and self.upsert_method in ("upsert", "update"): + logger.info(f" Updating {self.base_schema}.{table}") + logger.debug(f" UPDATE statement for {self.base_schema}.{table}") + logger.debug(f"{update_stmt.as_string(self.db.cursor())}") + self.db.execute(update_stmt) + rows_updated = stg_rowcount + logger.info(f" {rows_updated} rows updated") + if do_inserts and insert_stmt and self.upsert_method in ("upsert", "insert"): + logger.info(f" Adding data to {self.base_schema}.{table}") + logger.debug(f" INSERT statement for {self.base_schema}.{table}") + logger.debug(f"{insert_stmt.as_string(self.db.cursor())}") + self.db.execute(insert_stmt) + rows_inserted = new_rowcount + logger.info(f" {rows_inserted} rows inserted") + # Move the update/insert counts into the control table. + self.db.execute( + SQL( + """ + update {control_table} + set + rows_updated = {rows_updated}, + rows_inserted = {rows_inserted} + where + table_name = {table_name}; + """, + ).format( + control_table=Identifier(self.control_table), + rows_updated=Literal(rows_updated), + rows_inserted=Literal(rows_inserted), + table_name=Literal(table), + ), + ) + return self + + def run(self: PgUpsert) -> PgUpsert: + """Run all QA checks and upsert operations. + + This method runs :class:`PgUpsert` methods in the following order: + + 1. :meth:`PgUpsert.qa_all` + 2. :meth:`PgUpsert.upsert_all` + 3. 
:meth:`PgUpsert.commit` + """ + start_time = datetime.now() + logger.info(f"Upserting to {self.base_schema} from {self.stg_schema}") + if self.interactive: + logger.debug("Tables selected for upsert:") + for table in self.tables: + logger.debug(f" {table}") + btn, return_value = TableUI( + "Upsert Tables", + "Tables selected for upsert", + [ + ("Continue", 0, ""), + ("Cancel", 1, ""), + ], + ["Table"], + [[table] for table in self.tables], + ).activate() + if btn != 0: + logger.info("Upsert cancelled") + return self + else: + logger.info("Tables selected for upsert:") + for table in self.tables: + logger.info(f" {table}") + self._init_ups_control() + self.qa_all() + if self.qa_passed: + self.upsert_all() + self.commit() + logger.debug(f"Upsert completed in {ellapsed_time(start_time)}") + return self + + def commit(self: PgUpsert) -> PgUpsert: + """Commits the transaction to the database and shows a summary of changes. + + Changes are committed if the following criteria are met: + + - The `do_commit` flag is set to `True`. + - All QA checks have passed (i.e., the `qa_passed` flag is set to `True`). Note that no checking is done to ensure that QA checks have been run. + - The summary of changes shows that rows have been updated or inserted. + - If the `interactive` flag is set to `True` and the `do_commit` flag is set to `False`, the user is prompted to commit the changes and the user selects "Continue". + """ + self._validate_control() + final_ctrl_sql = SQL("select * from {control_table}").format( + control_table=Identifier(self.control_table), + ) + final_ctrl_rows, final_ctrl_headers, final_ctrl_rowcount = self.db.rowdict( + final_ctrl_sql, + ) + if self.interactive: + btn, return_value = TableUI( + "Upsert Summary", + "Below is a summary of changes. Do you want to commit these changes? 
", + [ + ("Continue", 0, ""), + ("Cancel", 1, ""), + ], + final_ctrl_headers, + [[row[header] for header in final_ctrl_headers] for row in final_ctrl_rows], + ).activate() + else: + btn = 0 + logger.info("") + logger.info("Summary of changes:") + logger.info(self._show(final_ctrl_sql)) + + logger.info("") + + if btn == 0: + upsert_rows, upsert_headers, upsert_rowcount = self.db.rowdict( + SQL( + "select * from {control_table} where rows_updated > 0 or rows_inserted > 0", + ).format(control_table=Identifier(self.control_table)), + ) + if upsert_rowcount == 0: + logger.info("No changes to commit") + self.db.rollback() + else: + if self.do_commit: + self.db.commit() + logger.info("Changes committed") + else: + logger.info( + "The do_commit flag is set to FALSE, rolling back changes.", + ) + self.db.rollback() + else: + logger.info("Rolling back changes") + self.db.rollback() + self.db.close() + return self + + +def treeview_table( + parent: ttk.Frame, + rowset: list | tuple, + column_headers: list | tuple, + select_mode="none", +): + """Creates a TreeView table containing the specified data, with scrollbars and + status bar in an enclosing frame. + This does not grid the table frame in its parent widget. Returns a tuple + of 0: the frame containing the table, and 1: the table widget itself. + """ + nrows = range(len(rowset)) + ncols = range(len(column_headers)) + hdrwidths = [len(column_headers[j]) for j in ncols] + if len(rowset) > 0: + datawidthtbl = [ + [ + len( + (rowset[i][j] if isinstance(rowset[i][j], str) else str(rowset[i][j])), + ) + for i in nrows + ] + for j in ncols + ] + datawidths = [max(cwidths) for cwidths in datawidthtbl] + else: + datawidths = hdrwidths + colwidths = [max(hdrwidths[i], datawidths[i]) for i in ncols] + # Set the font. 
+ ff = tkfont.nametofont("TkFixedFont") + tblstyle = ttk.Style() + tblstyle.configure("tblstyle", font=ff) + charpixels = int(1.3 * ff.measure("0")) + tableframe = ttk.Frame(master=parent, padding="3 3 3 3") + statusframe = ttk.Frame(master=tableframe) + # Create and configure the Treeview table widget + tv_widget = ttk.Treeview( + tableframe, + columns=column_headers, + selectmode=select_mode, + show="headings", + ) + tv_widget.configure()["style"] = tblstyle + ysb = ttk.Scrollbar(tableframe, orient="vertical", command=tv_widget.yview) + xsb = ttk.Scrollbar(tableframe, orient="horizontal", command=tv_widget.xview) + tv_widget.configure(yscrollcommand=ysb.set, xscrollcommand=xsb.set) + # Status bar + statusbar = ttk.Label( + statusframe, + text=" %d rows" % len(rowset), + relief=tk.RIDGE, + anchor=tk.W, + ) + tableframe.statuslabel = statusbar + # Fill the Treeview table widget with data + set_tv_headers(tv_widget, column_headers, colwidths, charpixels) + fill_tv_table(tv_widget, rowset, statusbar) + # Place the table + tv_widget.grid(column=0, row=0, sticky=tk.NSEW) + ysb.grid(column=1, row=0, sticky=tk.NS) + xsb.grid(column=0, row=1, sticky=tk.EW) + statusframe.grid(column=0, row=3, sticky=tk.EW) + tableframe.columnconfigure(0, weight=1) + tableframe.rowconfigure(0, weight=1) + # Place the status bar + statusbar.pack(side=tk.BOTTOM, fill=tk.X) + # Allow resizing of the table + tableframe.columnconfigure(0, weight=1) + tableframe.rowconfigure(0, weight=1) + # + return tableframe, tv_widget + + +def set_tv_headers( + tvtable: ttk.Treeview, + column_headers: list, + colwidths: list, + charpixels: int, +): + """Set the headers and column widths for a Treeview table widget.""" + pixwidths = [charpixels * col for col in colwidths] + for i in range(len(column_headers)): + hdr = column_headers[i] + tvtable.column(hdr, width=pixwidths[i]) + tvtable.heading( + hdr, + text=hdr, + command=lambda _col=hdr: treeview_sort_column(tvtable, _col, False), + ) + + +def 
treeview_sort_column(tv: ttk.Treeview, col: str, reverse: bool): + """Sort a column in a Treeview table widget. + + From https://stackoverflow.com/questions/1966929/tk-treeview-column-sort#1967793 + """ + colvals = [(tv.set(k, col), k) for k in tv.get_children()] + colvals.sort(reverse=reverse) + # Rearrange items in sorted positions + for index, (_val, k) in enumerate(colvals): + tv.move(k, "", index) + # Reverse sort next time + tv.heading(col, command=lambda: treeview_sort_column(tv, col, not reverse)) + + +def fill_tv_table(tvtable: ttk.Treeview, rowset: list | tuple, status_label=None): + """Fill a Treeview table widget with data.""" + for i, row in enumerate(rowset): + enc_row = [c if c is not None else "" for c in row] + tvtable.insert(parent="", index="end", iid=str(i), values=enc_row) + if status_label is not None: + status_label.config(text=" %d rows" % len(rowset)) + + +def ellapsed_time(start_time: datetime): + """Returns a string representing the elapsed time since the start time.""" + dt = (datetime.now() - start_time).total_seconds() + if dt < 60: + return f"{round((datetime.now() - start_time).total_seconds(), 3)} seconds" + if dt < 3600: + return f"{int(dt // 60)} minutes, {round(dt % 60, 3)} seconds" + return f"{int(dt // 3600)} hours, {int((dt % 3600)) // 60} minutes, {round(dt % 60, 3)} seconds" # noqa: UP034 + + +def clparser() -> argparse.ArgumentParser: + """Command line interface for the upsert function.""" + parser = argparse.ArgumentParser( + description=__description__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--version", + action="version", + version=f"%(prog)s {__version__}", + ) + parser.add_argument( + "-q", + "--quiet", + action="store_true", + help="suppress all console output", + ) + parser.add_argument( + "-d", + "--debug", + action="store_true", + help="display debug output", + ) + parser.add_argument( + "-l", + "--log", + type=Path, + metavar="LOGFILE", + help="write log to LOGFILE",
+ ) + parser.add_argument( + "-e", + "--exclude", + metavar="EXCLUDE_COLUMNS", + help="comma-separated list of columns to exclude from null checks", + ) + parser.add_argument( + "-n", + "--null", + metavar="NULL_COLUMNS", + help="comma-separated list of columns to exclude from null checks", + ) + parser.add_argument( + "-c", + "--do-commit", + action="store_true", + help="commit changes to database", + ) + parser.add_argument( + "-i", + "--interactive", + action="store_true", + help="display interactive GUI of important table information", + ) + parser.add_argument( + "-m", + "--upsert-method", + metavar="UPSERT_METHOD", + default="upsert", + choices=["upsert", "update", "insert"], + help="method to use for upsert", + ) + parser.add_argument( + "host", + metavar="HOST", + help="database host", + ) + parser.add_argument( + "port", + metavar="PORT", + type=int, + help="database port", + ) + parser.add_argument( + "database", + metavar="DATABASE", + help="database name", + ) + parser.add_argument( + "user", + metavar="USER", + help="database user", + ) + parser.add_argument( + "stg_schema", + metavar="STAGING_SCHEMA", + help="staging schema name", + ) + parser.add_argument( + "base_schema", + metavar="BASE_SCHEMA", + help="base schema name", + ) + parser.add_argument( + "tables", + metavar="TABLE", + nargs="+", + help="table name(s)", + ) + return parser def cli() -> None: diff --git a/pyproject.toml b/pyproject.toml index 6c033b4..2bfe459 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,11 +16,7 @@ classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)", ] -dependencies = [ - "polars >= 1.0.1", - "psycopg2-binary >= 2.9.9", - "tabulate >= 0.9.0", -] +dependencies = ["psycopg2-binary >= 2.9.9", "tabulate >= 0.9.0"] keywords = ["postgresql", "postgres", "dbms", "etl", "upsert", "database"] [project.scripts] @@ -96,7 +92,7 @@ select = [ "PD", "RUF", ] -ignore = 
["PD901", "S101"] +ignore = ["PD901", "S101", "F401"] [tool.ruff.format] # Like Black, use double quotes for strings. diff --git a/requirements.txt b/requirements.txt index 40b3016..51dec40 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,7 +13,7 @@ charset-normalizer==3.3.2 click==8.1.7 coverage==7.6.0 distlib==0.3.8 -docutils==0.21.2 +docutils==0.20.1 filelock==3.14.0 identify==2.5.36 idna==3.7 @@ -32,7 +32,6 @@ more-itertools==10.3.0 nh3==0.2.18 nodeenv==1.8.0 packaging==24.1 -pg_upsert @ file:///Users/cgrant/GitHub/geocoug/pg_upsert pkginfo==1.10.0 platformdirs==4.2.2 pluggy==1.5.0 @@ -62,9 +61,11 @@ snowballstemmer==2.2.0 soupsieve==2.5 Sphinx==7.4.6 sphinx-book-theme==1.1.3 +sphinx-rtd-theme==2.0.0 sphinxcontrib-applehelp==1.0.8 sphinxcontrib-devhelp==1.0.6 sphinxcontrib-htmlhelp==2.0.5 +sphinxcontrib-jquery==4.1 sphinxcontrib-jsmath==1.0.1 sphinxcontrib-qthelp==1.0.7 sphinxcontrib-serializinghtml==1.1.10