Skip to content

Commit

Permalink
Issue #7 implement parquet on S3 for track_metrics
Browse files Browse the repository at this point in the history
- also unify approach with `upload_assets`
  • Loading branch information
soxofaan committed Aug 27, 2024
1 parent 786e5dc commit e11ebd3
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 102 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/benchmarks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ jobs:
--track-metrics-json=report/metrics.json \
--basetemp=tmp_path_root \
--upload-assets-run-id="gh-$GITHUB_RUN_ID" \
--upload-assets-endpoint-url="https://s3.waw3-1.cloudferro.com" \
--upload-assets-bucket="APEx-benchmarks"
env:
OPENEO_AUTH_METHOD: client_credentials
OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED: ${{ secrets.OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED }}
UPLOAD_ASSETS_ACCESS_KEY_ID: ${{ secrets.UPLOAD_ASSETS_ACCESS_KEY_ID }}
UPLOAD_ASSETS_SECRET_ACCESS_KEY: ${{ secrets.UPLOAD_ASSETS_SECRET_ACCESS_KEY }}
APEX_ALGORITHMS_S3_ACCESS_KEY_ID: ${{ secrets.APEX_ALGORITHMS_S3_ACCESS_KEY_ID }}
APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY: ${{ secrets.APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY }}
APEX_ALGORITHMS_S3_ENDPOINT_URL: "https://s3.waw3-1.cloudferro.com"
APEX_ALGORITHMS_S3_DEFAULT_REGION: "waw3-1"
- name: List local reports
if: always()
run: ls -alR qa/benchmarks/report
Expand Down
89 changes: 67 additions & 22 deletions qa/tools/apex_algorithm_qa_tools/pytest_track_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,51 @@ def test_dummy(track_metric):
track_metric("x squared", x*x)
...
- Run the tests with `--track-metrics-json=path/to/metrics.json`
to store metrics in a JSON file
- Run the tests with desired configuration through CLI options and env vars:
- CLI option to set JSON file to store the metrics to:
`--track-metrics-json=path/to/metrics.json`
- S3 credentials with env vars `APEX_ALGORITHMS_S3_ACCESS_KEY_ID`
and `APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY`
(Note that the classic `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
are also supported as fallback)
- S3 endpoint URL with env var `APEX_ALGORITHMS_S3_ENDPOINT_URL`
(Note that the classic `AWS_ENDPOINT_URL` is also supported as fallback).
"""

import dataclasses
import json
import os
import warnings
from pathlib import Path
from typing import Any, Callable, List, Tuple, Union
from typing import Any, Callable, List, Optional, Tuple, Union

import pyarrow
import pyarrow.fs
import pyarrow.parquet
import pytest

_TRACK_METRICS_PLUGIN_NAME = "track_metrics"


_S3_KEY_DEFAULT = "metrics/v0/metrics.parquet"


def pytest_addoption(parser: pytest.Parser):
parser.addoption(
"--track-metrics-json",
metavar="PATH",
help="Path to JSON file to store test/benchmark metrics.",
)
parser.addoption(
"--track-metrics-parquet",
metavar="PATH",
# TODO: allow "s3://" URLs too?
help="Path to JSON file to store test/benchmark metrics.",
"--track-metrics-parquet-s3-bucket",
metavar="BUCKET",
help="S3 bucket to use for Parquet storage of metrics.",
)
parser.addoption(
"--track-metrics-parquet-s3-key",
metavar="KEY",
default=_S3_KEY_DEFAULT,
help="S3 key to use for Parquet storage of metrics.",
)


Expand All @@ -55,26 +73,43 @@ def pytest_configure(config):
warnings.warn("`track_metrics` plugin is not supported on xdist worker nodes.")
return

track_metrics_json = config.getoption("track_metrics_json")
track_metrics_parquet = config.getoption("track_metrics_parquet")
if track_metrics_json or track_metrics_parquet:
track_metrics_json = config.getoption("--track-metrics-json")

track_metrics_parquet_s3_bucket = config.getoption(
"--track-metrics-parquet-s3-bucket"
)
track_metrics_parquet_s3_key = config.getoption(
"--track-metrics-parquet-s3-key", _S3_KEY_DEFAULT
)

if track_metrics_json or track_metrics_parquet_s3_bucket:
config.pluginmanager.register(
TrackMetricsReporter(
json_path=track_metrics_json, parquet_path=track_metrics_parquet
json_path=track_metrics_json,
parquet_s3=_ParquetS3StorageSettings(
bucket=track_metrics_parquet_s3_bucket,
key=track_metrics_parquet_s3_key,
),
),
name=_TRACK_METRICS_PLUGIN_NAME,
)


@dataclasses.dataclass(frozen=True)
class _ParquetS3StorageSettings:
    """Where to store the metrics Parquet dataset on S3: bucket name plus object key prefix."""

    # NOTE(review): pytest_configure appears to construct this even when the
    # bucket CLI option was not given (bucket=None), and a dataclass instance
    # is always truthy — confirm downstream `if self._parquet_s3:` checks
    # guard against a None bucket.
    bucket: str
    # Defaults to the module-level _S3_KEY_DEFAULT ("metrics/v0/metrics.parquet").
    key: str = _S3_KEY_DEFAULT


class TrackMetricsReporter:
def __init__(
self,
json_path: Union[None, str, Path] = None,
parquet_path: Union[None, str, Path] = None,
parquet_s3: Optional[_ParquetS3StorageSettings] = None,
user_properties_key: str = "track_metrics",
):
self._json_path = Path(json_path) if json_path else None
self._parquet_path = parquet_path
self._parquet_s3 = parquet_s3
self._suite_metrics: List[dict] = []
self._user_properties_key = user_properties_key

Expand All @@ -97,14 +132,15 @@ def pytest_sessionfinish(self, session):
if self._json_path:
self._write_json_report(self._json_path)

if self._parquet_path:
self._write_parquet_report(self._parquet_path)
if self._parquet_s3:
# TODO: also support local parquet file (without partitioning), e.g. for test/debug reasons
self._write_parquet_report(self._parquet_s3)

def _write_json_report(self, path: Union[str, Path]):
    """Dump all collected suite metrics to ``path`` as pretty-printed JSON."""
    target = Path(path)
    with target.open("w", encoding="utf8") as out:
        json.dump(self._suite_metrics, out, indent=2)

def _write_parquet_report(self, path: Union[str, Path]):
def _write_parquet_report(self, parquet_s3: _ParquetS3StorageSettings):
# Compile all (free-form) metrics into a more rigid table
columns = set()
suite_metrics = []
Expand All @@ -127,19 +163,28 @@ def _write_parquet_report(self, path: Union[str, Path]):
{col: [m.get(col) for m in suite_metrics] for col in columns}
)

# TODO: add support for partitioning (date and nodeid)
# TODO: support for S3 with custom credential env vars
pyarrow.parquet.write_table(table, self._parquet_path)
fs = pyarrow.fs.S3FileSystem(
access_key=os.environ.get("APEX_ALGORITHMS_S3_ACCESS_KEY_ID"),
secret_key=os.environ.get("APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY"),
endpoint_override=os.environ.get("APEX_ALGORITHMS_S3_ENDPOINT_URL"),
)
root_path = f"{parquet_s3.bucket}/{parquet_s3.key}"
pyarrow.parquet.write_to_dataset(
table=table,
root_path=root_path,
filesystem=fs,
# TODO: add support for partitioning (date and nodeid)
)

def pytest_report_header(self):
return f"Plugin `track_metrics` is active, reporting to json={self._json_path}, parquet={self._parquet_path}"
return f"Plugin `track_metrics` is active, reporting to json={self._json_path}, parquet={self._parquet_s3}"

def pytest_terminal_summary(self, terminalreporter):
reports = []
if self._json_path:
reports.append(str(self._json_path))
if self._parquet_path:
reports.append(str(self._parquet_path))
if self._parquet_s3:
reports.append(str(self._parquet_s3))
if reports:
terminalreporter.write_sep(
"-", f"Generated track_metrics report: {', '.join(reports)}"
Expand Down
36 changes: 19 additions & 17 deletions qa/tools/apex_algorithm_qa_tools/pytest_upload_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ def test_dummy(upload_assets_on_fail, tmp_path):
upload_assets_on_fail(path)
```
- Run the tests with:
- `--upload-assets-run-id=RUNID` (optional, defaults to random UUID)
- `--upload-assets-endpoint-url=URL`
- `--upload-assets-bucket=BUCKET`
- and env vars `UPLOAD_ASSETS_ACCESS_KEY_ID` and `UPLOAD_ASSETS_SECRET_ACCESS_KEY` set.
- Run the tests with the desired configuration through CLI options and env vars:
- CLI option to set S3 bucket: `--upload-assets-bucket={BUCKET}`
- S3 credentials with env vars `APEX_ALGORITHMS_S3_ACCESS_KEY_ID`
and `APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY`
(Note that the classic `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
are also supported as fallback)
- S3 endpoint URL with env var `APEX_ALGORITHMS_S3_ENDPOINT_URL`
(Note that the classic `AWS_ENDPOINT_URL` is also supported as fallback).
- CLI option `--upload-assets-run-id=RUNID` (optional, defaults to random UUID)
"""

import collections
Expand All @@ -47,16 +51,13 @@ def test_dummy(upload_assets_on_fail, tmp_path):

def pytest_addoption(parser: pytest.Parser):
# TODO #22: option to always upload (also on success).
# TODO: handle run id at other level, so that it can be used transparently in track_metrics plugin too
parser.addoption(
"--upload-assets-run-id",
metavar="RUNID",
help="The run ID to use for building the S3 key.",
)
parser.addoption(
"--upload-assets-endpoint-url",
metavar="URL",
help="The S3 endpoint URL to upload to.",
)
# TODO: include "s3" in the option name?
parser.addoption(
"--upload-assets-bucket",
metavar="BUCKET",
Expand All @@ -65,15 +66,16 @@ def pytest_addoption(parser: pytest.Parser):


def pytest_configure(config: pytest.Config):
run_id = config.getoption("upload_assets_run_id")
endpoint_url = config.getoption("upload_assets_endpoint_url")
bucket = config.getoption("upload_assets_bucket")
if endpoint_url and bucket:
run_id = config.getoption("--upload-assets-run-id")
bucket = config.getoption("--upload-assets-bucket")
if bucket:
s3_client = boto3.client(
service_name="s3",
aws_access_key_id=os.environ.get("UPLOAD_ASSETS_ACCESS_KEY_ID"),
aws_secret_access_key=os.environ.get("UPLOAD_ASSETS_SECRET_ACCESS_KEY"),
endpoint_url=endpoint_url,
aws_access_key_id=os.environ.get("APEX_ALGORITHMS_S3_ACCESS_KEY_ID"),
aws_secret_access_key=os.environ.get(
"APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY"
),
endpoint_url=os.environ.get("APEX_ALGORITHMS_S3_ENDPOINT_URL"),
)
config.pluginmanager.register(
S3UploadPlugin(run_id=run_id, s3_client=s3_client, bucket=bucket),
Expand Down
35 changes: 35 additions & 0 deletions qa/unittests/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,42 @@
import uuid

import boto3
import moto.server
import pytest

pytest_plugins = [
"pytester",
]

pytest.register_assert_rewrite("apex_algorithm_qa_tools.scenarios")


@pytest.fixture(scope="module")
def moto_server() -> str:
    """Run a mocked AWS (moto) server for this test module and yield its endpoint URL.

    Module-scoped so the server thread is shared by all tests in a module.
    """
    # Note: pass `port=0` to get a random free port.
    # TODO avoid the private `_server` attribute https://github.com/getmoto/moto/issues/7894
    server = moto.server.ThreadedMotoServer(port=0)
    server.start()
    try:
        host, port = server._server.server_address
        yield f"http://{host}:{port}"
    finally:
        # Fix: without try/finally the server thread leaks if an exception is
        # thrown into the generator at `yield` (e.g. teardown via .throw or an
        # error while reading the private server address above).
        server.stop()


@pytest.fixture(autouse=True)
def aws_credentials(monkeypatch):
    """Install dummy AWS credentials for every test (autouse) so nothing hits real AWS."""
    for var, value in [
        ("AWS_ACCESS_KEY_ID", "test123"),
        ("AWS_SECRET_ACCESS_KEY", "test456"),
    ]:
        monkeypatch.setenv(var, value)


@pytest.fixture
def s3_client(moto_server):
    """A boto3 S3 client pointed at the mocked moto server endpoint."""
    client = boto3.client("s3", endpoint_url=moto_server)
    return client


@pytest.fixture
def s3_bucket(s3_client) -> str:
    """Create a fresh S3 bucket on the mocked server and return its name."""
    # Unique bucket name for test isolation
    name = "test-bucket-" + uuid.uuid4().hex
    s3_client.create_bucket(Bucket=name)
    return name
Loading

0 comments on commit e11ebd3

Please sign in to comment.