diff --git a/CHANGELOG.rst b/CHANGELOG.rst index faa1894c..1ca0b596 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,7 @@ +**0.1.2 - 12/16/24** + + - Add optional arg to pass allowable schemas to the Config constructor + **0.1.1 - 12/10/24** - Implement pipeline choice sections diff --git a/src/easylink/configuration.py b/src/easylink/configuration.py index b103ca06..19619ed3 100644 --- a/src/easylink/configuration.py +++ b/src/easylink/configuration.py @@ -48,6 +48,7 @@ class Config(LayeredConfigTree): def __init__( self, config_params: dict[str, Any], + potential_schemas: list[PipelineSchema] | PipelineSchema = PIPELINE_SCHEMAS, ): super().__init__(layers=["initial_data", "default", "user_configured"]) self.update(DEFAULT_ENVIRONMENT, layer="default") @@ -56,10 +57,11 @@ def __init__( self.update({"pipeline": {"combined_implementations": {}}}, layer="default") if self.environment.computing_environment == "slurm": # Set slurm defaults to empty dict instead of None so that we don't get errors - # In slurm resources property + # in slurm_resources property self.update({"environment": {"slurm": {}}}, layer="default") - - self.update({"schema": self._get_schema()}, layer="initial_data") + if not isinstance(potential_schemas, list): + potential_schemas = [potential_schemas] + self.update({"schema": self._get_schema(potential_schemas)}, layer="initial_data") self.schema.configure_pipeline(self.pipeline, self.input_data) self._validate() self.freeze() @@ -122,7 +124,7 @@ def spark_resources(self) -> dict[str, Any]: # Setup Methods # ################# - def _get_schema(self) -> PipelineSchema: + def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema: """Validates the requested pipeline against supported schemas. 
Notes @@ -134,7 +136,7 @@ def _get_schema(self) -> PipelineSchema: """ errors = defaultdict(dict) # Try each schema until one is validated - for schema in PIPELINE_SCHEMAS: + for schema in potential_schemas: logs = schema.validate_step(self.pipeline, self.input_data) if logs: errors[PIPELINE_ERRORS_KEY][schema.name] = logs diff --git a/src/easylink/implementation_metadata.yaml b/src/easylink/implementation_metadata.yaml index 613d621b..55b7b491 100644 --- a/src/easylink/implementation_metadata.yaml +++ b/src/easylink/implementation_metadata.yaml @@ -6,6 +6,24 @@ step_1_python_pandas: script_cmd: python /dummy_step.py outputs: step_1_main_output: result.parquet +step_1a_python_pandas: + steps: + - step_1a + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif + script_cmd: python /dummy_step.py + env: + INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" + outputs: + step_1a_main_output: result.parquet +step_1b_python_pandas: + steps: + - step_1b + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif + script_cmd: python /dummy_step.py + env: + INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" + outputs: + step_1b_main_output: result.parquet step_2_python_pandas: steps: - step_2 @@ -59,8 +77,8 @@ step_4a_python_pandas: step_4b_python_pandas: steps: - step_4b - image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/r-image.sif - script_cmd: Rscript /dummy_step.R + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif + script_cmd: python /dummy_step.py env: INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" outputs: @@ -68,8 +86,8 @@ step_4b_python_pandas: step_4b_r: steps: - step_4b - image_path: 
/mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif - script_cmd: python /dummy_step.py + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/r-image.sif + script_cmd: Rscript /dummy_step.R env: INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" outputs: @@ -167,3 +185,11 @@ step_3_and_step_4_combined_python_pandas: script_cmd: python /dummy_step.py outputs: step_4_main_output: result.parquet +step_1a_and_step_1b_combined_python_pandas: + steps: + - step_1a + - step_1b + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif + script_cmd: python /dummy_step.py + outputs: + step_1_main_output: result.parquet \ No newline at end of file diff --git a/src/easylink/pipeline_graph.py b/src/easylink/pipeline_graph.py index ff544fc3..d66e694c 100644 --- a/src/easylink/pipeline_graph.py +++ b/src/easylink/pipeline_graph.py @@ -62,7 +62,7 @@ def merge_combined_implementations(self, config): metadata_steps = implementation_metadata[combined_implementation_config["name"]][ "steps" ] - self.validate_implementation_topology(nodes_to_merge, metadata_steps) + self.validate_combined_implementation_topology(nodes_to_merge, metadata_steps) ( combined_input_slots, @@ -309,11 +309,17 @@ def spark_is_required(self) -> bool: """Check if the pipeline requires spark resources.""" return any([implementation.requires_spark for implementation in self.implementations]) - def validate_implementation_topology( + def validate_combined_implementation_topology( self, nodes: list[str], metadata_steps: list[str] ) -> None: - """Check that the subgraph induced by the nodes implemented by this implementation - is topologically consistent with the list of metadata steps.""" + """Validates the combined implementation topology against intended implementation. 
+ + Check that the subgraph induced by the nodes implemented by this implementation + is topologically consistent with the list of steps intended to be implemented. + """ + # HACK: We cannot just call self.subgraph(nodes) because networkx will + # try and instantiate another PipelineGraph which requires a Config object + # to be passed to the constructor. subgraph = ImplementationGraph(self).subgraph(nodes) # Relabel nodes by schema step @@ -322,16 +328,22 @@ def validate_implementation_topology( for node, data in subgraph.nodes(data=True) } if not set(mapping.values()) == set(metadata_steps): + # NOTE: It's possible that we've combined nodes in such a way that removed + # an edge from the graph and so nx is unable to reliably sort the subgraph. + full_pipeline_sorted_nodes = list(nx.topological_sort(self)) + sorted_nodes = [node for node in full_pipeline_sorted_nodes if node in mapping] raise ValueError( - f"Pipeline configuration nodes {list(mapping.values())} do not match metadata steps {metadata_steps}." + f"Pipeline configuration nodes {[mapping[node] for node in sorted_nodes]} do not match metadata steps {metadata_steps}." ) subgraph = nx.relabel_nodes(subgraph, mapping) - # Check for topological inconsistency, i.e. if there - # is a path from a later node to an earlier node. - for i in range(len(metadata_steps)): - for j in range(i + 1, len(metadata_steps)): - if nx.has_path(subgraph, metadata_steps[j], metadata_steps[i]): + # Check for topological inconsistency, i.e. if there is a path from a later node to an earlier node. 
+ for predecessor in range(len(metadata_steps)): + for successor in range(predecessor + 1, len(metadata_steps)): + if nx.has_path( + subgraph, metadata_steps[successor], metadata_steps[predecessor] + ): raise ValueError( - f"Pipeline configuration nodes {set(subgraph.nodes())} are not topologically consistent with metadata steps {set(metadata_steps)}:" - f"There is a path from successor {metadata_steps[j]} to predecessor {metadata_steps[i]}" + f"Pipeline configuration nodes {list(nx.topological_sort(subgraph))} are not topologically consistent with " + f"the intended implementations for {list(metadata_steps)}:\n" + f"There is a path from successor {metadata_steps[successor]} to predecessor {metadata_steps[predecessor]}." ) diff --git a/src/easylink/pipeline_schema_constants/__init__.py b/src/easylink/pipeline_schema_constants/__init__.py index 6bd85839..3b309fb4 100644 --- a/src/easylink/pipeline_schema_constants/__init__.py +++ b/src/easylink/pipeline_schema_constants/__init__.py @@ -1,9 +1,11 @@ -from easylink.pipeline_schema_constants import development, integration_test +from easylink.pipeline_schema_constants import development, tests ALLOWED_SCHEMA_PARAMS = { "development": development.SCHEMA_PARAMS, } TESTING_SCHEMA_PARAMS = { - "integration": integration_test.SCHEMA_PARAMS, + "integration": tests.SINGLE_STEP_SCHEMA_PARAMS, + "combined_bad_topology": tests.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS, + "combined_bad_implementation_names": tests.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS, } diff --git a/src/easylink/pipeline_schema_constants/integration_test.py b/src/easylink/pipeline_schema_constants/integration_test.py deleted file mode 100644 index dd719081..00000000 --- a/src/easylink/pipeline_schema_constants/integration_test.py +++ /dev/null @@ -1,39 +0,0 @@ -from easylink.graph_components import EdgeParams, InputSlot, OutputSlot -from easylink.step import InputSlot, InputStep, OutputStep, Step -from easylink.utilities.validation_utils import validate_input_file_dummy 
- -NODES = [ - InputStep(), - Step( - step_name="step_1", - input_slots=[ - InputSlot( - name="step_1_main_input", - env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", - validator=validate_input_file_dummy, - ) - ], - output_slots=[OutputSlot("step_1_main_output")], - ), - OutputStep( - input_slots=[ - InputSlot(name="result", env_var=None, validator=validate_input_file_dummy) - ], - ), -] -EDGES = [ - EdgeParams( - source_node="input_data", - target_node="step_1", - output_slot="all", - input_slot="step_1_main_input", - ), - EdgeParams( - source_node="step_1", - target_node="results", - output_slot="step_1_main_output", - input_slot="result", - ), -] - -SCHEMA_PARAMS = (NODES, EDGES) diff --git a/src/easylink/pipeline_schema_constants/tests.py b/src/easylink/pipeline_schema_constants/tests.py new file mode 100644 index 00000000..44157e2a --- /dev/null +++ b/src/easylink/pipeline_schema_constants/tests.py @@ -0,0 +1,124 @@ +from easylink.graph_components import ( + EdgeParams, + InputSlot, + InputSlotMapping, + OutputSlot, + OutputSlotMapping, +) +from easylink.step import HierarchicalStep, InputStep, LoopStep, OutputStep, Step +from easylink.utilities.validation_utils import validate_input_file_dummy + +SINGLE_STEP_NODES = [ + InputStep(), + Step( + step_name="step_1", + input_slots=[ + InputSlot( + name="step_1_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ) + ], + output_slots=[OutputSlot("step_1_main_output")], + ), + OutputStep( + input_slots=[ + InputSlot(name="result", env_var=None, validator=validate_input_file_dummy) + ], + ), +] +SINGLE_STEP_EDGES = [ + EdgeParams( + source_node="input_data", + target_node="step_1", + output_slot="all", + input_slot="step_1_main_input", + ), + EdgeParams( + source_node="step_1", + target_node="results", + output_slot="step_1_main_output", + input_slot="result", + ), +] + +SINGLE_STEP_SCHEMA_PARAMS = (SINGLE_STEP_NODES, SINGLE_STEP_EDGES) + + 
+BAD_COMBINED_TOPOLOGY_NODES = [ + InputStep(), + LoopStep( + template_step=HierarchicalStep( + step_name="step_1", + input_slots=[ + InputSlot( + name="step_1_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_1_main_output")], + nodes=[ + Step( + step_name="step_1a", + input_slots=[ + InputSlot( + name="step_1a_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_1a_main_output")], + ), + Step( + step_name="step_1b", + input_slots=[ + InputSlot( + name="step_1b_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_1b_main_output")], + ), + ], + edges=[ + EdgeParams( + source_node="step_1a", + target_node="step_1b", + output_slot="step_1a_main_output", + input_slot="step_1b_main_input", + ), + ], + input_slot_mappings=[ + InputSlotMapping( + parent_slot="step_1_main_input", + child_node="step_1a", + child_slot="step_1a_main_input", + ), + ], + output_slot_mappings=[ + OutputSlotMapping( + parent_slot="step_1_main_output", + child_node="step_1b", + child_slot="step_1b_main_output", + ), + ], + ), + self_edges=[ + EdgeParams( + source_node="step_1", + target_node="step_1", + output_slot="step_1_main_output", + input_slot="step_1_main_input", + ), + ], + ), + OutputStep( + input_slots=[ + InputSlot(name="result", env_var=None, validator=validate_input_file_dummy) + ], + ), +] + +BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS = (BAD_COMBINED_TOPOLOGY_NODES, SINGLE_STEP_EDGES) diff --git a/src/easylink/step.py b/src/easylink/step.py index f0cc7a1b..847ecfe7 100644 --- a/src/easylink/step.py +++ b/src/easylink/step.py @@ -425,8 +425,7 @@ def implementation_node_name(self) -> str: if step_name != node_name: prefix = node_names[i:] break - # If we didn't include the step name already for a combined 
implementation, - # do so now. + # If we didn't include the step name already for a combined implementation, do so now. if self.configuration_state.is_combined and not prefix: prefix.append(self.name) prefix.append(implementation_name) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index d8b78ae4..915fd8e2 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -621,6 +621,74 @@ }, }, }, + "combined_bad_topology": { + "steps": { + "step_1": { + "iterate": [ + { + "substeps": { + "step_1a": { + "implementation": { + "name": "step_1a_python_pandas", + }, + }, + "step_1b": { + "combined_implementation_key": "step_1a_1b", + }, + }, + }, + { + "substeps": { + "step_1a": { + "combined_implementation_key": "step_1a_1b", + }, + "step_1b": { + "implementation": { + "name": "step_1b_python_pandas", + }, + }, + }, + }, + ], + }, + }, + "combined_implementations": { + "step_1a_1b": { + "name": "step_1a_and_step_1b_combined_python_pandas", + }, + }, + }, + "combined_bad_implementation_names": { + "steps": { + "step_1": { + "implementation": { + "name": "step_1_python_pandas", + }, + }, + "step_2": { + "implementation": { + "name": "step_2_python_pandas", + }, + "combined_implementation_key": "step_3_4", # incorrect key + }, + "step_3": { + "implementation": { + "name": "step_3_python_pandas", + }, + }, + "choice_section": { + "type": "simple", + "step_4": { + "combined_implementation_key": "step_3_4", + }, + }, + }, + "combined_implementations": { + "step_3_4": { + "name": "step_3_and_step_4_combined_python_pandas", + }, + }, + }, } diff --git a/tests/unit/test_pipeline_graph.py b/tests/unit/test_pipeline_graph.py index 69fccd55..3889343c 100644 --- a/tests/unit/test_pipeline_graph.py +++ b/tests/unit/test_pipeline_graph.py @@ -1,3 +1,4 @@ +import re from pathlib import Path import pytest @@ -5,6 +6,8 @@ from easylink.configuration import Config from easylink.graph_components import InputSlot, OutputSlot from easylink.pipeline_graph import 
PipelineGraph +from easylink.pipeline_schema import PipelineSchema +from easylink.pipeline_schema_constants import TESTING_SCHEMA_PARAMS from easylink.utilities.validation_utils import validate_input_file_dummy from tests.unit.conftest import COMBINED_IMPLEMENTATION_CONFIGS @@ -262,7 +265,7 @@ def test_condense_input_slots() -> None: assert slot["filepaths"] == expected_slot["filepaths"] -def test_condense_input_slots_duplicate_slots() -> None: +def test_condense_input_slots_duplicate_slots_raises() -> None: input_slots = [ InputSlot( "step_1_main_input", @@ -283,7 +286,7 @@ def test_condense_input_slots_duplicate_slots() -> None: PipelineGraph.condense_input_slots(input_slots, filepaths_by_slot) -def test_condense_input_slots_duplicate_slots() -> None: +def test_condense_input_slots_duplicate_slots_raises_2() -> None: input_slots = [ InputSlot( "step_1_main_input", @@ -575,40 +578,53 @@ def test_merge_combined_implementations_parallel(default_config_params, test_dir check_nodes_and_edges(pipeline_graph, expected_nodes, expected_edges) -def test_cycle_error(default_config_params) -> None: - config_params = default_config_params - config_params["pipeline"] = COMBINED_IMPLEMENTATION_CONFIGS["with_iteration_cycle"] - with pytest.raises(ValueError, match=("The MultiDiGraph contains a cycle:")): - PipelineGraph(Config(config_params)) - - -def test_combined_extra_step(default_config_params): - config_params = default_config_params - config_params["pipeline"] = COMBINED_IMPLEMENTATION_CONFIGS["with_extra_node"] - with pytest.raises( - ValueError, - match=r"Pipeline configuration nodes \['step_2', 'step_3', 'step_4'\] do not match metadata steps \['step_3', 'step_4'\].", - ): - PipelineGraph(Config(config_params)) - - -def test_combined_missing_node(default_config_params): +@pytest.mark.parametrize( + "problem_key, error_msg, use_custom_schema", + [ + ( + "with_iteration_cycle", + "The MultiDiGraph contains a cycle: [('step_3_4', 'step_3_loop_2_step_3_python_pandas', 0), "
"('step_3_loop_2_step_3_python_pandas', 'step_3_4', 0)]", + False, + ), + ( + "with_extra_node", + "Pipeline configuration nodes ['step_2', 'step_3', 'step_4'] do not match metadata steps ['step_3', 'step_4'].", + False, + ), + ( + "with_missing_node", + "Pipeline configuration nodes ['step_4'] do not match metadata steps ['step_3', 'step_4'].", + False, + ), + ( + "combined_bad_topology", + "Pipeline configuration nodes ['step_1b', 'step_1a'] are not topologically consistent with the intended implementations for ['step_1a', 'step_1b']:\nThere is a path from successor step_1b to predecessor step_1a.", + True, + ), + ( + "combined_bad_implementation_names", + "Pipeline configuration nodes ['step_2', 'step_4'] do not match metadata steps ['step_3', 'step_4'].", + False, + ), + ], +) +def test_bad_combined_configuration_raises( + problem_key, error_msg, use_custom_schema, default_config_params +) -> None: config_params = default_config_params - config_params["pipeline"] = COMBINED_IMPLEMENTATION_CONFIGS["with_missing_node"] - with pytest.raises( - ValueError, - match=r"Pipeline configuration nodes \['step_4'\] do not match metadata steps \['step_3', 'step_4'\].", - ): - PipelineGraph(Config(config_params)) - - -# TODO MIC-5476: Add a test here when we have modularized pipeline schemas for testing -@pytest.mark.skip(reason="Not implemented") -def test_combined_bad_topology(): - pass - - -### Helper functions ### + config_params["pipeline"] = COMBINED_IMPLEMENTATION_CONFIGS[problem_key] + with pytest.raises(ValueError, match=re.escape(error_msg)): + if use_custom_schema: + schema = PipelineSchema(problem_key, *TESTING_SCHEMA_PARAMS[problem_key]) + PipelineGraph(Config(config_params, schema)) + else: + PipelineGraph(Config(config_params)) + + +#################### +# Helper functions # +#################### def check_nodes_and_edges(pipeline_graph, expected_nodes, expected_edges):