diff --git a/analytics/src/analytics/integrations/etldb/etldb.py b/analytics/src/analytics/integrations/etldb/etldb.py
index 63ffa18ed..1ccc3efe4 100644
--- a/analytics/src/analytics/integrations/etldb/etldb.py
+++ b/analytics/src/analytics/integrations/etldb/etldb.py
@@ -1,11 +1,14 @@
 """Define EtlDb as an abstraction layer for database connections."""
 
+import logging
 from enum import Enum
 
 from sqlalchemy import Connection, text
 
 from analytics.integrations.db import PostgresDbClient
 
+logger = logging.getLogger(__name__)
+
 
 class EtlDb:
     """Encapsulate etl database connections."""
@@ -62,7 +65,7 @@ def set_schema_version(self, new_value: int) -> bool:
                 "WARNING: cannot bump schema version "
                 f"from {current_version} to {new_value}"
             )
-            print(message)
+            logger.warning(message)
             return False
 
         if new_value > current_version:
@@ -92,7 +95,7 @@ def revert_to_schema_version(self, new_value: int) -> bool:
                 "WARNING: cannot bump schema version "
                 f"from {current_version} to {new_value}"
             )
-            print(message)
+            logger.warning(message)
             return False
 
         cursor = self.connection()
diff --git a/analytics/src/analytics/integrations/etldb/main.py b/analytics/src/analytics/integrations/etldb/main.py
index d87034ad7..dff1cac2d 100644
--- a/analytics/src/analytics/integrations/etldb/main.py
+++ b/analytics/src/analytics/integrations/etldb/main.py
@@ -1,5 +1,6 @@
 """Integrate with database to read and write etl data."""
 
+import logging
 import os
 import re
 from pathlib import Path
@@ -17,6 +18,8 @@
 
 VERBOSE = False
 
+logger = logging.getLogger(__name__)
+
 
 def migrate_database() -> None:
     """
@@ -33,7 +36,8 @@ def migrate_database() -> None:
     # get connection to database
     etldb = EtlDb()
     current_version = etldb.get_schema_version()
-    print(f"current schema version: {current_version}")
+    m = f"current schema version: {current_version}"
+    logger.info(m)
 
     # get all sql file paths and respective version numbers
     sql_file_path_map = get_sql_file_paths()
@@ -48,8 +52,10 @@ def migrate_database() -> None:
         with open(sql_file_path_map[next_version]) as f:
             sql = f.read()
         # execute sql
-        print(f"applying migration for schema version: {next_version}")
-        print(f"migration source file: {sql_file_path_map[next_version]}")
+        m = f"applying migration for schema version: {next_version}"
+        logger.info(m)
+        m = f"migration source file: {sql_file_path_map[next_version]}"
+        logger.info(m)
         cursor = etldb.connection()
         cursor.execute(
             text(sql),
@@ -62,8 +68,10 @@ def migrate_database() -> None:
         migration_count += 1
 
     # summarize results in output
-    print(f"total migrations applied: {migration_count}")
-    print(f"new schema version: {current_version}")
+    m = f"total migrations applied: {migration_count}"
+    logger.info(m)
+    m = f"new schema version: {current_version}"
+    logger.info(m)
 
 
 def sync_data(dataset: EtlDataset, effective: str) -> None:
@@ -83,11 +91,13 @@ def sync_data(dataset: EtlDataset, effective: str) -> None:
     # note: the following code assumes SCHEMA VERSION >= 4
     # sync project data to db resulting in row id for each project
     ghid_map[EtlEntityType.PROJECT] = sync_projects(db, dataset)
-    print(f"project row(s) processed: {len(ghid_map[EtlEntityType.PROJECT])}")
+    m = f"project row(s) processed: {len(ghid_map[EtlEntityType.PROJECT])}"
+    logger.info(m)
 
     # sync quad data to db resulting in row id for each quad
     ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
-    print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")
+    m = f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}"
+    logger.info(m)
 
     # sync deliverable data to db resulting in row id for each deliverable
     ghid_map[EtlEntityType.DELIVERABLE] = sync_deliverables(
@@ -95,21 +105,23 @@ def sync_data(dataset: EtlDataset, effective: str) -> None:
         dataset,
         ghid_map,
     )
-    print(
-        f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}",
-    )
+    m = f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}"
+    logger.info(m)
 
     # sync sprint data to db resulting in row id for each sprint
     ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
-    print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")
+    m = f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}"
+    logger.info(m)
 
     # sync epic data to db resulting in row id for each epic
     ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
-    print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")
+    m = f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}"
+    logger.info(m)
 
     # sync issue data to db resulting in row id for each issue
     issue_map = sync_issues(db, dataset, ghid_map)
-    print(f"issue row(s) processed: {len(issue_map)}")
+    m = f"issue row(s) processed: {len(issue_map)}"
+    logger.info(m)
 
 
 def sync_deliverables(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
@@ -120,7 +132,8 @@ def sync_deliverables(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
         deliverable_df = dataset.get_deliverable(ghid)
         result[ghid], _ = model.sync_deliverable(deliverable_df, ghid_map)
         if VERBOSE:
-            print(f"DELIVERABLE '{ghid}' row_id = {result[ghid]}")
+            m = f"DELIVERABLE '{ghid}' row_id = {result[ghid]}"
+            logger.info(m)
     return result
 
 
@@ -132,7 +145,8 @@ def sync_epics(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
         epic_df = dataset.get_epic(ghid)
         result[ghid], _ = model.sync_epic(epic_df, ghid_map)
         if VERBOSE:
-            print(f"EPIC '{ghid}' row_id = {result[ghid]}")
+            m = f"EPIC '{ghid}' row_id = {result[ghid]}"
+            logger.info(m)
     return result
 
 
@@ -145,7 +159,8 @@ def sync_issues(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
     for _, issue_df in all_rows.iterrows():
         result[ghid], _ = model.sync_issue(issue_df, ghid_map)
         if VERBOSE:
-            print(f"ISSUE '{ghid}' issue_id = {result[ghid]}")
+            m = f"ISSUE '{ghid}' issue_id = {result[ghid]}"
+            logger.info(m)
     return result
 
 
@@ -157,9 +172,10 @@ def sync_projects(db: EtlDb, dataset: EtlDataset) -> dict:
         project_df = dataset.get_project(ghid)
         result[ghid], _ = model.sync_project(project_df)
         if VERBOSE:
-            print(
-                f"PROJECT '{ghid}' title = '{project_df['project_name']}', row_id = {result[ghid]}",
+            m = (
+                f"PROJECT '{ghid}' title = '{project_df['project_name']}', row_id = {result[ghid]}"
             )
+            logger.info(m)
     return result
 
 
@@ -171,7 +187,8 @@ def sync_sprints(db: EtlDb, dataset: EtlDataset, ghid_map: dict) -> dict:
         sprint_df = dataset.get_sprint(ghid)
         result[ghid], _ = model.sync_sprint(sprint_df, ghid_map)
         if VERBOSE:
-            print(f"SPRINT '{ghid}' row_id = {result[ghid]}")
+            m = f"SPRINT '{ghid}' row_id = {result[ghid]}"
+            logger.info(m)
     return result
 
 
@@ -183,9 +200,10 @@ def sync_quads(db: EtlDb, dataset: EtlDataset) -> dict:
         quad_df = dataset.get_quad(ghid)
         result[ghid], _ = model.sync_quad(quad_df)
         if VERBOSE:
-            print(
-                f"QUAD '{ghid}' title = '{quad_df['quad_name']}', row_id = {result[ghid]}",
+            m = (
+                f"QUAD '{ghid}' title = '{quad_df['quad_name']}', row_id = {result[ghid]}"
             )
+            logger.info(m)
     return result