2317/Copy data from grants-db into the opportunity table(s) in the analytics db (#3228)

## Summary
Fixes #2317

### Time to review: __20 mins__

## Changes proposed
- New CLI command to load the opportunity tables into the analytics db
- S3 configuration to read `csv` opportunity tables
- Add S3 environment variables
- Mock S3 client for testing
- Added fixtures for AWS (from the API code), a test schema, and opportunity tables to aid in testing, scoped to the test session
- Added a fixture to delete table records after each test
- Added opportunity table `csv` files for testing
- Added a test that checks files were successfully uploaded and records inserted into the test-schema tables



## Context for reviewers
Run the new integration tests locally with `poetry run pytest ./tests/integrations/extracts/test_load_opportunity_data.py`.

## Additional information
> Screenshots, GIF demos, code examples, or output to help show the changes working as expected.
babebe authored and doug-s-nava committed Dec 30, 2024
1 parent 2052dba commit b429083
Showing 17 changed files with 823 additions and 3 deletions.
6 changes: 6 additions & 0 deletions analytics/Makefile
@@ -150,6 +150,12 @@ db-migrate:
$(POETRY) analytics etl db_migrate
@echo "====================================================="

opportunity-load:
@echo "=> Ingesting opportunity data into the database"
@echo "====================================================="
$(POETRY) analytics etl opportunity-load
@echo "====================================================="

gh-transform-and-load:
@echo "=> Transforming and loading GitHub data into the database"
@echo "====================================================="
6 changes: 6 additions & 0 deletions analytics/local.env
@@ -52,3 +52,9 @@ LOG_FORMAT=human-readable

# Change the message length for the human readable formatter
# LOG_HUMAN_READABLE_FORMATTER__MESSAGE_WIDTH=50

############################
# S3
############################

LOAD_OPPORTUNITY_DATA_FILE_PATH=S3://local-opportunities/public-extracts
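For context, this variable is read by alias through pydantic-settings in the new `LoadOpportunityDataFileConfig` class added later in this commit. A minimal sketch of that resolution; the in-process assignment is only for illustration (in practice the value comes from `local.env` or the task environment):

```python
# Minimal sketch (not part of the commit): how pydantic-settings resolves
# LOAD_OPPORTUNITY_DATA_FILE_PATH by alias. The os.environ assignment below is
# illustrative only; normally local.env / the ECS task environment provides it.
import os

from pydantic import Field
from pydantic_settings import BaseSettings


class LoadOpportunityDataFileConfig(BaseSettings):
    """Configure S3 properties for opportunity data."""

    load_opportunity_data_file_path: str | None = Field(
        default=None,
        alias="LOAD_OPPORTUNITY_DATA_FILE_PATH",
    )


os.environ["LOAD_OPPORTUNITY_DATA_FILE_PATH"] = "s3://local-opportunities/public-extracts"
config = LoadOpportunityDataFileConfig()
print(config.load_opportunity_data_file_path)
# -> s3://local-opportunities/public-extracts
```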
194 changes: 193 additions & 1 deletion analytics/poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion analytics/pyproject.toml
@@ -22,10 +22,11 @@ python = "~3.13"
slack-sdk = "^3.23.0"
typer = { extras = ["all"], version = "^0.15.0" }
sqlalchemy = "^2.0.30"
psycopg = ">=3.0.7"
pydantic-settings = "^2.3.4"
boto3 = "^1.35.56"
boto3-stubs = "^1.35.56"
psycopg = "^3.2.3"
smart-open = "^7.0.5"

[tool.poetry.group.dev.dependencies]
black = "^24.0.0"
@@ -36,6 +37,7 @@ pytest-cov = "^5.0.0"
ruff = "^0.8.0"
safety = "^3.0.0"
types-requests = "^2.32.0.20241016"
moto = "^5.0.22"

[build-system]
build-backend = "poetry.core.masonry.api"
9 changes: 9 additions & 0 deletions analytics/src/analytics/cli.py
@@ -18,6 +18,9 @@
from analytics.etl.utils import load_config
from analytics.integrations import etldb, slack
from analytics.integrations.db import PostgresDbClient
from analytics.integrations.extracts.load_opportunity_data import (
extract_copy_opportunity_data,
)
from analytics.logs import init as init_logging
from analytics.logs.app_logger import init_app
from analytics.logs.ecs_background_task import ecs_background_task
@@ -301,3 +304,9 @@ def transform_and_load(

# finish
print("transform and load is done")


@etl_app.command(name="opportunity-load")
def load_opportunity_data() -> None:
"""Grabs data from s3 bucket and loads it into opportunity tables."""
extract_copy_opportunity_data()
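For reviewers who want to exercise the new sub-command without a shell, here is a minimal sketch using Typer's test runner. The `app` import is an assumption — only the `etl_app` registration appears in this diff — so adjust it to wherever the top-level Typer application is actually exposed:

```python
# Minimal sketch (assumption: the top-level Typer app is importable as `app`
# from analytics.cli). Invokes the new command the same way the Makefile
# target does via `poetry run analytics etl opportunity-load`.
from typer.testing import CliRunner

from analytics.cli import app  # assumed export; adjust to the real app object

runner = CliRunner()
result = runner.invoke(app, ["etl", "opportunity-load"])
assert result.exit_code == 0
```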
6 changes: 6 additions & 0 deletions analytics/src/analytics/integrations/extracts/__init__.py
@@ -0,0 +1,6 @@
"""
We use this package to load opportunity data from s3.
It extracts CSV files from S3 bucket and loads the records into respective
opportunity tables.
"""
101 changes: 101 additions & 0 deletions analytics/src/analytics/integrations/extracts/constants.py
@@ -0,0 +1,101 @@
"""Holds all constant values."""

from enum import StrEnum


class OpportunityTables(StrEnum):
"""Opportunity tables that are copied over to analytics database."""

LK_OPPORTUNITY_STATUS = "lk_opportunity_status"
LK_OPPORTUNITY_CATEGORY = "lk_opportunity_category"
OPPORTUNITY = "opportunity"
OPPORTUNITY_SUMMARY = "opportunity_summary"
CURRENT_OPPORTUNITY_SUMMARY = "current_opportunity_summary"


LK_OPPORTUNITY_STATUS_COLS = (
"OPPORTUNITY_STATUS_ID",
"DESCRIPTION",
"CREATED_AT",
"UPDATED_AT",
)

LK_OPPORTUNITY_CATEGORY_COLS = (
"OPPORTUNITY_CATEGORY_ID",
"DESCRIPTION",
"CREATED_AT",
"UPDATED_AT",
)
OPPORTUNITY_COLS = (
"OPPORTUNITY_ID",
"OPPORTUNITY_NUMBER",
"OPPORTUNITY_TITLE",
"AGENCY_CODE",
"OPPORTUNITY_CATEGORY_ID",
"CATEGORY_EXPLANATION",
"IS_DRAFT",
"REVISION_NUMBER",
"MODIFIED_COMMENTS",
"PUBLISHER_USER_ID",
"PUBLISHER_PROFILE_ID",
"CREATED_AT",
"UPDATED_AT",
)
OPPORTUNITY_SUMMARY_COLS = (
"OPPORTUNITY_SUMMARY_ID",
"OPPORTUNITY_ID",
"SUMMARY_DESCRIPTION",
"IS_COST_SHARING",
"IS_FORECAST",
"POST_DATE",
"CLOSE_DATE",
"CLOSE_DATE_DESCRIPTION",
"ARCHIVE_DATE",
"UNARCHIVE_DATE",
"EXPECTED_NUMBER_OF_AWARDS",
"ESTIMATED_TOTAL_PROGRAM_FUNDING",
"AWARD_FLOOR",
"AWARD_CEILING",
"ADDITIONAL_INFO_URL",
"ADDITIONAL_INFO_URL_DESCRIPTION",
"FORECASTED_POST_DATE",
"FORECASTED_CLOSE_DATE",
"FORECASTED_CLOSE_DATE_DESCRIPTION",
"FORECASTED_AWARD_DATE",
"FORECASTED_PROJECT_START_DATE",
"FISCAL_YEAR",
"REVISION_NUMBER",
"MODIFICATION_COMMENTS",
"FUNDING_CATEGORY_DESCRIPTION",
"APPLICANT_ELIGIBILITY_DESCRIPTION",
"AGENCY_CODE",
"AGENCY_NAME",
"AGENCY_PHONE_NUMBER",
"AGENCY_CONTACT_DESCRIPTION",
"AGENCY_EMAIL_ADDRESS",
"AGENCY_EMAIL_ADDRESS_DESCRIPTION",
"IS_DELETED",
"CAN_SEND_MAIL",
"PUBLISHER_PROFILE_ID",
"PUBLISHER_USER_ID",
"UPDATED_BY",
"CREATED_BY",
"CREATED_AT",
"UPDATED_AT",
"VERSION_NUMBER",
)
CURRENT_OPPORTUNITY_SUMMARY_COLS = (
"OPPORTUNITY_ID",
"OPPORTUNITY_SUMMARY_ID",
"OPPORTUNITY_STATUS_ID",
"CREATED_AT",
"UPDATED_AT",
)

MAP_TABLES_TO_COLS: dict[OpportunityTables, tuple[str, ...]] = {
OpportunityTables.LK_OPPORTUNITY_STATUS: LK_OPPORTUNITY_STATUS_COLS,
OpportunityTables.LK_OPPORTUNITY_CATEGORY: LK_OPPORTUNITY_CATEGORY_COLS,
OpportunityTables.OPPORTUNITY: OPPORTUNITY_COLS,
    OpportunityTables.OPPORTUNITY_SUMMARY: OPPORTUNITY_SUMMARY_COLS,
OpportunityTables.CURRENT_OPPORTUNITY_SUMMARY: CURRENT_OPPORTUNITY_SUMMARY_COLS,
}
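To preview how these constants are consumed, here is a minimal sketch (not part of the commit) that iterates the enum and looks up each table's column tuple, mirroring what the loader below does when it builds its `COPY` statements; the schema name is a placeholder:

```python
# Minimal sketch (not part of the commit): iterate the opportunity tables and
# look up their column tuples, as the loader does when building COPY statements.
from analytics.integrations.extracts.constants import (
    MAP_TABLES_TO_COLS,
    OpportunityTables,
)

SCHEMA = "analytics"  # placeholder; the real loader reads DB_SCHEMA from the environment

for table in OpportunityTables:
    columns = MAP_TABLES_TO_COLS.get(table, ())
    print(f"COPY {SCHEMA}.{table} ({', '.join(columns)}) FROM STDIN ...")
```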
79 changes: 79 additions & 0 deletions analytics/src/analytics/integrations/extracts/load_opportunity_data.py
@@ -0,0 +1,79 @@
# pylint: disable=invalid-name, line-too-long
"""Loads opportunity tables with opportunity data from S3."""

import logging
import os
from contextlib import ExitStack

import smart_open # type: ignore[import]
from pydantic import Field
from pydantic_settings import BaseSettings
from sqlalchemy import Connection

from analytics.integrations.etldb.etldb import EtlDb
from analytics.integrations.extracts.constants import (
MAP_TABLES_TO_COLS,
OpportunityTables,
)

logger = logging.getLogger(__name__)


class LoadOpportunityDataFileConfig(BaseSettings):
"""Configure S3 properties for opportunity data."""

load_opportunity_data_file_path: str | None = Field(
default=None,
alias="LOAD_OPPORTUNITY_DATA_FILE_PATH",
)


def extract_copy_opportunity_data() -> None:
"""Instantiate Etldb class and call helper funcs to truncate and insert data in one txn."""
etldb_conn = EtlDb()

with etldb_conn.connection() as conn, conn.begin():
        _truncate_opportunity_table_records(conn)

_fetch_insert_opportunity_data(conn)

logger.info("Extract opportunity data completed successfully")


def _truncate_opportunity_table_records(conn: Connection) -> None:
"""Truncate existing records from all tables."""
cursor = conn.connection.cursor()
schema = os.environ["DB_SCHEMA"]
for table in OpportunityTables:
stmt_trct = f"TRUNCATE TABLE {schema}.{table} CASCADE"
cursor.execute(stmt_trct)
logger.info("Truncated all records from all tables")


def _fetch_insert_opportunity_data(conn: Connection) -> None:
"""Streamlines opportunity tables from S3 and insert into the database."""
s3_config = LoadOpportunityDataFileConfig()

cursor = conn.connection.cursor()
for table in OpportunityTables:
logger.info("Copying data for table: %s", table)

columns = MAP_TABLES_TO_COLS.get(table, ())
query = f"""
COPY {f"{os.getenv("DB_SCHEMA")}.{table} ({', '.join(columns)})"}
FROM STDIN WITH (FORMAT CSV, DELIMITER ',', QUOTE '"', HEADER)
"""

with ExitStack() as stack:
file = stack.enter_context(
smart_open.open(
f"{s3_config.load_opportunity_data_file_path}/{table}.csv",
"r",
),
)
copy = stack.enter_context(cursor.copy(query))

while data := file.read():
copy.write(data)

logger.info("Successfully loaded data for table: %s", table)
18 changes: 18 additions & 0 deletions analytics/src/analytics/integrations/extracts/s3_config.py
@@ -0,0 +1,18 @@
"""Configuration for S3."""

import boto3
import botocore


def get_s3_client(
session: boto3.Session | None = None,
boto_config: botocore.config.Config | None = None,
) -> botocore.client.BaseClient:
"""Return an S3 client."""
if boto_config is None:
boto_config = botocore.config.Config(signature_version="s3v4")

if session is not None:
return session.client("s3", config=boto_config)

return boto3.client("s3", config=boto_config)