MLCOMPUTE-2001 | Cleanup spark related logs from setup_tron_namespace #3979

Merged: 2 commits, Oct 14, 2024

4 changes: 2 additions & 2 deletions paasta_tools/cli/cmds/spark_run.py
@@ -239,9 +239,9 @@ def add_subparser(subparsers):
             "default_pool"
         )
     except PaastaNotConfiguredError:
-        default_spark_cluster = "pnw-devc"
+        default_spark_cluster = "pnw-devc-spark"
         default_spark_pool = "batch"
-    valid_clusters = ["spark-pnw-prod", "pnw-devc"]
+    valid_clusters = ["pnw-devc-spark", "pnw-prod-spark"]
Contributor Author commented:
To align the default values with the recent puppet changes for pool validation rules.

     list_parser.add_argument(
         "-c",
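For readers unfamiliar with how these values are consumed: a minimal sketch of the `-c/--cluster` option, assuming it wires `default_spark_cluster` and `valid_clusters` into argparse via `default` and `choices`. The real option definition (flags, help text, validation) is not shown in this diff, so everything beyond the two changed values is an assumption.

```python
import argparse

# Values from this hunk; the rest of the add_subparser() wiring is assumed.
default_spark_cluster = "pnw-devc-spark"
valid_clusters = ["pnw-devc-spark", "pnw-prod-spark"]

list_parser = argparse.ArgumentParser(prog="paasta spark-run")
list_parser.add_argument(
    "-c",
    "--cluster",
    default=default_spark_cluster,  # falls back to the new "-spark" suffixed name
    choices=valid_clusters,         # rejects clusters outside the validated set
    help="Cluster to run the Spark job on",
)

print(list_parser.parse_args([]).cluster)                        # pnw-devc-spark
print(list_parser.parse_args(["-c", "pnw-prod-spark"]).cluster)  # pnw-prod-spark
```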
2 changes: 1 addition & 1 deletion paasta_tools/setup_tron_namespace.py
@@ -224,7 +224,7 @@ def main():
             # since we need to print out what failed in either case
             failed.append(service)

-    if args.bulk_config_fetch:
+    if not args.dry_run and args.bulk_config_fetch:
Contributor Author commented:
I guess we should also skip Tron API calls here in the dry run mode?

Member replied:
good catch!

         updated_namespaces = client.update_namespaces(new_configs)

         if updated_namespaces:
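To make the dry-run behavior discussed above concrete, here is a minimal sketch of the guarded bulk update. The helper function, its arguments, and the print statement are illustrative only; the real logic lives inline in `main()`.

```python
# Sketch of the dry-run guard around the bulk Tron API update (not the real main()).
def push_namespace_updates(client, new_configs, dry_run: bool, bulk_config_fetch: bool):
    if not dry_run and bulk_config_fetch:
        # Real run: send all rendered namespace configs to the Tron master in one call.
        return client.update_namespaces(new_configs)
    if dry_run:
        # Dry run: no Tron API traffic; only report what would have been updated.
        for namespace in new_configs:
            print(f"dry-run: would update Tron namespace {namespace}")
    return {}
```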
33 changes: 20 additions & 13 deletions paasta_tools/spark_tools.py
@@ -20,7 +20,7 @@
 SPARK_AWS_CREDS_PROVIDER = "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
 SPARK_EXECUTOR_NAMESPACE = "paasta-spark"
 SPARK_DRIVER_POOL = "stable"
-SPARK_JOB_USER = "TRON"
+SPARK_TRON_JOB_USER = "TRON"
 SPARK_PROMETHEUS_SHARD = "ml-compute"
 SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"
 MEM_MULTIPLIER = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
@@ -176,7 +176,9 @@ def inject_spark_conf_str(original_cmd: str, spark_conf_str: str) -> str:
     return original_cmd


-def auto_add_timeout_for_spark_job(cmd: str, timeout_job_runtime: str) -> str:
+def auto_add_timeout_for_spark_job(
+    cmd: str, timeout_job_runtime: str, silent: bool = False
+) -> str:
Comment on lines +179 to +181:

Member commented:
fwiw: for spark-drivers-on-k8s we should use the tron max_runtime config directly and get rid of this code entirely :)

Contributor Author replied:
Thanks for bringing this up. I think we can move this into our custom spark-submit wrapper later, so it can be more easily monitored from the Spark side, managed by the Spark configuration service and auto tuner, and kept consistent across the different types of Spark deployments (tron, adhoc, jupyter). The current implementation also respects the max_runtime tronfig from the caller side.

     # Timeout only to be added for spark-submit commands
     # TODO: Add timeout for jobs using mrjob with spark-runner
     if "spark-submit" not in cmd:
@@ -189,16 +191,17 @@ def auto_add_timeout_for_spark_job(cmd: str, timeout_job_runtime: str) -> str:
         split_cmd = cmd.split("spark-submit")
         # split_cmd[0] will always be an empty string or end with a space
         cmd = f"{split_cmd[0]}timeout {timeout_job_runtime} spark-submit{split_cmd[1]}"
-        log.info(
-            PaastaColors.blue(
-                f"NOTE: Job will exit in given time {timeout_job_runtime}. "
-                f"Adjust timeout value using --timeout-job-timeout. "
-                f"New Updated Command with timeout: {cmd}"
-            ),
-        )
+        if not silent:
+            log.info(
+                PaastaColors.blue(
+                    f"NOTE: Job will exit in given time {timeout_job_runtime}. "
+                    f"Adjust timeout value using --timeout-job-runtime. "
+                    f"New Updated Command with timeout: {cmd}"
+                ),
+            )
     except Exception as e:
         err_msg = (
-            f"'timeout' could not be added to command: '{cmd}' due to error '{e}'. "
+            f"'timeout' could not be added to spark command: '{cmd}' due to error '{e}'. "
            "Please report to #spark."
         )
         log.warn(err_msg)
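As context for the max_runtime discussion above, this is roughly how the timeout wrapping behaves: the `silent` flag only suppresses the informational log, not the rewrite itself. The command strings and runtime value below are illustrative, and assume the command does not already carry a timeout.

```python
from paasta_tools.spark_tools import auto_add_timeout_for_spark_job

# spark-submit commands get prefixed with coreutils `timeout`,
# producing roughly "timeout 12h spark-submit --conf spark.executor.cores=2 job.py".
wrapped = auto_add_timeout_for_spark_job(
    "spark-submit --conf spark.executor.cores=2 job.py",
    timeout_job_runtime="12h",
    silent=True,  # Tron path: skip the PaastaColors log line
)
print(wrapped)

# Commands that are not spark-submit are returned unchanged.
print(auto_add_timeout_for_spark_job("python job.py", "12h"))
```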
@@ -211,9 +214,12 @@ def build_spark_command(
     spark_config_dict: Dict[str, Any],
     is_mrjob: bool,
     timeout_job_runtime: str,
+    silent: bool = False,
 ) -> str:
-    command = f"{inject_spark_conf_str(original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob))}"
-    return auto_add_timeout_for_spark_job(command, timeout_job_runtime)
+    command = inject_spark_conf_str(
+        original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob)
+    )
+    return auto_add_timeout_for_spark_job(command, timeout_job_runtime, silent=silent)


 def get_spark_ports_from_config(spark_conf: Dict[str, str]) -> List[int]:
@@ -238,14 +244,15 @@ def get_spark_driver_monitoring_annotations(

 def get_spark_driver_monitoring_labels(
     spark_config: Dict[str, str],
+    user: str,
 ) -> Dict[str, str]:
     """
     Returns Spark driver pod labels - generally for Prometheus metric relabeling.
     """
     ui_port_str = str(spark_config.get("spark.ui.port", ""))
     labels = {
         "paasta.yelp.com/prometheus_shard": SPARK_PROMETHEUS_SHARD,
-        "spark.yelp.com/user": SPARK_JOB_USER,
+        "spark.yelp.com/user": user,
         "spark.yelp.com/driver_ui_port": ui_port_str,
     }
     return labels
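A short usage sketch of the updated helper, matching the Tron call site further down in tron_tools.py. The `spark.ui.port` value is made up for illustration; the expected output shape follows directly from the function body above.

```python
from paasta_tools import spark_tools

labels = spark_tools.get_spark_driver_monitoring_labels(
    {"spark.ui.port": "39091"},            # illustrative Spark config snippet
    user=spark_tools.SPARK_TRON_JOB_USER,  # "TRON", now passed explicitly
)
# Expected shape:
# {
#     "paasta.yelp.com/prometheus_shard": "ml-compute",
#     "spark.yelp.com/user": "TRON",
#     "spark.yelp.com/driver_ui_port": "39091",
# }
print(labels)
```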
19 changes: 11 additions & 8 deletions paasta_tools/tron_tools.py
@@ -347,7 +347,7 @@ def build_spark_config(self) -> Dict[str, str]:

         docker_img_url = self.get_docker_url(system_paasta_config)

-        spark_conf_builder = SparkConfBuilder()
+        spark_conf_builder = SparkConfBuilder(is_driver_on_k8s_tron=True)
         spark_conf = spark_conf_builder.get_spark_conf(
             cluster_manager="kubernetes",
             spark_app_base_name=spark_app_name,
@@ -366,7 +366,7 @@ def build_spark_config(self) -> Dict[str, str]:
             force_spark_resource_configs=self.config_dict.get(
                 "force_spark_resource_configs", False
             ),
-            user=spark_tools.SPARK_JOB_USER,
+            user=spark_tools.SPARK_TRON_JOB_USER,
         )
         # delete the dynamically generated spark.app.id to prevent frequent config updates in Tron.
         # spark.app.id will be generated later by yelp spark-submit wrapper or Spark itself.
@@ -380,16 +380,17 @@ def build_spark_config(self) -> Dict[str, str]:
             if "spark.app.name" not in stringified_spark_args
             else stringified_spark_args["spark.app.name"]
         )
-        # TODO: Remove this once dynamic pod template is generated inside the driver using spark-submit wrapper
+
+        # TODO(MLCOMPUTE-1220): Remove this once dynamic pod template is generated inside the driver using spark-submit wrapper
         if "spark.kubernetes.executor.podTemplateFile" in spark_conf:
-            print(
+            log.info(
                 f"Replacing spark.kubernetes.executor.podTemplateFile="
                 f"{spark_conf['spark.kubernetes.executor.podTemplateFile']} with "
                 f"spark.kubernetes.executor.podTemplateFile={spark_tools.SPARK_DNS_POD_TEMPLATE}"
             )
-            spark_conf[
-                "spark.kubernetes.executor.podTemplateFile"
-            ] = spark_tools.SPARK_DNS_POD_TEMPLATE
+        spark_conf[
+            "spark.kubernetes.executor.podTemplateFile"
+        ] = spark_tools.SPARK_DNS_POD_TEMPLATE
Contributor Author commented:
Fix: we should set/overwrite the pod template file (for k8s dns mode, etc.) whether the pod template has already been set or not.

         spark_conf.update(
             {
@@ -1044,6 +1045,7 @@ def format_tron_action_dict(action_config: TronActionConfig):
             action_config.config_dict.get(
                 "max_runtime", spark_tools.DEFAULT_SPARK_RUNTIME_TIMEOUT
             ),
+            silent=True,
         )
         # point to the KUBECONFIG needed by Spark driver
         result["env"]["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig()
@@ -1074,7 +1076,8 @@ def format_tron_action_dict(action_config: TronActionConfig):
             )
         )
         monitoring_labels = spark_tools.get_spark_driver_monitoring_labels(
-            action_config.action_spark_config
+            action_config.action_spark_config,
+            user=spark_tools.SPARK_TRON_JOB_USER,
         )
         result["annotations"].update(monitoring_annotations)
         result["labels"].update(monitoring_labels)
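To summarize the pod-template fix above in isolation: a standalone sketch of the new control flow, where only the log line stays conditional and the override itself always happens. The helper name and the bare dict are illustrative, not the real TronActionConfig code.

```python
SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"

def force_dns_pod_template(spark_conf: dict) -> dict:
    # Log a replacement notice only when a template was already set upstream...
    if "spark.kubernetes.executor.podTemplateFile" in spark_conf:
        print(
            "Replacing "
            f"{spark_conf['spark.kubernetes.executor.podTemplateFile']} "
            f"with {SPARK_DNS_POD_TEMPLATE}"
        )
    # ...but always point executors at the DNS pod template afterwards.
    spark_conf["spark.kubernetes.executor.podTemplateFile"] = SPARK_DNS_POD_TEMPLATE
    return spark_conf

# Both cases end up with the DNS pod template set:
print(force_dns_pod_template({}))
print(force_dns_pod_template({"spark.kubernetes.executor.podTemplateFile": "/tmp/old.yaml"}))
```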
2 changes: 1 addition & 1 deletion requirements.txt
@@ -91,7 +91,7 @@ rsa==4.7.2
 ruamel.yaml==0.15.96
 s3transfer==0.10.0
 sensu-plugin==0.3.1
-service-configuration-lib==2.18.21
+service-configuration-lib==2.18.24
 setuptools==39.0.1
 signalfx==1.0.17
 simplejson==3.10.0
2 changes: 1 addition & 1 deletion tests/test_tron_tools.py
@@ -1275,7 +1275,6 @@ def test_format_tron_action_dict_spark(
         "--conf spark.kubernetes.executor.label.yelp.com/pool=special_pool "
         "--conf spark.kubernetes.executor.label.paasta.yelp.com/pool=special_pool "
         "--conf spark.kubernetes.executor.label.yelp.com/owner=core_ml "
-        "--conf spark.kubernetes.executor.podTemplateFile=/nail/srv/configs/spark_dns_pod_template.yaml "
         "--conf spark.kubernetes.executor.volumes.hostPath.0.mount.path=/nail/bulkdata "
         "--conf spark.kubernetes.executor.volumes.hostPath.0.options.path=/nail/bulkdata "
         "--conf spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly=true "
@@ -1307,6 +1306,7 @@ def test_format_tron_action_dict_spark(
         "--conf spark.kubernetes.allocation.batch.size=512 "
         "--conf spark.kubernetes.decommission.script=/opt/spark/kubernetes/dockerfiles/spark/decom.sh "
         "--conf spark.logConf=true "
+        "--conf spark.kubernetes.executor.podTemplateFile=/nail/srv/configs/spark_dns_pod_template.yaml "
         "--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider "
         "--conf spark.driver.host=$PAASTA_POD_IP "
         "--conf spark.kubernetes.authenticate.executor.serviceAccountName=paasta--arn-aws-iam-000000000000-role-some-role "