
Commit

Sbachmei/mic 5476/add optional schema arg to config constructor (#127)
stevebachmeier authored Dec 16, 2024
1 parent 9cbbec1 commit 27555c9
Showing 10 changed files with 313 additions and 99 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,7 @@
+**0.1.2 - 12/16/24**
+
+- Add optional arg to pass allowable schemas to the Config constructor
+
 **0.1.1 - 12/10/24**
 
 - Implement pipeline choice sections
12 changes: 7 additions & 5 deletions src/easylink/configuration.py
@@ -48,6 +48,7 @@ class Config(LayeredConfigTree):
     def __init__(
         self,
         config_params: dict[str, Any],
+        potential_schemas: list[PipelineSchema] | PipelineSchema = PIPELINE_SCHEMAS,
     ):
         super().__init__(layers=["initial_data", "default", "user_configured"])
         self.update(DEFAULT_ENVIRONMENT, layer="default")
@@ -56,10 +57,11 @@ def __init__(
         self.update({"pipeline": {"combined_implementations": {}}}, layer="default")
         if self.environment.computing_environment == "slurm":
             # Set slurm defaults to empty dict instead of None so that we don't get errors
-            # In slurm resources property
+            # in slurm_resources property
             self.update({"environment": {"slurm": {}}}, layer="default")
 
-        self.update({"schema": self._get_schema()}, layer="initial_data")
+        if not isinstance(potential_schemas, list):
+            potential_schemas = [potential_schemas]
+        self.update({"schema": self._get_schema(potential_schemas)}, layer="initial_data")
        self.schema.configure_pipeline(self.pipeline, self.input_data)
         self._validate()
         self.freeze()
@@ -122,7 +124,7 @@ def spark_resources(self) -> dict[str, Any]:
     # Setup Methods #
     #################
 
-    def _get_schema(self) -> PipelineSchema:
+    def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema:
         """Validates the requested pipeline against supported schemas.
 
         Notes
@@ -134,7 +136,7 @@ def _get_schema(self) -> PipelineSchema:
         """
         errors = defaultdict(dict)
         # Try each schema until one is validated
-        for schema in PIPELINE_SCHEMAS:
+        for schema in potential_schemas:
             logs = schema.validate_step(self.pipeline, self.input_data)
             if logs:
                 errors[PIPELINE_ERRORS_KEY][schema.name] = logs
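The new `potential_schemas` argument makes the set of candidate schemas injectable (for example, the testing schemas added below) instead of hard-coding `PIPELINE_SCHEMAS`. A minimal sketch of a caller, assuming a `PipelineSchema` instance is already in hand; the `easylink.pipeline_schema` import path and the contents of `config_params` are assumptions, not shown in this diff:

```python
from easylink.configuration import Config
from easylink.pipeline_schema import PipelineSchema  # import path assumed

def make_config(config_params: dict, schema: PipelineSchema) -> Config:
    # A bare PipelineSchema is accepted too; the constructor wraps it
    # into a one-element list before handing it to _get_schema().
    return Config(config_params, potential_schemas=schema)
```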
34 changes: 30 additions & 4 deletions src/easylink/implementation_metadata.yaml
@@ -6,6 +6,24 @@ step_1_python_pandas:
   script_cmd: python /dummy_step.py
   outputs:
     step_1_main_output: result.parquet
+step_1a_python_pandas:
+  steps:
+    - step_1a
+  image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif
+  script_cmd: python /dummy_step.py
+  env:
+    INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS"
+  outputs:
+    step_1a_main_output: result.parquet
+step_1b_python_pandas:
+  steps:
+    - step_1b
+  image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif
+  script_cmd: python /dummy_step.py
+  env:
+    INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS"
+  outputs:
+    step_1b_main_output: result.parquet
 step_2_python_pandas:
   steps:
     - step_2
@@ -59,17 +77,17 @@ step_4a_python_pandas:
 step_4b_python_pandas:
   steps:
     - step_4b
-  image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/r-image.sif
-  script_cmd: Rscript /dummy_step.R
+  image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif
+  script_cmd: python /dummy_step.py
   env:
     INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS"
   outputs:
     step_4b_main_output: result.parquet
 step_4b_r:
   steps:
     - step_4b
-  image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif
-  script_cmd: python /dummy_step.py
+  image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/r-image.sif
+  script_cmd: Rscript /dummy_step.R
   env:
     INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS"
   outputs:
@@ -167,3 +185,11 @@ step_3_and_step_4_combined_python_pandas:
   script_cmd: python /dummy_step.py
   outputs:
     step_4_main_output: result.parquet
+step_1a_and_step_1b_combined_python_pandas:
+  steps:
+    - step_1a
+    - step_1b
+  image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif
+  script_cmd: python /dummy_step.py
+  outputs:
+    step_1_main_output: result.parquet
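For context, the new combined entry is what `merge_combined_implementations` (below) looks up by name to get the ordered list of schema steps a combined implementation covers. A hedged sketch of that lookup; loading via `yaml.safe_load` and the file path are assumptions, since the diff only shows the `implementation_metadata[...]["steps"]` access:

```python
import yaml  # loading mechanism assumed

with open("src/easylink/implementation_metadata.yaml") as f:
    implementation_metadata = yaml.safe_load(f)

# The combined entry lists, in order, the schema steps it implements.
metadata_steps = implementation_metadata[
    "step_1a_and_step_1b_combined_python_pandas"
]["steps"]
assert metadata_steps == ["step_1a", "step_1b"]
```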
36 changes: 24 additions & 12 deletions src/easylink/pipeline_graph.py
@@ -62,7 +62,7 @@ def merge_combined_implementations(self, config):
         metadata_steps = implementation_metadata[combined_implementation_config["name"]][
             "steps"
         ]
-        self.validate_implementation_topology(nodes_to_merge, metadata_steps)
+        self.validate_combined_implementation_topology(nodes_to_merge, metadata_steps)
 
         (
             combined_input_slots,
@@ -309,11 +317,17 @@ def spark_is_required(self) -> bool:
         """Check if the pipeline requires spark resources."""
         return any([implementation.requires_spark for implementation in self.implementations])
 
-    def validate_implementation_topology(
+    def validate_combined_implementation_topology(
         self, nodes: list[str], metadata_steps: list[str]
     ) -> None:
-        """Check that the subgraph induced by the nodes implemented by this implementation
-        is topologically consistent with the list of metadata steps."""
+        """Validates the combined implementation topology against intended implementation.
+
+        Check that the subgraph induced by the nodes implemented by this implementation
+        is topologically consistent with the list of steps intended to be implemented.
+        """
+        # HACK: We cannot just call self.subgraph(nodes) because networkx will
+        # try and instantiate another PipelineGraph which requires a Config object
+        # to be passed to the constructor.
         subgraph = ImplementationGraph(self).subgraph(nodes)
 
         # Relabel nodes by schema step
@@ -322,16 +328,22 @@ def validate_implementation_topology(
             for node, data in subgraph.nodes(data=True)
         }
         if not set(mapping.values()) == set(metadata_steps):
+            # NOTE: It's possible that we've combined nodes in such a way that removed
+            # an edge from the graph and so nx is unable to reliably sort the subgraph.
+            full_pipeline_sorted_nodes = list(nx.topological_sort(self))
+            sorted_nodes = [node for node in full_pipeline_sorted_nodes if node in mapping]
             raise ValueError(
-                f"Pipeline configuration nodes {list(mapping.values())} do not match metadata steps {metadata_steps}."
+                f"Pipeline configuration nodes {[mapping[node] for node in sorted_nodes]} do not match metadata steps {metadata_steps}."
             )
         subgraph = nx.relabel_nodes(subgraph, mapping)
-        # Check for topological inconsistency, i.e. if there
-        # is a path from a later node to an earlier node.
-        for i in range(len(metadata_steps)):
-            for j in range(i + 1, len(metadata_steps)):
-                if nx.has_path(subgraph, metadata_steps[j], metadata_steps[i]):
+        # Check for topological inconsistency, i.e. if there is a path from a later node to an earlier node.
+        for predecessor in range(len(metadata_steps)):
+            for successor in range(predecessor + 1, len(metadata_steps)):
+                if nx.has_path(
+                    subgraph, metadata_steps[successor], metadata_steps[predecessor]
+                ):
                     raise ValueError(
-                        f"Pipeline configuration nodes {set(subgraph.nodes())} are not topologically consistent with metadata steps {set(metadata_steps)}:"
-                        f"There is a path from successor {metadata_steps[j]} to predecessor {metadata_steps[i]}"
+                        f"Pipeline configuration nodes {list(nx.topological_sort(subgraph))} are not topologically consistent with "
+                        f"the intended implementations for {list(metadata_steps)}:\n"
+                        f"There is a path from successor {metadata_steps[successor]} to predecessor {metadata_steps[predecessor]}."
                     )
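The renamed check rejects combined implementations whose merged nodes would force a later step to run before an earlier one. A self-contained toy version of the same `nx.has_path` test; the graph and step names are invented for illustration, and the real method runs on the relabeled `ImplementationGraph` subgraph:

```python
import networkx as nx

# Toy subgraph after relabeling: step_1b feeds step_1a, the reverse of
# the intended step_1a -> step_1b order.
subgraph = nx.DiGraph([("step_1b", "step_1a")])
metadata_steps = ["step_1a", "step_1b"]

for i, predecessor in enumerate(metadata_steps):
    for successor in metadata_steps[i + 1 :]:
        if nx.has_path(subgraph, successor, predecessor):
            print(f"Inconsistent: path from {successor} to {predecessor}")
```

Running this prints `Inconsistent: path from step_1b to step_1a`, the same condition that raises `ValueError` in the method above.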
6 changes: 4 additions & 2 deletions src/easylink/pipeline_schema_constants/__init__.py
@@ -1,9 +1,11 @@
-from easylink.pipeline_schema_constants import development, integration_test
+from easylink.pipeline_schema_constants import development, tests
 
 ALLOWED_SCHEMA_PARAMS = {
     "development": development.SCHEMA_PARAMS,
 }
 
 TESTING_SCHEMA_PARAMS = {
-    "integration": integration_test.SCHEMA_PARAMS,
+    "integration": tests.SINGLE_STEP_SCHEMA_PARAMS,
+    "combined_bad_topology": tests.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS,
+    "combined_bad_implementation_names": tests.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS,
 }
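Each entry is a `(nodes, edges)` tuple (see the new `tests.py` below), so a consumer can unpack one directly. How a `PipelineSchema` is ultimately built from such a pair is not shown in this diff:

```python
from easylink.pipeline_schema_constants import TESTING_SCHEMA_PARAMS

nodes, edges = TESTING_SCHEMA_PARAMS["combined_bad_topology"]
print([type(step).__name__ for step in nodes])  # ['InputStep', 'LoopStep', 'OutputStep']
```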
39 changes: 0 additions & 39 deletions src/easylink/pipeline_schema_constants/integration_test.py

This file was deleted.

124 changes: 124 additions & 0 deletions src/easylink/pipeline_schema_constants/tests.py
@@ -0,0 +1,124 @@
from easylink.graph_components import (
EdgeParams,
InputSlot,
InputSlotMapping,
OutputSlot,
OutputSlotMapping,
)
from easylink.step import HierarchicalStep, InputStep, LoopStep, OutputStep, Step
from easylink.utilities.validation_utils import validate_input_file_dummy

SINGLE_STEP_NODES = [
InputStep(),
Step(
step_name="step_1",
input_slots=[
InputSlot(
name="step_1_main_input",
env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS",
validator=validate_input_file_dummy,
)
],
output_slots=[OutputSlot("step_1_main_output")],
),
OutputStep(
input_slots=[
InputSlot(name="result", env_var=None, validator=validate_input_file_dummy)
],
),
]
SINGLE_STEP_EDGES = [
EdgeParams(
source_node="input_data",
target_node="step_1",
output_slot="all",
input_slot="step_1_main_input",
),
EdgeParams(
source_node="step_1",
target_node="results",
output_slot="step_1_main_output",
input_slot="result",
),
]

SINGLE_STEP_SCHEMA_PARAMS = (SINGLE_STEP_NODES, SINGLE_STEP_EDGES)


BAD_COMBINED_TOPOLOGY_NODES = [
InputStep(),
LoopStep(
template_step=HierarchicalStep(
step_name="step_1",
input_slots=[
InputSlot(
name="step_1_main_input",
env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS",
validator=validate_input_file_dummy,
),
],
output_slots=[OutputSlot("step_1_main_output")],
nodes=[
Step(
step_name="step_1a",
input_slots=[
InputSlot(
name="step_1a_main_input",
env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS",
validator=validate_input_file_dummy,
),
],
output_slots=[OutputSlot("step_1a_main_output")],
),
Step(
step_name="step_1b",
input_slots=[
InputSlot(
name="step_1b_main_input",
env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS",
validator=validate_input_file_dummy,
),
],
output_slots=[OutputSlot("step_1b_main_output")],
),
],
edges=[
EdgeParams(
source_node="step_1a",
target_node="step_1b",
output_slot="step_1a_main_output",
input_slot="step_1b_main_input",
),
],
input_slot_mappings=[
InputSlotMapping(
parent_slot="step_1_main_input",
child_node="step_1a",
child_slot="step_1a_main_input",
),
],
output_slot_mappings=[
OutputSlotMapping(
parent_slot="step_1_main_output",
child_node="step_1b",
child_slot="step_1b_main_output",
),
],
),
self_edges=[
EdgeParams(
source_node="step_1",
target_node="step_1",
output_slot="step_1_main_output",
input_slot="step_1_main_input",
),
],
),
OutputStep(
input_slots=[
InputSlot(name="result", env_var=None, validator=validate_input_file_dummy)
],
),
]

BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS = (BAD_COMBINED_TOPOLOGY_NODES, SINGLE_STEP_EDGES)
3 changes: 1 addition & 2 deletions src/easylink/step.py
@@ -425,8 +425,7 @@ def implementation_node_name(self) -> str:
             if step_name != node_name:
                 prefix = node_names[i:]
                 break
-        # If we didn't include the step name already for a combined implementation,
-        # do so now.
+        # If we didn't include the step name already for a combined implementation, do so now.
         if self.configuration_state.is_combined and not prefix:
             prefix.append(self.name)
         prefix.append(implementation_name)