Log Output
+
+Paste the log output here.
+
+
+
+**Expected behavior**
+A clear and concise description of what you expected to happen.
+
+**Deployment information**
+Describe what you've deployed and how:
+ - Elyra version: [e.g. 1.5.3]
+ - Operating system: [e.g. macos, linux]
+ - Installation source: [e.g. PyPI, conda, from source, official container image, custom container image]
+ - Deployment type: [e.g. local installation, Docker, Kubernetes, Kubeflow [notebook server] , Open Data Hub]
+
+**Pipeline runtime environment**
+If the issue is related to pipeline execution, identify the environment where the pipeline is executed
+ - Local execution
+ - Kubeflow Pipelines (provide version number, whether multi-user auth enabled)
+ - Apache Airflow (provide version number)
+
+**Runtime configuration settings**
+If the issue is related to pipeline execution, document the runtime configuration settings from the Elyra UI, omitting confidential information.
diff --git a/elyra/pipeline/parser.py b/elyra/pipeline/parser.py
index 2ee32f1d3..318993c1b 100644
--- a/elyra/pipeline/parser.py
+++ b/elyra/pipeline/parser.py
@@ -133,6 +133,9 @@ def _create_pipeline_operation(node: Dict, super_node: Optional[Dict] = None):
id=node_id,
type=node.get('type'),
classifier=node.get('op'),
+ cpu=PipelineParser._get_app_data_field(node, 'cpu'),
+ gpu=PipelineParser._get_app_data_field(node, 'gpu'),
+ memory=PipelineParser._get_app_data_field(node, 'memory'),
filename=PipelineParser._get_app_data_field(node, 'filename'),
runtime_image=PipelineParser._get_app_data_field(node, 'runtime_image'),
dependencies=PipelineParser._get_app_data_field(node, 'dependencies', []),
diff --git a/elyra/pipeline/pipeline.py b/elyra/pipeline/pipeline.py
index a1216a055..b1fc1545f 100644
--- a/elyra/pipeline/pipeline.py
+++ b/elyra/pipeline/pipeline.py
@@ -22,8 +22,8 @@ class Operation(object):
Represents a single operation in a pipeline
"""
- def __init__(self, id, type, classifier, filename, runtime_image, dependencies=None,
- include_subdirectories: bool = False, env_vars=None, inputs=None, outputs=None,
+ def __init__(self, id, type, classifier, filename, runtime_image, memory=None, cpu=None, gpu=None,
+ dependencies=None, include_subdirectories: bool = False, env_vars=None, inputs=None, outputs=None,
parent_operations=None):
"""
:param id: Generated UUID, 128 bit number used as a unique identifier
@@ -42,6 +42,9 @@ def __init__(self, id, type, classifier, filename, runtime_image, dependencies=N
:param inputs: List of files to be consumed by this operation, produced by parent operation(s)
:param outputs: List of files produced by this operation to be included in a child operation(s)
:param parent_operations: List of parent operation 'ids' required to execute prior to this operation
+ :param cpu: number of cpus requested to run the operation
+ :param memory: amount of memory requested to run the operation (in Gi)
+ :param gpu: number of gpus requested to run the operation
"""
# validate that the operation has all required properties
@@ -67,6 +70,9 @@ def __init__(self, id, type, classifier, filename, runtime_image, dependencies=N
self._inputs = inputs or []
self._outputs = outputs or []
self._parent_operations = parent_operations or []
+ self._cpu = cpu
+ self._gpu = gpu
+ self._memory = memory
@property
def id(self):
@@ -104,6 +110,18 @@ def include_subdirectories(self):
def env_vars(self):
return self._env_vars
+ @property
+ def cpu(self):
+ return self._cpu
+
+ @property
+ def memory(self):
+ return self._memory
+
+ @property
+ def gpu(self):
+ return self._gpu
+
def env_vars_as_dict(self, logger: Optional[object] = None) -> Dict:
"""Operation stores environment variables in a list of name=value pairs, while
subprocess.run() requires a dictionary - so we must convert. If no envs are
@@ -155,7 +173,10 @@ def __eq__(self, other: object) -> bool:
self.include_subdirectories == other.include_subdirectories and \
self.outputs == other.outputs and \
self.inputs == other.inputs and \
- self.parent_operations == other.parent_operations
+ self.parent_operations == other.parent_operations and \
+ self.cpu == other.cpu and \
+ self.gpu == other.gpu and \
+ self.memory == other.memory
def __str__(self) -> str:
return "componentID : {id} \n " \
@@ -166,15 +187,21 @@ def __str__(self) -> str:
"filename : {filename} \n " \
"inputs : {inputs} \n " \
"outputs : {outputs} \n " \
- "runtime image : {image} \n ".format(id=self.id,
- name=self.name,
- parent_op=self.parent_operations,
- depends=self.dependencies,
- inc_subdirs=self.include_subdirectories,
- filename=self.filename,
- inputs=self.inputs,
- outputs=self.outputs,
- image=self.runtime_image)
+ "image : {image} \n " \
+ "gpu: {gpu} \n " \
+ "memory: {memory} \n " \
+ "cpu : {cpu} \n ".format(id=self.id,
+ name=self.name,
+ parent_op=self.parent_operations,
+ depends=self.dependencies,
+ inc_subdirs=self.include_subdirectories,
+ filename=self.filename,
+ inputs=self.inputs,
+ outputs=self.outputs,
+ image=self.runtime_image,
+ gpu=self.gpu,
+ cpu=self.cpu,
+ memory=self.memory)
class Pipeline(object):
diff --git a/elyra/pipeline/processor_kfp.py b/elyra/pipeline/processor_kfp.py
index 6ef4771f9..2b4a6f962 100644
--- a/elyra/pipeline/processor_kfp.py
+++ b/elyra/pipeline/processor_kfp.py
@@ -270,6 +270,9 @@ def _cc_pipeline(self, pipeline, pipeline_name):
pipeline_outputs=operation.outputs,
pipeline_envs=pipeline_envs,
emptydir_volume_size=emptydir_volume_size,
+ cpu_request=operation.cpu,
+ mem_request=operation.memory,
+ gpu_limit=operation.gpu,
image=operation.runtime_image,
file_outputs={
'mlpipeline-metrics':
diff --git a/elyra/templates/kfp_template.jinja2 b/elyra/templates/kfp_template.jinja2
index 4adf4b0de..af7c7eee2 100644
--- a/elyra/templates/kfp_template.jinja2
+++ b/elyra/templates/kfp_template.jinja2
@@ -22,6 +22,9 @@ def create_pipeline():
cos_dependencies_archive='{{ operation.cos_dependencies_archive }}',
pipeline_inputs={{ operation.pipeline_inputs }},
pipeline_outputs={{ operation.pipeline_outputs }},
+ cpu_request='{{ operation.cpu_request }}',
+ mem_request='{{ operation.mem_request }}G',
+ gpu_limit='{{ operation.gpu_limit }}',
image='{{ operation.image }}',
file_outputs={
'mlpipeline-metrics': '{{ metrics_file }}',
diff --git a/packages/pipeline-editor/src/PipelineEditorWidget.tsx b/packages/pipeline-editor/src/PipelineEditorWidget.tsx
index 3077e1b31..5d46f0e9a 100644
--- a/packages/pipeline-editor/src/PipelineEditorWidget.tsx
+++ b/packages/pipeline-editor/src/PipelineEditorWidget.tsx
@@ -492,6 +492,9 @@ export class PipelineEditor extends React.Component<
app_data.dependencies;
node_props.parameterDef.current_parameters.include_subdirectories =
app_data.include_subdirectories;
+ node_props.parameterDef.current_parameters.cpu = app_data.cpu;
+ node_props.parameterDef.current_parameters.memory = app_data.memory;
+ node_props.parameterDef.current_parameters.gpu = app_data.gpu;
node_props.parameterDef.titleDefinition = {
title: this.canvasController.getNode(source.id).label,
editable: true
@@ -540,6 +543,9 @@ export class PipelineEditor extends React.Component<
app_data.env_vars = propertySet.env_vars;
app_data.dependencies = propertySet.dependencies;
app_data.include_subdirectories = propertySet.include_subdirectories;
+ app_data.cpu = propertySet.cpu;
+ app_data.memory = propertySet.memory;
+ app_data.gpu = propertySet.gpu;
this.validateAllNodes();
this.updateModel();
}
@@ -848,6 +854,22 @@ export class PipelineEditor extends React.Component<
}
}
+ cleanNullProperties(): void {
+ // Delete optional fields that have null value
+ for (const node of this.canvasController.getPipelineFlow().pipelines[0]
+ .nodes) {
+ if (node.app_data.cpu === null) {
+ delete node.app_data.cpu;
+ }
+ if (node.app_data.memory === null) {
+ delete node.app_data.memory;
+ }
+ if (node.app_data.gpu === null) {
+ delete node.app_data.gpu;
+ }
+ }
+ }
+
async handleExportPipeline(): Promise