diff --git a/Makefile b/Makefile index 302e5e1..71954b3 100644 --- a/Makefile +++ b/Makefile @@ -18,18 +18,19 @@ $(VENV)/bin/activate: requirements.txt $(PIP) install -r requirements.txt clean: ## Remove temporary files - rm -rf .ipynb_checkpoints - rm -rf **/.ipynb_checkpoints - rm -rf __pycache__ - rm -rf **/__pycache__ - rm -rf **/**/__pycache__ - rm -rf .pytest_cache - rm -rf **/.pytest_cache - rm -rf .ruff_cache - rm -rf .coverage - rm -rf build - rm -rf dist - rm -rf *.egg-info + @rm -rf .ipynb_checkpoints + @rm -rf **/.ipynb_checkpoints + @rm -rf __pycache__ + @rm -rf **/__pycache__ + @rm -rf **/**/__pycache__ + @rm -rf .pytest_cache + @rm -rf **/.pytest_cache + @rm -rf .ruff_cache + @rm -rf .coverage + @rm -rf build + @rm -rf dist + @rm -rf *.egg-info + @rm -rf pg_upsert.log show-bump: ## Show the next version @bump-my-version show-bump diff --git a/pyproject.toml b/pyproject.toml index 28f9223..a56ed07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ Issues = "https://github.com/geocoug/pg_upsert/issues" addopts = ["--cov=pg_upsert", "-v"] testpaths = ["tests"] log_cli = true -log_cli_level = "DEBUG" +log_cli_level = "INFO" log_cli_format = "%(asctime)s - %(name)s - %(levelname)s: %(message)s" log_cli_date_format = "%Y-%m-%d %H:%M:%S" diff --git a/tests/test_pg_upsert.py b/tests/test_pg_upsert.py index b635939..59e6113 100644 --- a/tests/test_pg_upsert.py +++ b/tests/test_pg_upsert.py @@ -1,12 +1,14 @@ #!/usr/bin/env python +# import logging import logging import os import pytest from dotenv import load_dotenv +from psycopg2.sql import SQL, Identifier, Literal -from pg_upsert.pg_upsert import PostgresDB +from pg_upsert.pg_upsert import PostgresDB, upsert load_dotenv() @@ -34,10 +36,16 @@ def db(global_variables): user=global_variables["POSTGRES_USER"], passwd=global_variables["POSTGRES_PASSWORD"], ) - logger.debug("Database object created.") yield db + db.execute( + """ + delete from public.book_authors; + delete from public.authors; + delete from public.books; + delete from public.genres; + """, + ) db.close() - logger.debug("Database object closed.") def test_db_connection(db): @@ -53,3 +61,92 @@ def test_db_execute(db): """Test a simple query execution.""" cur = db.execute("SELECT 1") assert cur.fetchone()[0] == 1 + + +def test_db_execute_values(db): + """Test a query execution with values.""" + cur = db.execute(SQL("SELECT {}").format(Literal(1))) + assert cur.fetchone()[0] == 1 + cur = db.execute( + SQL( + """ + select table_name from information_schema.tables + where table_schema={schema} and {column}={value} + """, + ).format( + schema=Literal("staging"), + column=Identifier("table_name"), + value=Literal("genres"), + ), + ) + assert cur.fetchone()[0] == "genres" + + +def test_db_rowdict(db): + """Test the rowdict function.""" + rows, headers, rowcount = db.rowdict("SELECT 1 as one, 2 as two") + assert iter(rows) + assert headers == ["one", "two"] + assert rowcount == 1 + rows = list(rows) + assert rows[0]["one"] == 1 + assert rows[0]["two"] == 2 + + +def test_db_rowdict_params(db): + """Test the rowdict function with parameters.""" + rows, headers, rowcount = db.rowdict( + SQL("SELECT {one} as one, {two} as two").format( + one=Literal(1), + two=Literal(2), + ), + ) + assert iter(rows) + assert headers == ["one", "two"] + assert rowcount == 1 + rows = list(rows) + assert rows[0]["one"] == 1 + assert rows[0]["two"] == 2 + + +def test_db_dataframe(db): + """Test the dataframe function.""" + df = db.dataframe("SELECT 1 as one, 2 as two") + assert df.shape == (1, 2) + assert df["one"][0] == 1 + assert df["two"][0] == 2 + + +def test_db_dataframe_params(db): + """Test the dataframe function with parameters.""" + df = db.dataframe( + SQL("SELECT {one} as one, {two} as two").format( + one=Literal(1), + two=Literal(2), + ), + ) + assert df.shape == (1, 2) + assert df["one"][0] == 1 + assert df["two"][0] == 2 + + +def test_upsert_no_commit(global_variables, db): + # Run the upsert function. The function should raise a SystemExit error that + # qa checks failed. + with pytest.raises(SystemExit) as exc_info: + upsert( + host=global_variables["POSTGRES_HOST"], + database=global_variables["POSTGRES_DB"], + user=global_variables["POSTGRES_USER"], + passwd=global_variables["POSTGRES_PASSWORD"], + tables=["genres", "authors", "books", "book_authors"], + stg_schema="staging", + base_schema="public", + upsert_method="upsert", + commit=False, + interactive=False, + exclude_cols=[], + exclude_null_check_colls=[], + ) + assert exc_info.type is SystemExit + assert exc_info.value.code == 1