Merge with develop
jterry64 committed Aug 6, 2024
2 parents ded851b + 0053319 commit 529da34
Showing 42 changed files with 666 additions and 468 deletions.
2 changes: 1 addition & 1 deletion .isort.cfg
@@ -2,4 +2,4 @@
line_length = 88
multi_line_output = 3
include_trailing_comma = True
known_third_party = _pytest,aenum,affine,aiohttp,alembic,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,ee,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,typer
known_third_party = _pytest,aenum,affine,aiohttp,alembic,asgi_lifespan,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,ee,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,typer
8 changes: 4 additions & 4 deletions Dockerfile
@@ -19,11 +19,11 @@ COPY Pipfile Pipfile
COPY Pipfile.lock Pipfile.lock

RUN if [ "$ENV" = "dev" ] || [ "$ENV" = "test" ]; then \
echo "Install all dependencies" \
&& pipenv install --system --deploy --ignore-pipfile --dev; \
echo "Install all dependencies" \
&& pipenv install --system --deploy --ignore-pipfile --dev; \
else \
echo "Install production dependencies only" \
&& pipenv install --system --deploy; \
echo "Install production dependencies only" \
&& pipenv install --system --deploy; \
fi

COPY ./app /app/app
9 changes: 5 additions & 4 deletions Pipfile
@@ -4,6 +4,7 @@ url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
asgi_lifespan = "*"
black = "*"
detect-secrets = "*"
docker = "*"
@@ -17,7 +18,7 @@ openapi_spec_validator = "*"
pandas = "<2.2" # Needed by pixetl in batch script test
pre-commit = "*"
pytest = "*"
pytest-asyncio = "<0.19"
pytest-asyncio = "*"
pytest-cov = "*"
pytest-timeout = "*"
rasterio = "*"
@@ -35,11 +36,11 @@ asyncpg = "*"
boto3 = "*"
botocore = "*"
email-validator = "*"
fastapi = "<0.68"
fastapi = "*"
geoalchemy2 = "<0.12"
geojson = "*"
gino = "*"
gino_starlette = "==0.1.2"
gino_starlette = "*"
google-cloud-storage = "*"
httpcore = "*"
httpx = "*"
@@ -57,7 +58,7 @@ python-multipart = "*"
shapely = "*"
sqlalchemy = "<1.4"
sqlalchemy-utils = "*"
starlette = "<0.15"
starlette = "*"
typer = "*"
uvicorn = {version = "*", extras = ["standard"]}

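The dependency changes above unpin fastapi, starlette, gino_starlette, and pytest-asyncio, and add asgi_lifespan as a dev dependency. asgi_lifespan is typically needed because lifespan handlers (unlike the old on_event hooks) are not triggered automatically by in-process ASGI test clients. A minimal sketch of how a test fixture might drive the app's lifespan, assuming httpx is used as the test client; the fixture name, base URL, and import path are illustrative:

# Hypothetical pytest fixture: runs the app's startup/shutdown (lifespan)
# around each test so the database engines get created and disposed.
import pytest_asyncio
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient

from app.main import app  # assumed import path for the ASGI app


@pytest_asyncio.fixture
async def async_client():
    async with LifespanManager(app):  # triggers lifespan startup/shutdown
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            yield client
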
212 changes: 164 additions & 48 deletions Pipfile.lock

Large diffs are not rendered by default.

65 changes: 32 additions & 33 deletions app/application.py
@@ -1,4 +1,5 @@
from asyncio import Future
from contextlib import asynccontextmanager
from contextvars import ContextVar
from typing import Optional

@@ -11,6 +12,10 @@
DATABASE_CONFIG,
SQL_REQUEST_TIMEOUT,
WRITE_DATABASE_CONFIG,
WRITER_MIN_POOL_SIZE,
WRITER_MAX_POOL_SIZE,
READER_MIN_POOL_SIZE,
READER_MAX_POOL_SIZE,
)

# Set the current engine using a ContextVar to assure
@@ -33,32 +38,14 @@ def bind(self):
logger.debug(f"Set bind to {bind.repr(color=True)}")
return bind
except LookupError:
# not in a request
logger.debug("Not in a request, using default bind")
return self._bind
logger.debug("Not in a request, using READ engine")
return READ_ENGINE

@bind.setter
def bind(self, val):
self._bind = val


app = FastAPI(title="GFW Data API", redoc_url="/")

# Create Contextual Database, using default connection and pool size = 0
# We will bind actual connection pools based on path operation using middleware
# This allows us to query load-balanced Aurora read replicas for read-only operations
# and Aurora Write Node for write operations
db = ContextualGino(
app=app,
host=DATABASE_CONFIG.host,
port=DATABASE_CONFIG.port,
user=DATABASE_CONFIG.username,
password=DATABASE_CONFIG.password,
database=DATABASE_CONFIG.database,
pool_min_size=0,
)


class ContextEngine(object):
def __init__(self, method):
self.method = method
@@ -91,35 +78,30 @@ async def get_engine(method: str) -> GinoEngine:
return engine


@app.on_event("startup")
async def startup_event():
"""Initializing the database connections on startup."""

@asynccontextmanager
async def lifespan(app: FastAPI):
global WRITE_ENGINE
global READ_ENGINE

WRITE_ENGINE = await create_engine(
WRITE_DATABASE_CONFIG.url, max_size=5, min_size=1
WRITE_DATABASE_CONFIG.url,
max_size=WRITER_MAX_POOL_SIZE,
min_size=WRITER_MIN_POOL_SIZE,
)
logger.info(
f"Database connection pool for write operation created: {WRITE_ENGINE.repr(color=True)}"
)
READ_ENGINE = await create_engine(
DATABASE_CONFIG.url,
max_size=10,
min_size=5,
max_size=READER_MAX_POOL_SIZE,
min_size=READER_MIN_POOL_SIZE,
command_timeout=SQL_REQUEST_TIMEOUT,
)
logger.info(
f"Database connection pool for read operation created: {READ_ENGINE.repr(color=True)}"
)


@app.on_event("shutdown")
async def shutdown_event():
"""Closing the database connections on shutdown."""
global WRITE_ENGINE
global READ_ENGINE
yield

if WRITE_ENGINE:
logger.info(
@@ -137,3 +119,20 @@ async def shutdown_event():
logger.info(
f"Closed database connection for read operations {READ_ENGINE.repr(color=True)}"
)


app: FastAPI = FastAPI(title="GFW Data API", redoc_url="/", lifespan=lifespan)

# Create Contextual Database, using default connection and pool size = 0
# We will bind actual connection pools based on path operation using middleware
# This allows us to query load-balanced Aurora read replicas for read-only operations
# and Aurora Write Node for write operations
db = ContextualGino(
app=app,
host=DATABASE_CONFIG.host,
port=DATABASE_CONFIG.port,
user=DATABASE_CONFIG.username,
password=DATABASE_CONFIG.password,
database=DATABASE_CONFIG.database,
pool_min_size=0,
)
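
For context on the hunks above: FastAPI's lifespan parameter replaces the deprecated @app.on_event("startup") / @app.on_event("shutdown") hooks. Everything before the yield runs once at startup, everything after it once at shutdown. A minimal, self-contained sketch of the pattern (names and state are illustrative, not taken from this repo):

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Runs once at startup, e.g. create database connection pools.
    app.state.ready = True
    yield
    # Runs once at shutdown, e.g. close database connection pools.
    app.state.ready = False


app = FastAPI(lifespan=lifespan)
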
3 changes: 1 addition & 2 deletions app/main.py
@@ -1,4 +1,3 @@
import json
import logging
import sys
from asyncio.exceptions import TimeoutError as AsyncTimeoutError
@@ -77,7 +76,7 @@ async def rve_error_handler(
) -> ORJSONResponse:
"""Use JSEND protocol for validation errors."""
return ORJSONResponse(
status_code=422, content={"status": "failed", "message": json.loads(exc.json())}
status_code=422, content={"status": "failed", "message": exc.errors()}
)


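The validation-handler change above works because RequestValidationError.errors() already returns a list of plain dicts, so serializing to JSON and parsing it back is unnecessary. A minimal sketch of the complete handler in isolation, assuming the standard FastAPI exception-handler registration (the surrounding app object is illustrative):

from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import ORJSONResponse

app = FastAPI()


@app.exception_handler(RequestValidationError)
async def rve_error_handler(
    request: Request, exc: RequestValidationError
) -> ORJSONResponse:
    """Use JSEND protocol for validation errors."""
    return ORJSONResponse(
        status_code=422,
        content={"status": "failed", "message": exc.errors()},
    )
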
5 changes: 5 additions & 0 deletions app/models/pydantic/creation_options.py
@@ -399,6 +399,11 @@ class StaticVectorTileCacheCreationOptions(TileCacheBaseModel):
"for vector tile caches. `source` and `source-layer` attributes must use `dataset` name."
"Styling rules will be used in autogenerated root.json and preview.",
)
feature_filter: Optional[Dict[str, Any]] = Field(
None,
description="Optional tippecanoe feature filter(s). Uses the syntax of "
"[Mapbox legacy filters](https://docs.mapbox.com/style-spec/reference/other/#other-filters)"
)


class StaticVectorFileCreationOptions(StrictBaseModel):
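
The new feature_filter option is forwarded to tippecanoe, whose -j/--feature-filter flag accepts filters written in Mapbox legacy filter syntax, keyed by layer name (or "*" for all layers). How exactly this dict is serialized onto the tippecanoe command line is handled elsewhere in this changeset and not shown here. A hypothetical creation-options snippet; the attribute name and threshold are illustrative only:

# Hypothetical example: keep only features with at least 1 ha of area.
creation_options = {
    "implementation": "default",
    "feature_filter": {
        "*": ["all", [">=", "gfw_area__ha", 1.0]],
    },
}
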
2 changes: 1 addition & 1 deletion app/models/pydantic/jobs.py
@@ -145,7 +145,7 @@ class GDALCOGJob(Job):
vcpus = 8
memory = 64000
num_processes = 8
attempts = 10
attempts = 2
attempt_duration_seconds = int(DEFAULT_JOB_DURATION * 1.5)


13 changes: 5 additions & 8 deletions app/routes/analysis/analysis.py
@@ -1,16 +1,14 @@
"""Run analysis on registered datasets."""

from typing import Any, Dict, List, Optional
from uuid import UUID

from fastapi import APIRouter, Path, Query
from fastapi import APIRouter, Depends, Path, Query
from fastapi.exceptions import HTTPException
from fastapi.logger import logger

# from fastapi.openapi.models import APIKey
from fastapi.openapi.models import APIKey
from fastapi.responses import ORJSONResponse

# from ...authentication.api_keys import get_api_key
from ...authentication.api_keys import get_api_key
from ...models.enum.analysis import RasterLayer
from ...models.enum.geostore import GeostoreOrigin
from ...models.pydantic.analysis import ZonalAnalysisRequestIn
@@ -52,7 +50,7 @@ async def zonal_statistics_get(
description="Must be either year or YYYY-MM-DD date format.",
regex=DATE_REGEX,
),
# api_key: APIKey = Depends(get_api_key),
api_key: APIKey = Depends(get_api_key),
):
"""Calculate zonal statistics on any registered raster layers in a
geostore."""
@@ -82,8 +80,7 @@ async def zonal_statistics_get(
deprecated=True,
)
async def zonal_statistics_post(
request: ZonalAnalysisRequestIn,
# api_key: APIKey = Depends(get_api_key)
request: ZonalAnalysisRequestIn, api_key: APIKey = Depends(get_api_key)
):
return await _zonal_statistics(
request.geometry,
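
Re-enabling api_key: APIKey = Depends(get_api_key) means every call to these analysis endpoints must now pass the project's API-key check before the handler body runs. A generic sketch of how such a dependency gates a route in FastAPI; the header name and validation logic here are placeholders, not the implementation in app/authentication/api_keys.py:

from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import APIKeyHeader

app = FastAPI()

# Illustrative only: header name and key check are placeholders.
api_key_header = APIKeyHeader(name="x-api-key", auto_error=False)


async def get_api_key(api_key: str = Security(api_key_header)) -> str:
    if api_key != "expected-key":  # placeholder check
        raise HTTPException(status_code=401, detail="Unauthorized")
    return api_key


@app.get("/analysis/example")
async def example(api_key: str = Depends(get_api_key)):
    return {"status": "success"}
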
15 changes: 6 additions & 9 deletions app/routes/datasets/queries.py
@@ -1,5 +1,4 @@
"""Explore data entries for a given dataset version using standard SQL."""

import csv
import json
import re
@@ -18,8 +17,7 @@
from fastapi import Response as FastApiResponse
from fastapi.encoders import jsonable_encoder
from fastapi.logger import logger

# from fastapi.openapi.models import APIKey
from fastapi.openapi.models import APIKey
from fastapi.responses import ORJSONResponse, RedirectResponse
from pglast import printers # noqa
from pglast import Node, parse_sql
@@ -28,9 +26,8 @@
from pydantic.tools import parse_obj_as

from ...application import db
from ...authentication.api_keys import get_api_key
from ...authentication.token import is_gfwpro_admin_for_query

# from ...authentication.api_keys import get_api_key
from ...crud import assets
from ...models.enum.assets import AssetType
from ...models.enum.creation_options import Delimiters
@@ -148,7 +145,7 @@ async def query_dataset_json(
GeostoreOrigin.gfw, description="Service to search first for geostore."
),
is_authorized: bool = Depends(is_gfwpro_admin_for_query),
# api_key: APIKey = Depends(get_api_key),
api_key: APIKey = Depends(get_api_key),
):
"""Execute a READ-ONLY SQL query on the given dataset version (if
implemented) and return response in JSON format.
@@ -212,7 +209,7 @@ async def query_dataset_csv(
Delimiters.comma, description="Delimiter to use for CSV file."
),
is_authorized: bool = Depends(is_gfwpro_admin_for_query),
# api_key: APIKey = Depends(get_api_key),
api_key: APIKey = Depends(get_api_key),
):
"""Execute a READ-ONLY SQL query on the given dataset version (if
implemented) and return response in CSV format.
@@ -275,7 +272,7 @@ async def query_dataset_json_post(
dataset_version: Tuple[str, str] = Depends(dataset_version_dependency),
request: QueryRequestIn,
is_authorized: bool = Depends(is_gfwpro_admin_for_query),
# api_key: APIKey = Depends(get_api_key),
api_key: APIKey = Depends(get_api_key),
):
"""Execute a READ-ONLY SQL query on the given dataset version (if
implemented)."""
@@ -305,7 +302,7 @@ async def query_dataset_csv_post(
dataset_version: Tuple[str, str] = Depends(dataset_version_dependency),
request: CsvQueryRequestIn,
is_authorized: bool = Depends(is_gfwpro_admin_for_query),
# api_key: APIKey = Depends(get_api_key),
api_key: APIKey = Depends(get_api_key),
):
"""Execute a READ-ONLY SQL query on the given dataset version (if
implemented)."""
18 changes: 4 additions & 14 deletions app/settings/globals.py
@@ -8,20 +8,6 @@
from ..models.enum.pixetl import ResamplingMethod
from ..models.pydantic.database import DatabaseURL

#
# def _remove_revision(arn: str) -> str:
# """Remove revision number from batch job description arn."""
# arn_items = arn.split(":")
# revision = arn_items.pop()
# try:
# # Check if revision is a number
# int(revision)
# return ":".join(arn_items)
# except (ValueError, TypeError):
# # if not, this means that there was no revision number in first place and we can use the input
# return arn


# Read .env file, if exists
p: Path = Path(__file__).parents[2] / ".env"
config: Config = Config(p if p.exists() else None)
@@ -70,6 +56,8 @@
READER_HOST: str = config("DB_HOST_RO", cast=str, default=DB_READER_SECRET["host"])
READER_PORT: int = config("DB_PORT_RO", cast=int, default=DB_READER_SECRET["port"])
READER_DBNAME = config("DATABASE_RO", cast=str, default=DB_READER_SECRET["dbname"])
READER_MIN_POOL_SIZE: int = config("READER_MIN_POOL_SIZE", cast=int, default=5)
READER_MAX_POOL_SIZE: int = config("READER_MAX_POOL_SIZE", cast=int, default=10)

WRITER_USERNAME: Optional[str] = config(
"DB_USER", cast=str, default=DB_WRITER_SECRET["username"]
@@ -80,6 +68,8 @@
WRITER_HOST: str = config("DB_HOST", cast=str, default=DB_WRITER_SECRET["host"])
WRITER_PORT: int = config("DB_PORT", cast=int, default=DB_WRITER_SECRET["port"])
WRITER_DBNAME = config("DATABASE", cast=str, default=DB_WRITER_SECRET["dbname"])
WRITER_MIN_POOL_SIZE: int = config("WRITER_MIN_POOL_SIZE", cast=int, default=1)
WRITER_MAX_POOL_SIZE: int = config("WRITER_MAX_POOL_SIZE", cast=int, default=5)

if ENV == "dev":
NAME_SUFFIX = config("NAME_SUFFIX", cast=str)
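
The new pool bounds are read through starlette's Config helper, so the reader and writer pool sizes can now be tuned per environment (defaults: reader 5 to 10, writer 1 to 5) without code changes. A small sketch of the cast/default behaviour in isolation, with an illustrative environment variable:

# Minimal sketch, assuming the same starlette Config mechanism used above.
import os

from starlette.config import Config

os.environ["READER_MAX_POOL_SIZE"] = "20"  # e.g. set by the deployment environment

config = Config()  # in the app this also reads a .env file if present

READER_MIN_POOL_SIZE: int = config("READER_MIN_POOL_SIZE", cast=int, default=5)
READER_MAX_POOL_SIZE: int = config("READER_MAX_POOL_SIZE", cast=int, default=10)

print(READER_MIN_POOL_SIZE, READER_MAX_POOL_SIZE)  # 5 20
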
3 changes: 3 additions & 0 deletions app/tasks/cog_assets.py
@@ -9,6 +9,7 @@
from app.models.pydantic.change_log import ChangeLog
from app.models.pydantic.creation_options import COGCreationOptions
from app.models.pydantic.jobs import GDALCOGJob, Job
from app.settings.globals import DATA_LAKE_BUCKET
from app.tasks import callback_constructor
from app.tasks.batch import execute
from app.tasks.raster_tile_set_assets.utils import JOB_ENV
@@ -90,6 +91,8 @@ async def create_cogify_job(
dataset,
"-I",
creation_options.implementation,
"--prefix",
f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/raster/{srid}/cog",
]

if creation_options.export_to_gee:
@@ -32,6 +32,7 @@

from ...errors import RecordNotFoundError


async def raster_tile_cache_asset(
dataset: str,
version: str,