COG pipeline #534
Merged · 30 commits · Jun 11, 2024

Commits
644c564
Add initial support for outputting to COG
dmannarino May 18, 2024
f808789
Minor type and import fixes
dmannarino May 18, 2024
933e66a
Fix missing UUID import
dmannarino May 18, 2024
0908290
Fix uri->URI in test
dmannarino May 20, 2024
493811e
Remove incorrect UUID fix (Dan fixed upstream)
dmannarino May 21, 2024
6eed8e8
Add block size, compute stats argument to COG
dmannarino May 29, 2024
9c66b72
Add batch script to convert tile sets to COGs
jterry64 May 30, 2024
7bfefbc
Merge branch 'output_cog_support' into feature/cogify_script
jterry64 May 30, 2024
42c606a
Add test
jterry64 May 30, 2024
cec7f55
Get test to pass
jterry64 Jun 3, 2024
252f95b
Temporarily change to EC2 compute environment
jterry64 Jun 4, 2024
146af38
Break up gdalwarp COG command to multiple shorter commands
jterry64 Jun 5, 2024
9c28c6e
Remove unnecessary COG creation options
jterry64 Jun 5, 2024
ac996db
Remove projection as an option
jterry64 Jun 5, 2024
322d622
Fix test
jterry64 Jun 5, 2024
dbd4f93
Use correct GTiff block size
jterry64 Jun 5, 2024
df7ec62
Allow for BigTiffs
jterry64 Jun 6, 2024
5e5dc52
Force BIGTIFF=YES
jterry64 Jun 6, 2024
3a8df84
Pass target prefix to cogify script
jterry64 Jun 6, 2024
e835e39
Use DEFLATE instead of LZW compression
jterry64 Jun 6, 2024
59c50e4
Pass target prefix to cogify script
jterry64 Jun 6, 2024
d43fb82
Pass GDAL_CACHEMAX
jterry64 Jun 7, 2024
7367db6
Fix config options
jterry64 Jun 7, 2024
1439afe
Fix test
jterry64 Jun 7, 2024
6631c88
Actually fix test
jterry64 Jun 7, 2024
f3d864b
Use 8 cores for COG job
jterry64 Jun 10, 2024
f389792
Merge branch 'develop' into feature/cogify_script
jterry64 Jun 11, 2024
845b502
Fix PR comments
jterry64 Jun 11, 2024
2bab226
Merge branch 'feature/cogify_script' of https://github.com/wri/gfw-da…
jterry64 Jun 11, 2024
8e8c180
Merge branch 'develop' into feature/cogify_script
jterry64 Jun 11, 2024
Files changed
9 changes: 2 additions & 7 deletions app/crud/assets.py
@@ -3,23 +3,18 @@

from asyncpg import UniqueViolationError
from fastapi.encoders import jsonable_encoder
from sqlalchemy.sql import and_
from sqlalchemy import func

from app.crud.metadata import (
create_asset_metadata,
get_asset_metadata,
update_asset_metadata,
)
from sqlalchemy import func
from async_lru import alru_cache

from ..errors import RecordAlreadyExistsError, RecordNotFoundError
from ..models.enum.assets import AssetType
from ..models.orm.asset_metadata import AssetMetadata as ORMAssetMetadata
from ..models.orm.assets import Asset as ORMAsset
from ..models.orm.versions import Version as ORMVersion
from ..models.pydantic.creation_options import CreationOptions, creation_option_factory
from ..models.pydantic.asset_metadata import RasterTileSetMetadataOut
from . import update_data, versions


@@ -211,7 +206,7 @@ async def create_asset(dataset, version, **data) -> ORMAsset:
except UniqueViolationError:
raise RecordAlreadyExistsError(
f"Cannot create asset of type {data['asset_type']}. "
f"Asset uri must be unique. An asset with uri {data['asset_uri']} already exists"
f"Asset URI must be unique. An asset with URI {data['asset_uri']} already exists"
)

if metadata_data:
2 changes: 2 additions & 0 deletions app/models/enum/assets.py
@@ -23,6 +23,7 @@ class AssetType(str, Enum):
csv = "csv"
tsv = "tsv"
grid_1x1 = "1x1 grid"
cog = "COG"
# esri_map_service = "ESRI Map Service"
# esri_feature_service = "ESRI Feature Service"
# esri_image_service = "ESRI Image Service"
@@ -63,6 +64,7 @@ def is_single_file_asset(asset_type: str) -> bool:
AssetType.csv,
AssetType.tsv,
AssetType.grid_1x1,
AssetType.cog,
]


11 changes: 11 additions & 0 deletions app/models/enum/creation_options.py
@@ -89,3 +89,14 @@ class ColorMapType(str, Enum):
date_conf_intensity_multi_16 = "date_conf_intensity_multi_16"
year_intensity = "year_intensity"
value_intensity = "value_intensity"


class TileBlockSize(int, Enum):
two_fifty_six = 256
five_twelve = 512
ten_twenty_four = 1024


class Srid(str, Enum):
wgs84 = "epsg:4326"
web_mercator = "epsg:3857"
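
A quick illustration (not in the diff) of why TileBlockSize subclasses int: values are restricted to the three block sizes above, yet members still compare and serialize as plain integers.

    from app.models.enum.creation_options import TileBlockSize

    assert TileBlockSize(512) is TileBlockSize.five_twelve  # lookup by value
    assert TileBlockSize.five_twelve == 512                 # int comparison holds
    assert TileBlockSize.five_twelve.value * 2 == 1024      # arithmetic on .value
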
9 changes: 9 additions & 0 deletions app/models/pydantic/asset_metadata.py
@@ -6,7 +6,9 @@

from ...models.orm.assets import Asset as ORMAsset
from ..enum.assets import AssetType
from ..enum.creation_options import TileBlockSize
from ..enum.pg_types import PGType
from ..enum.pixetl import ResamplingMethod
from .base import BaseORMRecord, StrictBaseModel
from .responses import Response

@@ -75,6 +77,11 @@ class RasterTileSetMetadata(AssetBase):
resolution: Optional[int]


class COGMetadata(AssetBase):
block_size: TileBlockSize
resampling: ResamplingMethod


class RasterTileSetMetadataUpdate(AssetBase):
resolution: int

@@ -186,6 +193,7 @@ def asset_metadata_factory(asset: ORMAsset) -> AssetMetadata:
AssetType.grid_1x1: VectorFileMetadata,
AssetType.shapefile: VectorFileMetadata,
AssetType.geopackage: VectorFileMetadata,
AssetType.cog: COGMetadata,
}

if asset.asset_type in metadata_factory.keys():
@@ -235,5 +243,6 @@ class FieldsMetadataResponse(Response):
class FieldMetadataResponse(Response):
data: FieldMetadataOut


class RasterBandsMetadataResponse(Response):
data: List[RasterBandMetadata]
26 changes: 24 additions & 2 deletions app/models/pydantic/creation_options.py
@@ -14,6 +14,7 @@
PartitionType,
RasterDrivers,
TableDrivers,
TileBlockSize,
TileStrategy,
VectorDrivers,
)
@@ -339,6 +340,26 @@ class RasterTileCacheCreationOptions(TileCacheBaseModel):
)


class COGCreationOptions(StrictBaseModel):
implementation: str = Field(
"default",
description="Name space to use for COG. "
"This will be part of the URI and will "
"allow creation of multiple COGs per version.",
)
source_asset_id: str = Field(
...,
description="Raster tile set asset ID to use as source. "
"Must be an asset of the same version",
)
resampling: ResamplingMethod = Field(
ResamplingMethod.average,
description="Resampling method used to downsample overviews",
)
block_size: Optional[TileBlockSize] = 512
compute_stats: bool = False


class DynamicVectorTileCacheCreationOptions(TileCacheBaseModel):
field_attributes: Optional[List[Dict[str, Any]]] = Field(
None,
@@ -382,8 +403,7 @@ class StaticVectorFileCreationOptions(StrictBaseModel):

class StaticVector1x1CreationOptions(StaticVectorFileCreationOptions):
include_tile_id: Optional[bool] = Field(
False,
description="Whether or not to include the tile_id of each feature"
False, description="Whether or not to include the tile_id of each feature"
)


@@ -395,6 +415,7 @@ class StaticVector1x1CreationOptions(StaticVectorFileCreationOptions):

OtherCreationOptions = Union[
TableAssetCreationOptions,
COGCreationOptions,
RasterTileCacheCreationOptions,
StaticVectorTileCacheCreationOptions,
StaticVectorFileCreationOptions,
@@ -424,6 +445,7 @@ class CreationOptionsResponse(Response):
AssetType.shapefile: StaticVectorFileCreationOptions,
AssetType.geopackage: StaticVectorFileCreationOptions,
AssetType.raster_tile_set: RasterTileSetAssetCreationOptions,
AssetType.cog: COGCreationOptions,
AssetType.raster_tile_cache: RasterTileCacheCreationOptions,
AssetType.database_table: TableAssetCreationOptions,
}
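
For orientation, a hypothetical request body exercising COGCreationOptions might look like the sketch below; the asset type string comes from the enum added in this PR, but the UUID and values are placeholders, not taken from the diff.

    # Hypothetical payload for creating a COG asset on an existing version.
    # source_asset_id must point to a raster tile set of the same version.
    payload = {
        "asset_type": "COG",
        "creation_options": {
            "implementation": "default",
            "source_asset_id": "00000000-0000-0000-0000-000000000000",  # placeholder
            "resampling": "average",
            "block_size": 512,
            "compute_stats": False,
        },
    }
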
17 changes: 16 additions & 1 deletion app/models/pydantic/jobs.py
@@ -88,7 +88,10 @@ class PostgresqlClientJob(Job):

class GdalPythonImportJob(Job):
"""Use for write operations to PostgreSQL which require GDAL/Ogr2Ogr
drivers. NOTE: JOB MUST BE SAFE TO RETRY!"""
drivers.

NOTE: JOB MUST BE SAFE TO RETRY!
"""

job_queue = AURORA_JOB_QUEUE
job_definition = GDAL_PYTHON_JOB_DEFINITION
@@ -134,6 +137,18 @@ class PixETLJob(Job):
attempt_duration_seconds = int(DEFAULT_JOB_DURATION * 1.5)


class GDALCOGJob(Job):
"""Use for creating COG files using GDAL Python docker in PixETL queue."""

job_queue = PIXETL_JOB_QUEUE
job_definition = GDAL_PYTHON_JOB_DEFINITION
vcpus = 8
memory = 64000
num_processes = 8
attempts = 10
attempt_duration_seconds = int(DEFAULT_JOB_DURATION * 1.5)


class GDALDEMJob(Job):
"""Use for applying color maps to raster tiles with gdaldem."""

1 change: 1 addition & 0 deletions app/routes/datasets/__init__.py
@@ -44,6 +44,7 @@ async def verify_asset_dependencies(dataset, version, asset_type):
AssetType.raster_tile_set,
AssetType.geo_database_table,
],
AssetType.cog: AssetType.raster_tile_set,
}
try:
parent_type = asset_dependencies[asset_type]
1 change: 0 additions & 1 deletion app/routes/datasets/asset.py
@@ -16,7 +16,6 @@

from app.settings.globals import API_URL

from ...authentication.token import is_admin
from ...crud import assets
from ...errors import RecordAlreadyExistsError
from ...models.orm.assets import Asset as ORMAsset
7 changes: 3 additions & 4 deletions app/tasks/__init__.py
@@ -5,8 +5,7 @@
from ..application import ContextEngine
from ..crud import assets as crud_assets
from ..crud import tasks as crud_tasks
from ..models.orm.assets import Asset as ORMAsset
from ..models.orm.tasks import Task as ORMTask
from ..models.orm.tasks import Task as ORMTask, Task
from ..models.pydantic.change_log import ChangeLog
from ..settings.globals import (
API_URL,
@@ -59,14 +58,14 @@ async def update_asset_status(asset_id, status):

def callback_constructor(
asset_id: UUID,
) -> Callback:
) -> Callable[[UUID, ChangeLog], Coroutine[UUID, ChangeLog, Task]]:
"""Callback constructor.

Assign asset_id in the context of the constructor once. Afterwards
you will only need to pass the ChangeLog object.
"""

async def callback(task_id: UUID, change_log: ChangeLog) -> ORMAsset:
async def callback(task_id: UUID, change_log: ChangeLog) -> ORMTask:
async with ContextEngine("WRITE"):
task: ORMTask = await crud_tasks.create_task(
task_id, asset_id=asset_id, change_log=[change_log.dict(by_alias=True)]
2 changes: 2 additions & 0 deletions app/tasks/assets.py
@@ -9,6 +9,7 @@
from ..models.enum.sources import SourceType
from ..models.pydantic.assets import AssetType
from ..models.pydantic.change_log import ChangeLog
from .cog_assets import cog_asset
from .dynamic_vector_tile_cache_assets import dynamic_vector_tile_cache_asset
from .raster_tile_cache_assets import raster_tile_cache_asset
from .raster_tile_set_assets import raster_tile_set_asset
@@ -30,6 +31,7 @@
AssetType.grid_1x1: static_vector_1x1_asset,
# AssetType.vector_tile_cache: vector_tile_cache_asset,
AssetType.raster_tile_cache: raster_tile_cache_asset,
AssetType.cog: cog_asset,
# AssetType.dynamic_raster_tile_cache: dynamic_raster_tile_cache_asset,
AssetType.raster_tile_set: raster_tile_set_asset,
}.items()
104 changes: 104 additions & 0 deletions app/tasks/cog_assets.py
@@ -0,0 +1,104 @@
from typing import Any, Callable, Coroutine, Dict
from uuid import UUID

from app.crud.assets import get_asset
from app.models.enum.assets import AssetType
from app.models.enum.pixetl import ResamplingMethod
from app.models.orm.assets import Asset as ORMAsset
from app.models.orm.tasks import Task
from app.models.pydantic.change_log import ChangeLog
from app.models.pydantic.creation_options import COGCreationOptions
from app.models.pydantic.jobs import GDALCOGJob, Job
from app.tasks import callback_constructor
from app.tasks.batch import execute
from app.tasks.raster_tile_set_assets.utils import JOB_ENV
from app.tasks.utils import sanitize_batch_job_name
from app.utils.path import get_asset_uri, infer_srid_from_grid


async def cog_asset(
dataset: str,
version: str,
asset_id: UUID,
input_data: Dict[str, Any],
) -> ChangeLog:
"""Create a COG asset from a raster tile set asset."""

# Create the Batch job to generate the COG
creation_options: COGCreationOptions = COGCreationOptions(
**input_data["creation_options"]
)

cog_job: Job = await create_cogify_job(
dataset,
version,
creation_options,
callback_constructor(asset_id),
)

log: ChangeLog = await execute([cog_job])
return log


async def create_cogify_job(
dataset: str,
version: str,
creation_options: COGCreationOptions,
callback: Callable[[UUID, ChangeLog], Coroutine[UUID, ChangeLog, Task]],
) -> Job:
"""Create a Batch job to coalesce a raster tile set into a COG.

For the moment only suitable for EPSG:3857 raster tile sets.
"""
source_asset: ORMAsset = await get_asset(UUID(creation_options.source_asset_id))

srid = infer_srid_from_grid(source_asset.creation_options["grid"])
asset_uri = get_asset_uri(
dataset, version, AssetType.raster_tile_set, source_asset.creation_options, srid
)

# get folder of tiles
source_uri = "/".join(asset_uri.split("/")[:-1]) + "/"

# We want to wind up with "{dataset}/{version}/raster/{projection}/cog/{implementation}.tif"
target_asset_uri = get_asset_uri(
dataset,
version,
AssetType.cog,
creation_options.dict(by_alias=True),
srid,
)

# The GDAL utilities use "near" whereas rasterio/pixetl use "nearest"
resample_method = (
"near"
if creation_options.resampling == ResamplingMethod.nearest
else creation_options.resampling.value
)

command = [
"cogify.sh",
"-s",
source_uri,
"-T",
target_asset_uri,
"-r",
resample_method,
"--block_size",
creation_options.block_size.value,
]

job_name: str = sanitize_batch_job_name(
f"COGify_{dataset}_{version}_{creation_options.implementation}"
)

kwargs = dict()

return GDALCOGJob(
dataset=dataset,
job_name=job_name,
command=command,
environment=JOB_ENV,
callback=callback,
**kwargs,
)
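
To make the command construction above concrete, this is roughly what the Batch command list resolves to for a hypothetical EPSG:3857 source tile set; the bucket and the tile-set folder layout are assumed, not taken from this diff.

    # Illustrative only; bucket, dataset, version, and the source folder
    # layout are placeholders. A resampling of "nearest" would be rewritten
    # to GDAL's "near" by the check above.
    command = [
        "cogify.sh",
        "-s", "s3://<data-lake-bucket>/<dataset>/<version>/raster/epsg-3857/.../geotiff/",
        "-T", "s3://<data-lake-bucket>/<dataset>/<version>/raster/epsg-3857/cog/default.tif",
        "-r", "average",
        "--block_size", 512,
    ]
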
1 change: 1 addition & 0 deletions app/utils/path.py
@@ -66,6 +66,7 @@ def get_asset_uri(
uri_constructor: Dict[str, str] = {
AssetType.dynamic_vector_tile_cache: f"{TILE_CACHE_URL}/{dataset}/{version}/dynamic/{{z}}/{{x}}/{{y}}.pbf",
AssetType.static_vector_tile_cache: f"{TILE_CACHE_URL}/{dataset}/{version}/{implementation}/{{z}}/{{x}}/{{y}}.pbf",
AssetType.cog: f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/raster/{srid}/cog/{implementation}.tif",
AssetType.raster_tile_cache: f"{TILE_CACHE_URL}/{dataset}/{version}/{implementation}/{{z}}/{{x}}/{{y}}.png",
AssetType.shapefile: f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/vector/{srid}/{dataset}_{version}.shp.zip",
AssetType.ndjson: f"s3://{DATA_LAKE_BUCKET}/{dataset}/{version}/vector/{srid}/{dataset}_{version}.ndjson",
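
As a usage sketch (dataset, version, and srid formatting are placeholders; only the URI template above comes from the diff), the new AssetType.cog entry produces COG URIs like this:

    # Hypothetical call mirroring how cog_assets.py invokes get_asset_uri.
    uri = get_asset_uri(
        "some_dataset", "v1", AssetType.cog, {"implementation": "default"}, "epsg-4326"
    )
    # -> s3://<DATA_LAKE_BUCKET>/some_dataset/v1/raster/epsg-4326/cog/default.tif
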