From e11ebd357e1d2c48113585ff674f7ed7dcf936f6 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 27 Aug 2024 15:58:29 +0200 Subject: [PATCH] Issue #7 implement parquet on S3 for track_metrics - also unify approach with `upload_assets` --- .github/workflows/benchmarks.yaml | 7 +- .../pytest_track_metrics.py | 89 ++++++++++++++----- .../pytest_upload_assets.py | 36 ++++---- qa/unittests/tests/conftest.py | 35 ++++++++ .../tests/test_pytest_track_metrics.py | 55 +++++++----- .../tests/test_pytest_upload_assets.py | 47 ++-------- 6 files changed, 167 insertions(+), 102 deletions(-) diff --git a/.github/workflows/benchmarks.yaml b/.github/workflows/benchmarks.yaml index 349fa81..51c699f 100644 --- a/.github/workflows/benchmarks.yaml +++ b/.github/workflows/benchmarks.yaml @@ -31,13 +31,14 @@ jobs: --track-metrics-json=report/metrics.json \ --basetemp=tmp_path_root \ --upload-assets-run-id="gh-$GITHUB_RUN_ID" \ - --upload-assets-endpoint-url="https://s3.waw3-1.cloudferro.com" \ --upload-assets-bucket="APEx-benchmarks" env: OPENEO_AUTH_METHOD: client_credentials OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED: ${{ secrets.OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED }} - UPLOAD_ASSETS_ACCESS_KEY_ID: ${{ secrets.UPLOAD_ASSETS_ACCESS_KEY_ID }} - UPLOAD_ASSETS_SECRET_ACCESS_KEY: ${{ secrets.UPLOAD_ASSETS_SECRET_ACCESS_KEY }} + APEX_ALGORITHMS_S3_ACCESS_KEY_ID: ${{ secrets.APEX_ALGORITHMS_S3_ACCESS_KEY_ID }} + APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY: ${{ secrets.APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY }} + APEX_ALGORITHMS_S3_ENDPOINT_URL: "https://s3.waw3-1.cloudferro.com" + APEX_ALGORITHMS_S3_DEFAULT_REGION: "waw3-1" - name: List local reports if: always() run: ls -alR qa/benchmarks/report diff --git a/qa/tools/apex_algorithm_qa_tools/pytest_track_metrics.py b/qa/tools/apex_algorithm_qa_tools/pytest_track_metrics.py index 2b7ad4c..8cb18b3 100644 --- a/qa/tools/apex_algorithm_qa_tools/pytest_track_metrics.py +++ b/qa/tools/apex_algorithm_qa_tools/pytest_track_metrics.py @@ -20,22 +20,35 @@ 
def test_dummy(track_metric): track_metric("x squared", x*x) ... -- Run the tests with `--track-metrics-json=path/to/metrics.json` - to store metrics in a JSON file +- Run the tests with desired configuration through CLI options and env vars: + - CLI option to set JSON file to store the metrics to: + `--track-metrics-json=path/to/metrics.json` + - S3 credentials with env vars `APEX_ALGORITHMS_S3_ACCESS_KEY_ID` + and `APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY` + (Note that the classic `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` + are also supported as fallback) + - S3 endpoint URL with env var `APEX_ALGORITHMS_S3_ENDPOINT_URL` + (Note that the classic `AWS_ENDPOINT_URL` is also supported as fallback). """ +import dataclasses import json +import os import warnings from pathlib import Path -from typing import Any, Callable, List, Tuple, Union +from typing import Any, Callable, List, Optional, Tuple, Union import pyarrow +import pyarrow.fs import pyarrow.parquet import pytest _TRACK_METRICS_PLUGIN_NAME = "track_metrics" +_S3_KEY_DEFAULT = "metrics/v0/metrics.parquet" + + def pytest_addoption(parser: pytest.Parser): parser.addoption( "--track-metrics-json", @@ -43,10 +56,15 @@ def pytest_addoption(parser: pytest.Parser): help="Path to JSON file to store test/benchmark metrics.", ) parser.addoption( - "--track-metrics-parquet", - metavar="PATH", - # TODO: allow "s3://" URLs too? 
- help="Path to JSON file to store test/benchmark metrics.", + "--track-metrics-parquet-s3-bucket", + metavar="BUCKET", + help="S3 bucket to use for Parquet storage of metrics.", + ) + parser.addoption( + "--track-metrics-parquet-s3-key", + metavar="KEY", + default=_S3_KEY_DEFAULT, + help="S3 key to use for Parquet storage of metrics.", ) @@ -55,26 +73,43 @@ def pytest_configure(config): warnings.warn("`track_metrics` plugin is not supported on xdist worker nodes.") return - track_metrics_json = config.getoption("track_metrics_json") - track_metrics_parquet = config.getoption("track_metrics_parquet") - if track_metrics_json or track_metrics_parquet: + track_metrics_json = config.getoption("--track-metrics-json") + + track_metrics_parquet_s3_bucket = config.getoption( + "--track-metrics-parquet-s3-bucket" + ) + track_metrics_parquet_s3_key = config.getoption( + "--track-metrics-parquet-s3-key", _S3_KEY_DEFAULT + ) + + if track_metrics_json or track_metrics_parquet_s3_bucket: config.pluginmanager.register( TrackMetricsReporter( - json_path=track_metrics_json, parquet_path=track_metrics_parquet + json_path=track_metrics_json, + parquet_s3=_ParquetS3StorageSettings( + bucket=track_metrics_parquet_s3_bucket, + key=track_metrics_parquet_s3_key, + ), ), name=_TRACK_METRICS_PLUGIN_NAME, ) +@dataclasses.dataclass(frozen=True) +class _ParquetS3StorageSettings: + bucket: str + key: str = _S3_KEY_DEFAULT + + class TrackMetricsReporter: def __init__( self, json_path: Union[None, str, Path] = None, - parquet_path: Union[None, str, Path] = None, + parquet_s3: Optional[_ParquetS3StorageSettings] = None, user_properties_key: str = "track_metrics", ): self._json_path = Path(json_path) if json_path else None - self._parquet_path = parquet_path + self._parquet_s3 = parquet_s3 self._suite_metrics: List[dict] = [] self._user_properties_key = user_properties_key @@ -97,14 +132,15 @@ def pytest_sessionfinish(self, session): if self._json_path: self._write_json_report(self._json_path) - if 
self._parquet_path: - self._write_parquet_report(self._parquet_path) + if self._parquet_s3: + # TODO: also support local parquet file (without partitioning), e.g. for test/debug reasons + self._write_parquet_report(self._parquet_s3) def _write_json_report(self, path: Union[str, Path]): with Path(path).open("w", encoding="utf8") as f: json.dump(self._suite_metrics, f, indent=2) - def _write_parquet_report(self, path: Union[str, Path]): + def _write_parquet_report(self, parquet_s3: _ParquetS3StorageSettings): # Compile all (free-form) metrics into a more rigid table columns = set() suite_metrics = [] @@ -127,19 +163,28 @@ def _write_parquet_report(self, path: Union[str, Path]): {col: [m.get(col) for m in suite_metrics] for col in columns} ) - # TODO: add support for partitioning (date and nodeid) - # TODO: support for S3 with custom credential env vars - pyarrow.parquet.write_table(table, self._parquet_path) + fs = pyarrow.fs.S3FileSystem( + access_key=os.environ.get("APEX_ALGORITHMS_S3_ACCESS_KEY_ID"), + secret_key=os.environ.get("APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY"), + endpoint_override=os.environ.get("APEX_ALGORITHMS_S3_ENDPOINT_URL"), + ) + root_path = f"{parquet_s3.bucket}/{parquet_s3.key}" + pyarrow.parquet.write_to_dataset( + table=table, + root_path=root_path, + filesystem=fs, + # TODO: add support for partitioning (date and nodeid) + ) def pytest_report_header(self): - return f"Plugin `track_metrics` is active, reporting to json={self._json_path}, parquet={self._parquet_path}" + return f"Plugin `track_metrics` is active, reporting to json={self._json_path}, parquet={self._parquet_s3}" def pytest_terminal_summary(self, terminalreporter): reports = [] if self._json_path: reports.append(str(self._json_path)) - if self._parquet_path: - reports.append(str(self._parquet_path)) + if self._parquet_s3: + reports.append(str(self._parquet_s3)) if reports: terminalreporter.write_sep( "-", f"Generated track_metrics report: {', '.join(reports)}" diff --git 
a/qa/tools/apex_algorithm_qa_tools/pytest_upload_assets.py b/qa/tools/apex_algorithm_qa_tools/pytest_upload_assets.py index 65e9517..bdb7971 100644 --- a/qa/tools/apex_algorithm_qa_tools/pytest_upload_assets.py +++ b/qa/tools/apex_algorithm_qa_tools/pytest_upload_assets.py @@ -21,11 +21,15 @@ def test_dummy(upload_assets_on_fail, tmp_path): upload_assets_on_fail(path) ``` -- Run the tests with: - - `--upload-assets-run-id=RUNID` (optional, defaults to random UUID) - - `--upload-assets-endpoint-url=URL` - - `--upload-assets-bucket=BUCKET` - - and env vars `UPLOAD_ASSETS_ACCESS_KEY_ID` and `UPLOAD_ASSETS_SECRET_ACCESS_KEY` set. +- Run the tests with desired configuration through CLI options and env vars: + - CLI option to set S3 bucket: `--upload-assets-bucket={BUCKET}` + - S3 credentials with env vars `APEX_ALGORITHMS_S3_ACCESS_KEY_ID` + and `APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY` + (Note that the classic `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` + are also supported as fallback) + - S3 endpoint URL with env var `APEX_ALGORITHMS_S3_ENDPOINT_URL` + (Note that the classic `AWS_ENDPOINT_URL` is also supported as fallback). + - CLI option `--upload-assets-run-id=RUNID` (optional, defaults to random UUID) """ import collections @@ -47,16 +51,13 @@ def test_dummy(upload_assets_on_fail, tmp_path): def pytest_addoption(parser: pytest.Parser): # TODO #22: option to always upload (also on success). + # TODO: handle run id at other level, so that it can be used transparently in track_metrics plugin too parser.addoption( "--upload-assets-run-id", metavar="RUNID", help="The run ID to use for building the S3 key.", ) - parser.addoption( - "--upload-assets-endpoint-url", - metavar="URL", - help="The S3 endpoint URL to upload to.", - ) + # TODO: include "s3" in the option name? 
parser.addoption( "--upload-assets-bucket", metavar="BUCKET", @@ -65,15 +66,16 @@ def pytest_addoption(parser: pytest.Parser): def pytest_configure(config: pytest.Config): - run_id = config.getoption("upload_assets_run_id") - endpoint_url = config.getoption("upload_assets_endpoint_url") - bucket = config.getoption("upload_assets_bucket") - if endpoint_url and bucket: + run_id = config.getoption("--upload-assets-run-id") + bucket = config.getoption("--upload-assets-bucket") + if bucket: s3_client = boto3.client( service_name="s3", - aws_access_key_id=os.environ.get("UPLOAD_ASSETS_ACCESS_KEY_ID"), - aws_secret_access_key=os.environ.get("UPLOAD_ASSETS_SECRET_ACCESS_KEY"), - endpoint_url=endpoint_url, + aws_access_key_id=os.environ.get("APEX_ALGORITHMS_S3_ACCESS_KEY_ID"), + aws_secret_access_key=os.environ.get( + "APEX_ALGORITHMS_S3_SECRET_ACCESS_KEY" + ), + endpoint_url=os.environ.get("APEX_ALGORITHMS_S3_ENDPOINT_URL"), ) config.pluginmanager.register( S3UploadPlugin(run_id=run_id, s3_client=s3_client, bucket=bucket), diff --git a/qa/unittests/tests/conftest.py b/qa/unittests/tests/conftest.py index 6b74f1e..7ebfea8 100644 --- a/qa/unittests/tests/conftest.py +++ b/qa/unittests/tests/conftest.py @@ -1,3 +1,7 @@ +import uuid + +import boto3 +import moto.server import pytest pytest_plugins = [ @@ -5,3 +9,34 @@ ] pytest.register_assert_rewrite("apex_algorithm_qa_tools.scenarios") + + +@pytest.fixture(scope="module") +def moto_server() -> str: + """Fixture to run a mocked AWS server for testing.""" + # Note: pass `port=0` to get a random free port. 
+ # TODO avoid the private `_server` attribute https://github.com/getmoto/moto/issues/7894 + server = moto.server.ThreadedMotoServer(port=0) + server.start() + host, port = server._server.server_address + yield f"http://{host}:{port}" + server.stop() + + +@pytest.fixture(autouse=True) +def aws_credentials(monkeypatch): + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "test123") + monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "test456") + + +@pytest.fixture +def s3_client(moto_server): + return boto3.client("s3", endpoint_url=moto_server) + + +@pytest.fixture +def s3_bucket(s3_client) -> str: + # Unique bucket name for test isolation + bucket = f"test-bucket-{uuid.uuid4().hex}" + s3_client.create_bucket(Bucket=bucket) + return bucket diff --git a/qa/unittests/tests/test_pytest_track_metrics.py b/qa/unittests/tests/test_pytest_track_metrics.py index aa49eec..7631d78 100644 --- a/qa/unittests/tests/test_pytest_track_metrics.py +++ b/qa/unittests/tests/test_pytest_track_metrics.py @@ -6,7 +6,7 @@ import pytest -def test_track_metric_basics_json(pytester: pytest.Pytester, tmp_path): +def test_track_metric_json(pytester: pytest.Pytester, tmp_path): pytester.makeconftest( """ pytest_plugins = [ @@ -26,13 +26,15 @@ def test_3plus(track_metric, x): ) metrics_path = tmp_path / "metrics.json" + + start_time = time.time() run_result = pytester.runpytest( f"--track-metrics-json={metrics_path}", ) + run_result.stdout.re_match_lines( [r"Plugin `track_metrics` is active, reporting to"] ) - run_result.assert_outcomes(passed=1, failed=1) assert metrics_path.exists() @@ -40,15 +42,14 @@ def test_3plus(track_metric, x): with metrics_path.open("r", encoding="utf8") as f: metrics = json.load(f) - approx_now = pytest.approx(time.time(), abs=1) assert metrics == [ { "nodeid": "test_addition.py::test_3plus[5]", "report": { "outcome": "passed", "duration": pytest.approx(0, abs=1), - "start": approx_now, - "stop": approx_now, + "start": pytest.approx(start_time, abs=1), + "stop": 
pytest.approx(start_time, abs=1), }, "metrics": [ ["x squared", 25], @@ -59,8 +60,8 @@ def test_3plus(track_metric, x): "report": { "outcome": "failed", "duration": pytest.approx(0, abs=1), - "start": approx_now, - "stop": approx_now, + "start": pytest.approx(start_time, abs=1), + "stop": pytest.approx(start_time, abs=1), }, "metrics": [ ["x squared", 36], @@ -69,7 +70,9 @@ def test_3plus(track_metric, x): ] -def test_track_metric_basics_parquet(pytester: pytest.Pytester, tmp_path): +def test_track_metric_parquet_s3( + pytester: pytest.Pytester, moto_server, s3_client, s3_bucket, monkeypatch +): pytester.makeconftest( """ pytest_plugins = [ @@ -88,20 +91,33 @@ def test_3plus(track_metric, x): """ ) - metrics_path = tmp_path / "metrics.parquet" + monkeypatch.setenv("APEX_ALGORITHMS_S3_ENDPOINT_URL", moto_server) + s3_key = "metrics-v0.parquet" + + start_time = time.time() run_result = pytester.runpytest( - f"--track-metrics-parquet={metrics_path}", + f"--track-metrics-parquet-s3-bucket={s3_bucket}", + f"--track-metrics-parquet-s3-key={s3_key}", ) + run_result.stdout.re_match_lines( [r"Plugin `track_metrics` is active, reporting to"] ) - run_result.assert_outcomes(passed=1, failed=1) - assert metrics_path.exists() - run_result.stdout.re_match_lines([f".*Generated.*{re.escape(str(metrics_path))}.*"]) + # Check for written Parquet files on S3 + object_listing = s3_client.list_objects(Bucket=s3_bucket) + assert len(object_listing["Contents"]) + keys = [obj["Key"] for obj in object_listing["Contents"]] + assert f"{s3_key}/" in keys + + # Load the Parquet file from S3 + table = pyarrow.parquet.read_table(f"s3://{s3_bucket}/{s3_key}") + + run_result.stdout.re_match_lines( + [f".*Generated.*{re.escape(str(s3_bucket))}.*{re.escape(str(s3_key))}.*"] + ) - table = pyarrow.parquet.read_table(metrics_path) df = table.to_pandas() assert set(df.columns) == { "nodeid", @@ -112,20 +128,17 @@ def test_3plus(track_metric, x): "x squared", } df = df.set_index("nodeid") - - approx_now = 
pytest.approx(time.time(), abs=1) - assert df.loc["test_addition.py::test_3plus[5]"].to_dict() == { "outcome": "passed", "duration": pytest.approx(0, abs=1), - "start": approx_now, - "stop": approx_now, + "start": pytest.approx(start_time, abs=1), + "stop": pytest.approx(start_time, abs=1), "x squared": 25, } assert df.loc["test_addition.py::test_3plus[6]"].to_dict() == { "outcome": "failed", "duration": pytest.approx(0, abs=1), - "start": approx_now, - "stop": approx_now, + "start": pytest.approx(start_time, abs=1), + "stop": pytest.approx(start_time, abs=1), "x squared": 36, } diff --git a/qa/unittests/tests/test_pytest_upload_assets.py b/qa/unittests/tests/test_pytest_upload_assets.py index 348f690..b604a0c 100644 --- a/qa/unittests/tests/test_pytest_upload_assets.py +++ b/qa/unittests/tests/test_pytest_upload_assets.py @@ -1,43 +1,8 @@ -import uuid - -import boto3 -import moto.server import pytest -@pytest.fixture(scope="module") -def moto_server() -> str: - """Fixture to run a mocked AWS server for testing.""" - # Note: pass `port=0` to get a random free port. 
- # TODO avoid the private `_server` attribute https://github.com/getmoto/moto/issues/7894 - server = moto.server.ThreadedMotoServer(port=0) - server.start() - host, port = server._server.server_address - yield f"http://{host}:{port}" - server.stop() - - -@pytest.fixture(autouse=True) -def aws_credentials(monkeypatch): - monkeypatch.setenv("AWS_ACCESS_KEY_ID", "test123") - monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "test456") - - -@pytest.fixture -def s3_client(moto_server): - return boto3.client("s3", endpoint_url=moto_server) - - -@pytest.fixture -def s3_bucket(s3_client) -> str: - # Unique bucket name for test isolation - bucket = f"test-bucket-{uuid.uuid4().hex}" - s3_client.create_bucket(Bucket=bucket) - return bucket - - def test_basic_upload_on_fail( - pytester: pytest.Pytester, moto_server, s3_client, s3_bucket + pytester: pytest.Pytester, moto_server, s3_client, s3_bucket, monkeypatch ): pytester.makeconftest( """ @@ -56,9 +21,10 @@ def test_fail_and_upload(upload_assets_on_fail, tmp_path): """ ) + monkeypatch.setenv("APEX_ALGORITHMS_S3_ENDPOINT_URL", moto_server) + run_result = pytester.runpytest_subprocess( "--upload-assets-run-id=test-run-123", - f"--upload-assets-endpoint-url={moto_server}", f"--upload-assets-bucket={s3_bucket}", ) run_result.stdout.re_match_lines( @@ -83,7 +49,9 @@ def test_fail_and_upload(upload_assets_on_fail, tmp_path): ) -def test_nop_on_success(pytester: pytest.Pytester, moto_server, s3_client, s3_bucket): +def test_nop_on_success( + pytester: pytest.Pytester, moto_server, s3_client, s3_bucket, monkeypatch +): pytester.makeconftest( """ pytest_plugins = [ @@ -101,9 +69,10 @@ def test_success(upload_assets_on_fail, tmp_path): """ ) + monkeypatch.setenv("APEX_ALGORITHMS_S3_ENDPOINT_URL", moto_server) + run_result = pytester.runpytest_subprocess( "--upload-assets-run-id=test-run-123", - f"--upload-assets-endpoint-url={moto_server}", f"--upload-assets-bucket={s3_bucket}", ) run_result.stdout.re_match_lines(