Skip to content

Commit

Permalink
Load reference metadata for input and output files and store in prove…
Browse files Browse the repository at this point in the history
…nance
  • Loading branch information
maxnoe committed Jul 19, 2024
1 parent 371964e commit a8ff9e2
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docs/changes/2598.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The provenance system now records the reference metadata
of input and output files, if available.
30 changes: 28 additions & 2 deletions src/ctapipe/core/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import platform
import sys
import uuid
import warnings
from collections import UserList
from contextlib import contextmanager
from importlib import import_module
Expand Down Expand Up @@ -57,6 +58,10 @@ def get_module_version(name):
return "not installed"


class MissingReferenceMetadata(UserWarning):
"""Warning raised if reference metadata could not be read from input file."""


class Provenance(metaclass=Singleton):
"""
Manage the provenance info for a stack of *activities*
Expand Down Expand Up @@ -251,21 +256,30 @@ def register_input(self, url, role=None):
role: str
role name that this input satisfies
"""
self._prov["input"].append(dict(url=url, role=role))
reference_meta = self._get_reference_meta(url=url)
self._prov["input"].append(
dict(url=url, role=role, reference_meta=reference_meta)
)

def register_output(self, url, role=None):
"""
Add a URL of a file to the list of outputs (can be a filename or full
url, if no URL specifier is given, assume 'file://')
Should only be called once the file is finalized, so that reference metadata
can be read.
Parameters
----------
url: str
filename or url of output file
role: str
role name that this output satisfies
"""
self._prov["output"].append(dict(url=url, role=role))
reference_meta = self._get_reference_meta(url=url)
self._prov["output"].append(
dict(url=url, role=role, reference_meta=reference_meta)
)

def register_config(self, config):
"""add a dictionary of configuration parameters to this activity"""
Expand Down Expand Up @@ -302,6 +316,18 @@ def sample_cpu_and_memory(self):
def provenance(self):
return self._prov

def _get_reference_meta(self, url):
# here to prevent circular imports / top-level cross-dependencies
from ..io.metadata import read_reference_metadata

try:
return read_reference_metadata(url).to_dict()
except Exception:
warnings.warn(
f"Could not read reference metadata for input file: {url}",
MissingReferenceMetadata,
)


def _get_python_packages():
def _sortkey(dist):
Expand Down
34 changes: 25 additions & 9 deletions src/ctapipe/core/tests/test_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ctapipe.core import Provenance
from ctapipe.core.provenance import _ActivityProvenance
from ctapipe.io.metadata import Reference


@pytest.fixture
Expand All @@ -15,19 +16,18 @@ def provenance(monkeypatch):
prov = Provenance()
monkeypatch.setattr(prov, "_activities", [])
monkeypatch.setattr(prov, "_finished_activities", [])

prov.start_activity("test1")
prov.add_input_file("input.txt")
prov.add_output_file("output.txt")
prov.start_activity("test2")
prov.add_input_file("input_a.txt")
prov.add_input_file("input_b.txt")
prov.finish_activity("test2")
prov.finish_activity("test1")
return prov


def test_provenance_activity_names(provenance):
provenance.start_activity("test1")
provenance.add_input_file("input.txt")
provenance.add_output_file("output.txt")
provenance.start_activity("test2")
provenance.add_input_file("input_a.txt")
provenance.add_input_file("input_b.txt")
provenance.finish_activity("test2")
provenance.finish_activity("test1")
assert set(provenance.finished_activity_names) == {"test2", "test1"}


Expand All @@ -52,6 +52,8 @@ def test_provenence_contextmanager():


def test_provenance_json(provenance: Provenance):
provenance.start_activity("test1")
provenance.finish_activity("test1")
data = json.loads(provenance.as_json())

activity = data[0]
Expand All @@ -60,3 +62,17 @@ def test_provenance_json(provenance: Provenance):
packages = activity["system"]["python"].get("packages")
assert isinstance(packages, list)
assert any(p["name"] == "numpy" for p in packages)


def test_provenance_input_reference_meta(provenance: Provenance, dl1_file):
provenance.start_activity("test1")
provenance.add_input_file(dl1_file, "events")
provenance.finish_activity("test1")
data = json.loads(provenance.as_json())

inputs = data[0]["input"]
assert len(inputs) == 1
input_meta = inputs[0]
assert "reference_meta" in input_meta
assert "CTA PRODUCT ID" in input_meta["reference_meta"]
Reference.from_dict(input_meta["reference_meta"])
2 changes: 1 addition & 1 deletion src/ctapipe/io/datawriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def finish(self):

self._write_context_metadata_headers()
self._writer.close()
PROV.add_output_file(str(self.output_path), role="DL1/Event")

@property
def datalevels(self):
Expand Down Expand Up @@ -432,7 +433,6 @@ def _setup_output_path(self):
", use the `overwrite` option or choose another `output_path` "
)
self.log.debug("output path: %s", self.output_path)
PROV.add_output_file(str(self.output_path), role="DL1/Event")

# check that options make sense
writable_things = [
Expand Down
4 changes: 2 additions & 2 deletions src/ctapipe/io/hdf5merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ def __init__(self, output_path=None, **kwargs):
mode="a" if appending else "w",
filters=DEFAULT_FILTERS,
)
Provenance().add_output_file(str(self.output_path))

self.required_nodes = None
self.data_model_version = None
Expand Down Expand Up @@ -247,7 +246,7 @@ def _update_meta(self):

def _read_meta(self, h5file):
try:
return metadata.read_reference_metadata_hdf5(h5file)
return metadata._read_reference_metadata_hdf5(h5file)
except Exception:
raise CannotMerge(
f"CTA Reference meta not found in input file: {h5file.filename}"
Expand Down Expand Up @@ -384,6 +383,7 @@ def __exit__(self, exc_type, exc_value, traceback):
def close(self):
if hasattr(self, "h5file"):
self.h5file.close()
Provenance().add_output_file(str(self.output_path))

def _append_subarray(self, other):
# focal length choice doesn't matter here, set to equivalent so we don't get
Expand Down
12 changes: 8 additions & 4 deletions src/ctapipe/tools/dump_instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,15 @@ def write_camera_definitions(self):

try:
geom_table.write(geom_filename, **args)
readout_table.write(readout_filename, **args)
Provenance().add_output_file(geom_filename, "CameraGeometry")
except OSError as err:
self.log.exception("couldn't write camera geometry because: %s", err)

try:
readout_table.write(readout_filename, **args)
Provenance().add_output_file(readout_filename, "CameraReadout")
except OSError as err:
self.log.warning("couldn't write camera definition because: %s", err)
self.log.exception("couldn't write camera definition because: %s", err)

def write_optics_descriptions(self):
"""writes out optics files for each telescope type"""
Expand All @@ -109,7 +113,7 @@ def write_optics_descriptions(self):
tab.write(filename, **args)
Provenance().add_output_file(filename, "OpticsDescription")
except OSError as err:
self.log.warning(
self.log.exception(
"couldn't write optics description '%s' because: %s", filename, err
)

Expand All @@ -123,7 +127,7 @@ def write_subarray_description(self):
tab.write(filename, **args)
Provenance().add_output_file(filename, "SubarrayDescription")
except OSError as err:
self.log.warning(
self.log.exception(
"couldn't write subarray description '%s' because: %s", filename, err
)

Expand Down
3 changes: 1 addition & 2 deletions src/ctapipe/tools/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def setup(self):
)
sys.exit(1)

self.merger = HDF5Merger(parent=self)
self.merger = self.enter_context(HDF5Merger(parent=self))
if self.merger.output_path in self.input_files:
raise ToolConfigurationError(
"Output path contained in input files. Fix your configuration / cli arguments."
Expand Down Expand Up @@ -195,7 +195,6 @@ def finish(self):
current_activity = Provenance().current_activity.provenance
self.merger.meta.activity = meta.Activity.from_provenance(current_activity)
meta.write_to_hdf5(self.merger.meta.to_dict(), self.merger.h5file)
self.merger.close()


def main():
Expand Down

0 comments on commit a8ff9e2

Please sign in to comment.