Commit b994276: launcher

mattkappel committed Nov 2, 2023
1 parent 2c97b35

Showing 5 changed files with 74 additions and 75 deletions.
23 changes: 5 additions & 18 deletions src/linker/cli.py
@@ -11,6 +11,7 @@
handle_exceptions,
prepare_results_directory,
)
from linker.utilities.spark_utils import build_cluster


@click.group()
@@ -151,34 +152,20 @@ def run_slurm_job(
default=None,
show_default=True,
type=click.Path(exists=True, dir_okay=False, resolve_path=True),
help=("Path to a computing environment yaml file on which to launch the step."),
help=("Path to a computing environment yaml file describing Spark configuration."),
)
@click.option("--input-data", multiple=True)
@click.option("-v", "verbose", count=True, help="Configure logging verbosity.", hidden=True)
def build_spark_cluster(
container_engine: str,
results_dir: str,
step_name: str,
step_dir: str,
input_data: Tuple[str],
computing_environment: str,
verbose: int,
) -> None:
"""Runs a job on Slurm. The standard use case is this would be kicked off
when a slurm computing environment is defined in the environment.yaml.
"""
configure_logging_to_terminal(verbose)
results_dir = Path(results_dir)
step_dir = Path(step_dir)
input_data = [Path(x) for x in input_data]
main = handle_exceptions(
func=runner.run_container, exceptions_logger=logger, with_debugger=False
)
main(
container_engine=container_engine,
input_data=input_data,
results_dir=results_dir,
step_name=step_name,
step_dir=step_dir,
func=build_cluster, exceptions_logger=logger, with_debugger=False
)
main(computing_environment)

logger.info("*** FINISHED ***")
13 changes: 9 additions & 4 deletions src/linker/containers/spark_cluster/start_spark_slurm.sh
@@ -22,7 +22,6 @@ export SPARK_LOCAL_DIRS=$HOME/.spark_temp/logs
export SPARK_MASTER_PORT=28508
export SPARK_MASTER_WEBUI_PORT=28509
export SPARK_WORKER_CORES=$SLURM_CPUS_PER_TASK
# shellcheck disable=SC2004
export SPARK_DAEMON_MEMORY=$(( $SLURM_MEM_PER_CPU * $SLURM_CPUS_PER_TASK / 2 ))m
export SPARK_MEM=$SPARK_DAEMON_MEMORY

@@ -43,13 +42,19 @@ else
MASTER_NODE=$( scontrol show hostname "$SLURM_NODELIST" | head -n 1 )

mkdir -p "/tmp/pvs_like_case_study_spark_local_$USER"
singularity exec -B /mnt:/mnt,"/tmp/pvs_like_case_study_spark_local_$USER":/tmp $SINGULARITY_IMG $CONDA_PATH run --no-capture-output -n $CONDA_ENV "$SPARK_ROOT/bin/spark-class" org.apache.spark.deploy.master.Master --host "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" --webui-port "$SPARK_MASTER_WEBUI_PORT"
singularity exec -B /mnt:/mnt,"/tmp/pvs_like_case_study_spark_local_$USER":/tmp $SINGULARITY_IMG \
$CONDA_PATH run --no-capture-output -n $CONDA_ENV "$SPARK_ROOT/bin/spark-class" \
org.apache.spark.deploy.master.Master --host "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" \
--webui-port "$SPARK_MASTER_WEBUI_PORT"
else
# TODO: This step assumes that SLURM_PROCID=0 corresponds to the first node in SLURM_NODELIST. Is this reasonable?
# TODO: This step assumes that SLURM_PROCID=0 corresponds to the first node in SLURM_NODELIST.
# Is this reasonable?
MASTER_NODE=spark://$( scontrol show hostname "$SLURM_NODELIST" | head -n 1 ):"$SPARK_MASTER_PORT"

mkdir -p "/tmp/pvs_like_case_study_spark_local_$USER"
singularity exec -B /mnt:/mnt,"/tmp/pvs_like_case_study_spark_local_$USER":/tmp "$SINGULARITY_IMG" "$CONDA_PATH" run --no-capture-output -n "$CONDA_ENV" "$SPARK_ROOT/bin/spark-class" org.apache.spark.deploy.worker.Worker "$MASTER_NODE"
singularity exec -B /mnt:/mnt,"/tmp/pvs_like_case_study_spark_local_$USER":/tmp "$SINGULARITY_IMG" \
"$CONDA_PATH" run --no-capture-output -n "$CONDA_ENV" "$SPARK_ROOT/bin/spark-class" \
org.apache.spark.deploy.worker.Worker "$MASTER_NODE"
fi
fi
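
The daemon-memory line above gives the Spark daemon half of each node's allocation (mem-per-cpu times cpus-per-task). A quick check of that arithmetic with the values this commit wires in elsewhere, written as a small Python sketch rather than shell:

```python
# Values as requested by slurm_utils.submit_spark_cluster_job using the
# defaults hard-coded in spark_utils.build_cluster (note the parameter is
# named memory_per_node but is passed to Slurm as --mem-per-cpu).
slurm_mem_per_cpu = 8 * 1024   # 8 GB -> 8192 MB per CPU
slurm_cpus_per_task = 5

# Mirrors: SPARK_DAEMON_MEMORY=$(( $SLURM_MEM_PER_CPU * $SLURM_CPUS_PER_TASK / 2 ))m
spark_daemon_memory_mb = slurm_mem_per_cpu * slurm_cpus_per_task // 2
print(f"SPARK_DAEMON_MEMORY={spark_daemon_memory_mb}m")  # SPARK_DAEMON_MEMORY=20480m
```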

2 changes: 1 addition & 1 deletion src/linker/specifications/spark_environment.yaml
@@ -12,7 +12,7 @@ spark:
workers:
num_workers: 2 # num_workers + 1 nodes will be requested
cpus_per_task: 5
mem_per_cpu: 8000 # MB
mem_per_cpu: 8 # GB
time_limit: 1 # hours
# TODO: use fqdn to specify node URL
fqdn: "cluster.ihme.washington.edu" # node URL: hostname.[spark.workers.fqdn]
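
Because the file now stores mem_per_cpu in GB rather than MB, consumers have to convert back before handing the value to Slurm (the new slurm_utils code multiplies by 1024). A minimal loading sketch, under the assumption that `workers` nests directly beneath `spark` (indentation is not visible in this view):

```python
from pathlib import Path

import yaml

spec = yaml.safe_load(
    Path("src/linker/specifications/spark_environment.yaml").read_text()
)
workers = spec["spark"]["workers"]

nodes = workers["num_workers"] + 1              # num_workers + 1 nodes will be requested
mem_per_cpu_mb = workers["mem_per_cpu"] * 1024  # file stores GB; Slurm expects MB
print(nodes, mem_per_cpu_mb, workers["cpus_per_task"], f"{workers['time_limit']}:00:00")
# 3 8192 5 1:00:00
```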
62 changes: 26 additions & 36 deletions src/linker/utilities/slurm_utils.py
@@ -3,7 +3,7 @@
import types
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, TextIO

from loguru import logger

@@ -97,51 +97,41 @@ def get_cli_args(job_name, account, partition, peak_memory, max_runtime, num_thr

def submit_spark_cluster_job(
session: types.ModuleType("drmaa.Session"),
resources: Dict[str, str],
container_engine: str,
input_data: List[Path],
results_dir: Path,
step_name: str,
step_dir: Path,
launcher: TextIO,
account: str,
partition: str,
memory_per_node: int,
max_runtime: int,
num_workers: int,
cpus_per_task: int,
) -> None:
jt = session.createJobTemplate()
jt.jobName = f"spark_cluster_{datetime.now().strftime('%Y%m%d%H%M%S')}"
jt.joinFiles = False # keeps stdout separate from stderr
jt.outputPath = f":{str(results_dir / '%A.o%a')}"
jt.errorPath = f":{str(results_dir / '%A.e%a')}"
jt.remoteCommand = shutil.which("linker")
jt_args = [
"build-spark-cluster",
container_engine,
str(results_dir),
step_name,
str(step_dir),
"-vvv",
]
# for filepath in input_data:
# jt_args.extend(("--input-data", str(filepath)))
jt.args = jt_args
# jt.joinFiles = False # keeps stdout separate from stderr
# jt.outputPath = f":{str(results_dir / '%A.o%a')}"
# jt.errorPath = f":{str(results_dir / '%A.e%a')}"
jt.remoteCommand = shutil.which("sh")
jt.args = [launcher.name]
jt.jobEnvironment = {
"LC_ALL": "en_US.UTF-8",
"LANG": "en_US.UTF-8",
}
jt.nativeSpecification = get_cli_args(
job_name=jt.jobName,
account=resources["account"],
partition=resources["partition"],
peak_memory=resources["memory"],
max_runtime=resources["time_limit"],
num_threads=resources["cpus"],
jt.nativeSpecification = (
f"--job-name={jt.jobName} "
f"--account={account} "
f"--partition={partition} "
f"--mem-per-cpu={memory_per_node * 1024} "
f"--time={max_runtime}:00:00 "
f"--nodes={num_workers + 1} "
f"--cpus-per-task={cpus_per_task} "
"--ntasks-per-node=1"
)
job_id = session.runJob(jt)
logger.info(
f"Launching slurm job for step '{step_name}'\n"
f"Job submitted with jobid '{job_id}'\n"
f"Output log: {str(results_dir / f'{job_id}.o*')}\n"
f"Error log: {str(results_dir / f'{job_id}.e*')}"
f"Submitting slurm job for launching the Spark cluster: '{jt.jobName}'\n"
f"Job submitted with jobid '{job_id}' to execute script '{launcher.name}'\n"
# f"Output log: {str(results_dir / f'{job_id}.o*')}\n"
# f"Error log: {str(results_dir / f'{job_id}.e*')}"
)
job_status = session.wait(job_id, session.TIMEOUT_WAIT_FOREVER)
# TODO: clean up if job failed?
logger.info(f"Job {job_id} finished with status '{job_status}'")
session.deleteJobTemplate(jt)
session.exit()
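
With the arguments build_cluster passes in (account proj_simscience, partition all.q, memory_per_node 8, max_runtime 1, num_workers 2, cpus_per_task 5), the native specification assembled above resolves to a fixed string apart from the timestamped job name; a short sketch reproducing it:

```python
from datetime import datetime

job_name = f"spark_cluster_{datetime.now().strftime('%Y%m%d%H%M%S')}"
native_spec = (
    f"--job-name={job_name} "
    "--account=proj_simscience "
    "--partition=all.q "
    f"--mem-per-cpu={8 * 1024} "  # memory_per_node=8 GB -> 8192 MB per CPU
    "--time=1:00:00 "             # max_runtime=1 hour
    f"--nodes={2 + 1} "           # num_workers=2 workers plus 1 master node
    "--cpus-per-task=5 "
    "--ntasks-per-node=1"
)
# --job-name=spark_cluster_<timestamp> --account=proj_simscience --partition=all.q
#   --mem-per-cpu=8192 --time=1:00:00 --nodes=3 --cpus-per-task=5 --ntasks-per-node=1
```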
49 changes: 33 additions & 16 deletions src/linker/utilities/spark_utils.py
@@ -4,27 +4,37 @@
from pathlib import Path
from typing import TextIO

from linker.utilities.slurm_utils import submit_spark_cluster_job
from linker.utilities.slurm_utils import get_slurm_drmaa, submit_spark_cluster_job

CONDA_PATH = (
"/ihme/homes/mkappel/miniconda3/condabin/conda " # must be accessible within container
)
CONDA_ENV = "pvs_like_case_study_spark_node"
SINGULARITY_IMG = "docker://apache/spark@sha256:a1dd2487a97fb5e35c5a5b409e830b501a92919029c62f9a559b13c4f5c50f63"


def build_cluster():
def build_cluster(environment_file: Path) -> str:
"""Builds a Spark cluster.
Returns:
spark_master_url: Spark master URL.
"""
drmaa = get_slurm_drmaa()
session = drmaa.Session()

spark_master_url = ""

# TODO: Read environment file

# call build_launch_script
launcher = build_cluster_launch_script()

# TODO: update these for configuration
# submit job
submit_spark_cluster_job(
session=session,
launcher=launcher,
account="proj_simscience",
partition="all.q",
memory_per_node=8,
max_runtime=1,
num_workers=2,
cpus_per_task=5,
)

# grep log for spark master url or is there a better approach?

@@ -41,12 +51,13 @@ def build_cluster_launch_script() -> TextIO:
delete=False,
)

output_dir = str(worker_settings_file.resolve().parent)
# output_dir = str(worker_settings_file.resolve().parent)

# TODO: handle .cluster.ihme.washington.edu
launcher.write(
f"""
#!/bin/bash
# start_spark_slurm.sh generated by PRL ecosystem tool linker
#start_spark_slurm.sh automatically generated by linker
unset SPARK_HOME
CONDA_PATH=/opt/conda/condabin/conda # must be accessible within container
@@ -59,7 +70,6 @@ def build_cluster_launch_script() -> TextIO:
export SPARK_MASTER_PORT=28508
export SPARK_MASTER_WEBUI_PORT=28509
export SPARK_WORKER_CORES=$SLURM_CPUS_PER_TASK
# shellcheck disable=SC2004
export SPARK_DAEMON_MEMORY=$(( $SLURM_MEM_PER_CPU * $SLURM_CPUS_PER_TASK / 2 ))m
export SPARK_MEM=$SPARK_DAEMON_MEMORY
@@ -80,18 +90,25 @@ def build_cluster_launch_script() -> TextIO:
MASTER_NODE=$( scontrol show hostname "$SLURM_NODELIST" | head -n 1 )
mkdir -p "/tmp/pvs_like_case_study_spark_local_$USER"
singularity exec -B /mnt:/mnt,"/tmp/pvs_like_case_study_spark_local_$USER":/tmp $SINGULARITY_IMG $CONDA_PATH run --no-capture-output -n $CONDA_ENV "$SPARK_ROOT/bin/spark-class" org.apache.spark.deploy.master.Master --host "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" --webui-port "$SPARK_MASTER_WEBUI_PORT"
singularity exec -B /mnt:/mnt,"/tmp/pvs_like_case_study_spark_local_$USER":/tmp $SINGULARITY_IMG \
$CONDA_PATH run --no-capture-output -n $CONDA_ENV "$SPARK_ROOT/bin/spark-class" \
org.apache.spark.deploy.master.Master --host "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" \
--webui-port "$SPARK_MASTER_WEBUI_PORT"
else
# TODO: This step assumes that SLURM_PROCID=0 corresponds to the first node in SLURM_NODELIST. Is this reasonable?
# TODO: This step assumes that SLURM_PROCID=0 corresponds to the first node in SLURM_NODELIST.
# Is this reasonable?
MASTER_NODE=spark://$( scontrol show hostname "$SLURM_NODELIST" | head -n 1 ):"$SPARK_MASTER_PORT"
mkdir -p "/tmp/spark_cluster_$USER"
singularity exec -B /mnt:/mnt,"/tmp/spark_cluster_$USER":/tmp "$SINGULARITY_IMG" "$CONDA_PATH" run --no-capture-output -n "$CONDA_ENV" "$SPARK_ROOT/bin/spark-class" org.apache.spark.deploy.worker.Worker "$MASTER_NODE"
mkdir -p "/tmp/pvs_like_case_study_spark_local_$USER"
singularity exec -B /mnt:/mnt,"/tmp/pvs_like_case_study_spark_local_$USER":/tmp "$SINGULARITY_IMG" \
"$CONDA_PATH" run --no-capture-output -n "$CONDA_ENV" "$SPARK_ROOT/bin/spark-class" \
org.apache.spark.deploy.worker.Worker "$MASTER_NODE"
fi
fi
"""
)
launcher.close()

atexit.register(lambda: os.remove(launcher.name))
# TODO: handle cleanup, but not here!!
# atexit.register(lambda: os.remove(launcher.name))
return launcher
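
The "grep log for spark master url" TODO in build_cluster could plausibly be handled by polling the job's log for the `spark://host:port` line that the Spark master prints at startup; a hypothetical sketch only, not part of this commit:

```python
import re
import time
from pathlib import Path
from typing import Optional


def find_spark_master_url(log_path: Path, timeout: int = 300) -> Optional[str]:
    """Poll a Spark master log until a spark://host:port URL appears.

    Hypothetical helper illustrating the TODO in build_cluster; the real log
    location depends on where the Slurm job's output is written.
    """
    pattern = re.compile(r"spark://[\w.\-]+:\d+")
    deadline = time.time() + timeout
    while time.time() < deadline:
        if log_path.exists():
            match = pattern.search(log_path.read_text())
            if match:
                return match.group(0)
        time.sleep(5)
    return None
```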
