From d490e96e4526dcd06ac20b4810feed5dff063a70 Mon Sep 17 00:00:00 2001
From: Chi Chang
Date: Thu, 10 Oct 2024 09:59:01 -0700
Subject: [PATCH 1/2] Cleanup spark related logs from setup_tron_namespace

---
 paasta_tools/cli/cmds/spark_run.py | 4 ++--
 paasta_tools/setup_tron_namespace.py | 2 +-
 paasta_tools/spark_tools.py | 33 +++++++++++++++++-----------
 paasta_tools/tron_tools.py | 13 ++++++-----
 requirements.txt | 2 +-
 5 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/paasta_tools/cli/cmds/spark_run.py b/paasta_tools/cli/cmds/spark_run.py
index 6da09265d9..3b9300ebbf 100644
--- a/paasta_tools/cli/cmds/spark_run.py
+++ b/paasta_tools/cli/cmds/spark_run.py
@@ -239,9 +239,9 @@
             "default_pool"
         )
     except PaastaNotConfiguredError:
-        default_spark_cluster = "pnw-devc"
+        default_spark_cluster = "pnw-devc-spark"
         default_spark_pool = "batch"
 
-    valid_clusters = ["spark-pnw-prod", "pnw-devc"]
+    valid_clusters = ["pnw-devc-spark", "pnw-prod-spark"]
 
     list_parser.add_argument(
         "-c",
diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py
index df2ac18844..d4ed73cbab 100755
--- a/paasta_tools/setup_tron_namespace.py
+++ b/paasta_tools/setup_tron_namespace.py
@@ -224,7 +224,7 @@ def main():
             # since we need to print out what failed in either case
             failed.append(service)
 
-    if args.bulk_config_fetch:
+    if not args.dry_run and args.bulk_config_fetch:
         updated_namespaces = client.update_namespaces(new_configs)
 
         if updated_namespaces:
diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py
index 553c0cf319..aa581a826c 100644
--- a/paasta_tools/spark_tools.py
+++ b/paasta_tools/spark_tools.py
@@ -20,7 +20,7 @@
 SPARK_AWS_CREDS_PROVIDER = "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
 SPARK_EXECUTOR_NAMESPACE = "paasta-spark"
 SPARK_DRIVER_POOL = "stable"
-SPARK_JOB_USER = "TRON"
+SPARK_TRON_JOB_USER = "TRON"
 SPARK_PROMETHEUS_SHARD = "ml-compute"
 SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"
 MEM_MULTIPLIER = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
@@ -176,7 +176,9 @@ def inject_spark_conf_str(original_cmd: str, spark_conf_str: str) -> str:
     return original_cmd
 
 
-def auto_add_timeout_for_spark_job(cmd: str, timeout_job_runtime: str) -> str:
+def auto_add_timeout_for_spark_job(
+    cmd: str, timeout_job_runtime: str, silent: bool = False
+) -> str:
     # Timeout only to be added for spark-submit commands
     # TODO: Add timeout for jobs using mrjob with spark-runner
     if "spark-submit" not in cmd:
@@ -189,16 +191,17 @@
             split_cmd = cmd.split("spark-submit")
             # split_cmd[0] will always be an empty string or end with a space
             cmd = f"{split_cmd[0]}timeout {timeout_job_runtime} spark-submit{split_cmd[1]}"
-            log.info(
-                PaastaColors.blue(
-                    f"NOTE: Job will exit in given time {timeout_job_runtime}. "
-                    f"Adjust timeout value using --timeout-job-timeout. "
-                    f"New Updated Command with timeout: {cmd}"
-                ),
-            )
+            if not silent:
+                log.info(
+                    PaastaColors.blue(
+                        f"NOTE: Job will exit in given time {timeout_job_runtime}. "
+                        f"Adjust timeout value using --timeout-job-runtime. "
+                        f"New Updated Command with timeout: {cmd}"
+                    ),
+                )
     except Exception as e:
         err_msg = (
-            f"'timeout' could not be added to command: '{cmd}' due to error '{e}'. "
+            f"'timeout' could not be added to spark command: '{cmd}' due to error '{e}'. "
             "Please report to #spark."
) log.warn(err_msg) @@ -211,9 +214,12 @@ def build_spark_command( spark_config_dict: Dict[str, Any], is_mrjob: bool, timeout_job_runtime: str, + silent: bool = False, ) -> str: - command = f"{inject_spark_conf_str(original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob))}" - return auto_add_timeout_for_spark_job(command, timeout_job_runtime) + command = inject_spark_conf_str( + original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob) + ) + return auto_add_timeout_for_spark_job(command, timeout_job_runtime, silent=silent) def get_spark_ports_from_config(spark_conf: Dict[str, str]) -> List[int]: @@ -238,6 +244,7 @@ def get_spark_driver_monitoring_annotations( def get_spark_driver_monitoring_labels( spark_config: Dict[str, str], + user: str, ) -> Dict[str, str]: """ Returns Spark driver pod labels - generally for Prometheus metric relabeling. @@ -245,7 +252,7 @@ def get_spark_driver_monitoring_labels( ui_port_str = str(spark_config.get("spark.ui.port", "")) labels = { "paasta.yelp.com/prometheus_shard": SPARK_PROMETHEUS_SHARD, - "spark.yelp.com/user": SPARK_JOB_USER, + "spark.yelp.com/user": user, "spark.yelp.com/driver_ui_port": ui_port_str, } return labels diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 88e8ebef4b..81cbcd6e06 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -347,7 +347,7 @@ def build_spark_config(self) -> Dict[str, str]: docker_img_url = self.get_docker_url(system_paasta_config) - spark_conf_builder = SparkConfBuilder() + spark_conf_builder = SparkConfBuilder(is_driver_on_k8s_tron=True) spark_conf = spark_conf_builder.get_spark_conf( cluster_manager="kubernetes", spark_app_base_name=spark_app_name, @@ -366,7 +366,7 @@ def build_spark_config(self) -> Dict[str, str]: force_spark_resource_configs=self.config_dict.get( "force_spark_resource_configs", False ), - user=spark_tools.SPARK_JOB_USER, + user=spark_tools.SPARK_TRON_JOB_USER, ) # delete the dynamically generated spark.app.id to prevent frequent config updates in Tron. # spark.app.id will be generated later by yelp spark-submit wrapper or Spark itself. 
@@ -380,9 +380,10 @@ def build_spark_config(self) -> Dict[str, str]:
             if "spark.app.name" not in stringified_spark_args
             else stringified_spark_args["spark.app.name"]
         )
-        # TODO: Remove this once dynamic pod template is generated inside the driver using spark-submit wrapper
+
+        # TODO(MLCOMPUTE-1220): Remove this once dynamic pod template is generated inside the driver using spark-submit wrapper
         if "spark.kubernetes.executor.podTemplateFile" in spark_conf:
-            print(
+            log.info(
                 f"Replacing spark.kubernetes.executor.podTemplateFile="
                 f"{spark_conf['spark.kubernetes.executor.podTemplateFile']} with "
                 f"spark.kubernetes.executor.podTemplateFile={spark_tools.SPARK_DNS_POD_TEMPLATE}"
@@ -1044,6 +1045,7 @@ def format_tron_action_dict(action_config: TronActionConfig):
             action_config.config_dict.get(
                 "max_runtime", spark_tools.DEFAULT_SPARK_RUNTIME_TIMEOUT
             ),
+            silent=True,
         )
         # point to the KUBECONFIG needed by Spark driver
         result["env"]["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig()
@@ -1074,7 +1076,8 @@
             )
         )
         monitoring_labels = spark_tools.get_spark_driver_monitoring_labels(
-            action_config.action_spark_config
+            action_config.action_spark_config,
+            user=spark_tools.SPARK_TRON_JOB_USER,
         )
         result["annotations"].update(monitoring_annotations)
         result["labels"].update(monitoring_labels)
diff --git a/requirements.txt b/requirements.txt
index 6819f2c9b9..fe04b6b077 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -91,7 +91,7 @@ rsa==4.7.2
 ruamel.yaml==0.15.96
 s3transfer==0.10.0
 sensu-plugin==0.3.1
-service-configuration-lib==2.18.21
+service-configuration-lib==2.18.24
 setuptools==39.0.1
 signalfx==1.0.17
 simplejson==3.10.0

From 9926b4ef697286441fb4feea4237b438bc1a5006 Mon Sep 17 00:00:00 2001
From: Chi Chang
Date: Thu, 10 Oct 2024 10:09:00 -0700
Subject: [PATCH 2/2] Fix driver on k8s pod template overwriting

---
 paasta_tools/tron_tools.py | 6 +++---
 tests/test_tron_tools.py | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py
index 81cbcd6e06..ccccf5f557 100644
--- a/paasta_tools/tron_tools.py
+++ b/paasta_tools/tron_tools.py
@@ -388,9 +388,9 @@ def build_spark_config(self) -> Dict[str, str]:
                 f"{spark_conf['spark.kubernetes.executor.podTemplateFile']} with "
                 f"spark.kubernetes.executor.podTemplateFile={spark_tools.SPARK_DNS_POD_TEMPLATE}"
             )
-            spark_conf[
-                "spark.kubernetes.executor.podTemplateFile"
-            ] = spark_tools.SPARK_DNS_POD_TEMPLATE
+        spark_conf[
+            "spark.kubernetes.executor.podTemplateFile"
+        ] = spark_tools.SPARK_DNS_POD_TEMPLATE
 
         spark_conf.update(
             {
diff --git a/tests/test_tron_tools.py b/tests/test_tron_tools.py
index ad30ae5898..aea121dd06 100644
--- a/tests/test_tron_tools.py
+++ b/tests/test_tron_tools.py
@@ -1275,7 +1275,6 @@ def test_format_tron_action_dict_spark(
             "--conf spark.kubernetes.executor.label.yelp.com/pool=special_pool "
             "--conf spark.kubernetes.executor.label.paasta.yelp.com/pool=special_pool "
             "--conf spark.kubernetes.executor.label.yelp.com/owner=core_ml "
-            "--conf spark.kubernetes.executor.podTemplateFile=/nail/srv/configs/spark_dns_pod_template.yaml "
             "--conf spark.kubernetes.executor.volumes.hostPath.0.mount.path=/nail/bulkdata "
             "--conf spark.kubernetes.executor.volumes.hostPath.0.options.path=/nail/bulkdata "
             "--conf spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly=true "
@@ -1307,6 +1306,7 @@
             "--conf spark.kubernetes.allocation.batch.size=512 "
             "--conf spark.kubernetes.decommission.script=/opt/spark/kubernetes/dockerfiles/spark/decom.sh "
             "--conf spark.logConf=true "
+            "--conf spark.kubernetes.executor.podTemplateFile=/nail/srv/configs/spark_dns_pod_template.yaml "
             "--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider "
             "--conf spark.driver.host=$PAASTA_POD_IP "
             "--conf spark.kubernetes.authenticate.executor.serviceAccountName=paasta--arn-aws-iam-000000000000-role-some-role "
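
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the behaviour these two commits converge on for the new `silent` flag threaded through auto_add_timeout_for_spark_job() / build_spark_command(): the `timeout` prefix is still injected ahead of spark-submit, but the informational log line can be suppressed so that bulk Tron config generation (setup_tron_namespace) stays quiet while interactive `paasta spark-run` users keep the hint. The function name, log wording, and the example command/duration here are assumptions for illustration only, not the paasta_tools implementation.

# Illustrative sketch only; a simplified stand-in, not paasta_tools.spark_tools.
import logging

log = logging.getLogger(__name__)


def add_timeout_to_spark_submit(
    cmd: str, timeout_job_runtime: str, silent: bool = False
) -> str:
    """Prefix a spark-submit command with `timeout <duration>`, optionally quietly."""
    if "spark-submit" not in cmd:
        # Non spark-submit commands pass through untouched.
        return cmd
    head, tail = cmd.split("spark-submit", 1)
    new_cmd = f"{head}timeout {timeout_job_runtime} spark-submit{tail}"
    if not silent:
        # Interactive callers still see the hint; Tron config generation would
        # pass silent=True so it does not emit one log line per Spark action.
        log.info(
            "NOTE: job will be stopped after %s; new command: %s",
            timeout_job_runtime,
            new_cmd,
        )
    return new_cmd


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Hypothetical command and duration, purely for demonstration.
    print(add_timeout_to_spark_submit("spark-submit --conf spark.ui.port=39091 job.py", "12h"))
    print(add_timeout_to_spark_submit("spark-submit job.py", "12h", silent=True))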