Skip to content

Commit

Permalink
Merge branch 'develop' into gdal_3_9_0
Browse files Browse the repository at this point in the history
  • Loading branch information
dmannarino authored Jun 22, 2024
2 parents c474a9a + 070e63e commit 68e6432
Show file tree
Hide file tree
Showing 21 changed files with 461 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/terraform_destroy_on_delete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: [delete]

jobs:
build:
if: contains(github.event.ref_type, 'branch') && (! github.event.ref == 'master') && (! github.event.ref == 'develop')
if: github.event.ref_type == 'branch' && (github.event.ref != 'refs/heads/master') && (github.event.ref != 'refs/heads/develop')
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
Expand Down
2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
line_length = 88
multi_line_output = 3
include_trailing_comma = True
known_third_party = _pytest,aenum,affine,aiohttp,alembic,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,typer
known_third_party = _pytest,aenum,affine,aiohttp,alembic,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,ee,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,typer
9 changes: 2 additions & 7 deletions app/crud/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,18 @@

from asyncpg import UniqueViolationError
from fastapi.encoders import jsonable_encoder
from sqlalchemy.sql import and_
from sqlalchemy import func

from app.crud.metadata import (
create_asset_metadata,
get_asset_metadata,
update_asset_metadata,
)
from sqlalchemy import func
from async_lru import alru_cache

from ..errors import RecordAlreadyExistsError, RecordNotFoundError
from ..models.enum.assets import AssetType
from ..models.orm.asset_metadata import AssetMetadata as ORMAssetMetadata
from ..models.orm.assets import Asset as ORMAsset
from ..models.orm.versions import Version as ORMVersion
from ..models.pydantic.creation_options import CreationOptions, creation_option_factory
from ..models.pydantic.asset_metadata import RasterTileSetMetadataOut
from . import update_data, versions


Expand Down Expand Up @@ -211,7 +206,7 @@ async def create_asset(dataset, version, **data) -> ORMAsset:
except UniqueViolationError:
raise RecordAlreadyExistsError(
f"Cannot create asset of type {data['asset_type']}. "
f"Asset uri must be unique. An asset with uri {data['asset_uri']} already exists"
f"Asset URI must be unique. An asset with URI {data['asset_uri']} already exists"
)

if metadata_data:
Expand Down
2 changes: 2 additions & 0 deletions app/models/enum/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class AssetType(str, Enum):
csv = "csv"
tsv = "tsv"
grid_1x1 = "1x1 grid"
cog = "COG"
# esri_map_service = "ESRI Map Service"
# esri_feature_service = "ESRI Feature Service"
# esri_image_service = "ESRI Image Service"
Expand Down Expand Up @@ -63,6 +64,7 @@ def is_single_file_asset(asset_type: str) -> bool:
AssetType.csv,
AssetType.tsv,
AssetType.grid_1x1,
AssetType.cog,
]


Expand Down
11 changes: 11 additions & 0 deletions app/models/enum/creation_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,14 @@ class ColorMapType(str, Enum):
date_conf_intensity_multi_16 = "date_conf_intensity_multi_16"
year_intensity = "year_intensity"
value_intensity = "value_intensity"


class TileBlockSize(int, Enum):
    """Internal tile (block) dimensions, in pixels, supported when
    writing a Cloud-Optimized GeoTIFF (COG)."""

    two_fifty_six = 256
    five_twelve = 512
    ten_twenty_four = 1024


class Srid(str, Enum):
    """Spatial reference systems supported for raster assets, expressed
    as lowercase EPSG authority:code strings."""

    wgs84 = "epsg:4326"
    web_mercator = "epsg:3857"
9 changes: 9 additions & 0 deletions app/models/pydantic/asset_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

from ...models.orm.assets import Asset as ORMAsset
from ..enum.assets import AssetType
from ..enum.creation_options import TileBlockSize
from ..enum.pg_types import PGType
from ..enum.pixetl import ResamplingMethod
from .base import BaseORMRecord, StrictBaseModel
from .responses import Response

Expand Down Expand Up @@ -75,6 +77,11 @@ class RasterTileSetMetadata(AssetBase):
resolution: Optional[int]


class COGMetadata(AssetBase):
    """Metadata recorded for a COG (Cloud-Optimized GeoTIFF) asset."""

    # Internal tile size the COG was written with
    block_size: TileBlockSize
    # Resampling method used when building overviews
    resampling: ResamplingMethod


class RasterTileSetMetadataUpdate(AssetBase):
resolution: int

Expand Down Expand Up @@ -186,6 +193,7 @@ def asset_metadata_factory(asset: ORMAsset) -> AssetMetadata:
AssetType.grid_1x1: VectorFileMetadata,
AssetType.shapefile: VectorFileMetadata,
AssetType.geopackage: VectorFileMetadata,
AssetType.cog: COGMetadata,
}

if asset.asset_type in metadata_factory.keys():
Expand Down Expand Up @@ -235,5 +243,6 @@ class FieldsMetadataResponse(Response):
class FieldMetadataResponse(Response):
data: FieldMetadataOut


class RasterBandsMetadataResponse(Response):
data: List[RasterBandMetadata]
36 changes: 34 additions & 2 deletions app/models/pydantic/creation_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
PartitionType,
RasterDrivers,
TableDrivers,
TileBlockSize,
TileStrategy,
VectorDrivers,
)
Expand Down Expand Up @@ -339,6 +340,36 @@ class RasterTileCacheCreationOptions(TileCacheBaseModel):
)


class COGCreationOptions(StrictBaseModel):
    """Creation options for a COG (Cloud-Optimized GeoTIFF) asset.

    A COG is generated from an existing raster tile set asset belonging
    to the same dataset version, referenced via ``source_asset_id``.
    """

    implementation: str = Field(
        "default",
        description="Namespace to use for COG. "
        "This will be part of the URI and will "
        "allow creation of multiple COGs per version.",
    )
    source_asset_id: str = Field(
        ...,
        description="Raster tile set asset ID to use as source. "
        "Must be an asset of the same version",
    )
    resampling: ResamplingMethod = Field(
        ResamplingMethod.average,
        description="Resampling method used to downsample overviews",
    )
    block_size: Optional[TileBlockSize] = Field(
        512,
        description="Block size to tile COG with.",
    )
    compute_stats: bool = False
    export_to_gee: bool = Field(
        False,
        description="Option to export COG to a Google Cloud Storage and create"
        " a COG-backed asset on Google Earth Engine (GEE). The asset will be created"
        # NOTE: closing backtick was previously missing around the asset ID template
        " under the project `forma-250` with the asset ID `{dataset}/{implementation}`. "
        "Versioning is currently not supported due to GEE storage constraints.",
    )


class DynamicVectorTileCacheCreationOptions(TileCacheBaseModel):
field_attributes: Optional[List[Dict[str, Any]]] = Field(
None,
Expand Down Expand Up @@ -382,8 +413,7 @@ class StaticVectorFileCreationOptions(StrictBaseModel):

class StaticVector1x1CreationOptions(StaticVectorFileCreationOptions):
include_tile_id: Optional[bool] = Field(
False,
description="Whether or not to include the tile_id of each feature"
False, description="Whether or not to include the tile_id of each feature"
)


Expand All @@ -395,6 +425,7 @@ class StaticVector1x1CreationOptions(StaticVectorFileCreationOptions):

OtherCreationOptions = Union[
TableAssetCreationOptions,
COGCreationOptions,
RasterTileCacheCreationOptions,
StaticVectorTileCacheCreationOptions,
StaticVectorFileCreationOptions,
Expand Down Expand Up @@ -424,6 +455,7 @@ class CreationOptionsResponse(Response):
AssetType.shapefile: StaticVectorFileCreationOptions,
AssetType.geopackage: StaticVectorFileCreationOptions,
AssetType.raster_tile_set: RasterTileSetAssetCreationOptions,
AssetType.cog: COGCreationOptions,
AssetType.raster_tile_cache: RasterTileCacheCreationOptions,
AssetType.database_table: TableAssetCreationOptions,
}
Expand Down
17 changes: 16 additions & 1 deletion app/models/pydantic/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ class PostgresqlClientJob(Job):

class GdalPythonImportJob(Job):
"""Use for write operations to PostgreSQL which require GDAL/Ogr2Ogr
drivers. NOTE: JOB MUST BE SAFE TO RETRY!"""
drivers.
NOTE: JOB MUST BE SAFE TO RETRY!
"""

job_queue = AURORA_JOB_QUEUE
job_definition = GDAL_PYTHON_JOB_DEFINITION
Expand Down Expand Up @@ -134,6 +137,18 @@ class PixETLJob(Job):
attempt_duration_seconds = int(DEFAULT_JOB_DURATION * 1.5)


class GDALCOGJob(Job):
    """Use for creating COG files using GDAL Python docker in PixETL queue."""

    job_queue = PIXETL_JOB_QUEUE
    job_definition = GDAL_PYTHON_JOB_DEFINITION
    vcpus = 8
    memory = 64000  # presumably MiB (AWS Batch convention) — confirm
    num_processes = 8
    attempts = 10
    # Allow 50% more runtime than the default job duration
    attempt_duration_seconds = int(DEFAULT_JOB_DURATION * 1.5)

class GDALDEMJob(Job):
"""Use for applying color maps to raster tiles with gdaldem."""

Expand Down
1 change: 1 addition & 0 deletions app/routes/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async def verify_asset_dependencies(dataset, version, asset_type):
AssetType.raster_tile_set,
AssetType.geo_database_table,
],
AssetType.cog: AssetType.raster_tile_set,
}
try:
parent_type = asset_dependencies[asset_type]
Expand Down
1 change: 0 additions & 1 deletion app/routes/datasets/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from app.settings.globals import API_URL

from ...authentication.token import is_admin
from ...crud import assets
from ...errors import RecordAlreadyExistsError
from ...models.orm.assets import Asset as ORMAsset
Expand Down
7 changes: 3 additions & 4 deletions app/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from ..application import ContextEngine
from ..crud import assets as crud_assets
from ..crud import tasks as crud_tasks
from ..models.orm.assets import Asset as ORMAsset
from ..models.orm.tasks import Task as ORMTask
from ..models.orm.tasks import Task as ORMTask, Task
from ..models.pydantic.change_log import ChangeLog
from ..settings.globals import (
API_URL,
Expand Down Expand Up @@ -59,14 +58,14 @@ async def update_asset_status(asset_id, status):

def callback_constructor(
asset_id: UUID,
) -> Callback:
) -> Callable[[UUID, ChangeLog], Coroutine[UUID, ChangeLog, Task]]:
"""Callback constructor.
Assign asset_id in the context of the constructor once. Afterwards
you will only need to pass the ChangeLog object.
"""

async def callback(task_id: UUID, change_log: ChangeLog) -> ORMAsset:
async def callback(task_id: UUID, change_log: ChangeLog) -> ORMTask:
async with ContextEngine("WRITE"):
task: ORMTask = await crud_tasks.create_task(
task_id, asset_id=asset_id, change_log=[change_log.dict(by_alias=True)]
Expand Down
2 changes: 2 additions & 0 deletions app/tasks/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ..models.enum.sources import SourceType
from ..models.pydantic.assets import AssetType
from ..models.pydantic.change_log import ChangeLog
from .cog_assets import cog_asset
from .dynamic_vector_tile_cache_assets import dynamic_vector_tile_cache_asset
from .raster_tile_cache_assets import raster_tile_cache_asset
from .raster_tile_set_assets import raster_tile_set_asset
Expand All @@ -30,6 +31,7 @@
AssetType.grid_1x1: static_vector_1x1_asset,
# AssetType.vector_tile_cache: vector_tile_cache_asset,
AssetType.raster_tile_cache: raster_tile_cache_asset,
AssetType.cog: cog_asset,
# AssetType.dynamic_raster_tile_cache: dynamic_raster_tile_cache_asset,
AssetType.raster_tile_set: raster_tile_set_asset,
}.items()
Expand Down
111 changes: 111 additions & 0 deletions app/tasks/cog_assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Any, Callable, Coroutine, Dict
from uuid import UUID

from app.crud.assets import get_asset
from app.models.enum.assets import AssetType
from app.models.enum.pixetl import ResamplingMethod
from app.models.orm.assets import Asset as ORMAsset
from app.models.orm.tasks import Task
from app.models.pydantic.change_log import ChangeLog
from app.models.pydantic.creation_options import COGCreationOptions
from app.models.pydantic.jobs import GDALCOGJob, Job
from app.tasks import callback_constructor
from app.tasks.batch import execute
from app.tasks.raster_tile_set_assets.utils import JOB_ENV
from app.tasks.utils import sanitize_batch_job_name
from app.utils.path import get_asset_uri, infer_srid_from_grid


async def cog_asset(
    dataset: str,
    version: str,
    asset_id: UUID,
    input_data: Dict[str, Any],
) -> ChangeLog:
    """Create a COG asset from a raster tile set asset.

    Parses the request's creation options, builds the Batch job that
    COGifies the source raster tile set, submits it, and returns the
    resulting change log.
    """
    options = COGCreationOptions(**input_data["creation_options"])
    callback = callback_constructor(asset_id)

    job: Job = await create_cogify_job(dataset, version, options, callback)

    return await execute([job])


async def create_cogify_job(
    dataset: str,
    version: str,
    creation_options: COGCreationOptions,
    callback: Callable[[UUID, ChangeLog], Coroutine[UUID, ChangeLog, Task]],
) -> Job:
    """Create a Batch job to coalesce a raster tile set into a COG.

    For the moment only suitable for EPSG:3857 raster tile sets.

    :param dataset: Dataset name
    :param version: Version string
    :param creation_options: COG creation options; ``source_asset_id``
        must reference a raster tile set asset of the same version
    :param callback: Coroutine used to record task change logs
    :return: Configured (not yet submitted) GDALCOGJob
    """
    source_asset: ORMAsset = await get_asset(UUID(creation_options.source_asset_id))

    srid = infer_srid_from_grid(source_asset.creation_options["grid"])
    asset_uri = get_asset_uri(
        dataset, version, AssetType.raster_tile_set, source_asset.creation_options, srid
    )

    # get folder of tiles
    source_uri = "/".join(asset_uri.split("/")[:-1]) + "/"

    # We want to wind up with "{dataset}/{version}/raster/{projection}/cog/{implementation}.tif"
    target_asset_uri = get_asset_uri(
        dataset,
        version,
        AssetType.cog,
        creation_options.dict(by_alias=True),
        srid,
    )

    # The GDAL utilities use "near" whereas rasterio/pixetl use "nearest"
    resample_method = (
        "near"
        if creation_options.resampling == ResamplingMethod.nearest
        else creation_options.resampling.value
    )

    command = [
        "cogify.sh",
        "-s",
        source_uri,
        "-T",
        target_asset_uri,
        "-r",
        resample_method,
        "--block_size",
        # Batch commands must be lists of strings; the enum value is an int
        str(creation_options.block_size.value),
        "-d",
        dataset,
        "-I",
        creation_options.implementation,
    ]

    if creation_options.export_to_gee:
        command += ["--export_to_gee"]

    job_name: str = sanitize_batch_job_name(
        f"COGify_{dataset}_{version}_{creation_options.implementation}"
    )

    return GDALCOGJob(
        dataset=dataset,
        job_name=job_name,
        command=command,
        environment=JOB_ENV,
        callback=callback,
    )
1 change: 1 addition & 0 deletions app/utils/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def get_asset_uri(
uri_constructor: Dict[str, str] = {
AssetType.dynamic_vector_tile_cache: f"{TILE_CACHE_URL}/{dataset}/{version}/dynamic/{{z}}/{{x}}/{{y}}.pbf",
AssetType.static_vector_tile_cache: f"{TILE_CACHE_URL}/{dataset}/{version}/{implementation}/{{z}}/{{x}}/{{y}}.pbf",
AssetType.cog: f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/raster/{srid}/cog/{implementation}.tif",
AssetType.raster_tile_cache: f"{TILE_CACHE_URL}/{dataset}/{version}/{implementation}/{{z}}/{{x}}/{{y}}.png",
AssetType.shapefile: f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/vector/{srid}/{dataset}_{version}.shp.zip",
AssetType.ndjson: f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/vector/{srid}/{dataset}_{version}.ndjson",
Expand Down
Loading

0 comments on commit 68e6432

Please sign in to comment.