Commit

wip

timmow committed Jan 13, 2025
1 parent 5abf50c commit c9de580
154 changes: 154 additions & 0 deletions tests/cli/test_cmds_spark_run.py
@@ -429,6 +429,160 @@ class TestConfigureAndRunDockerContainer:
"fake_dir",
)

# @pytest.mark.parametrize("foo", [(True), (False)])
# @pytest.mark.parametrize(
# "test_input,expected", [("3+5", 8), ("2+4", 6), ("6*9", 42)]
# )
# def test_answer(test_input, expected, foo):
# print("test_input: ", test_input)
# print("expected: ", expected)
# print("foo: ", foo)
# assert False

# @pytest.mark.parametrize(
# ["test_input", "expected"], [("3+5", 8), ("2+4", 6), ("6*9", 42)]
# )
# def test_answer(test_input, expected):
# assert True
@pytest.mark.parametrize(
["cluster_manager", "spark_args_volumes", "expected_volumes"],
[
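            # Kubernetes cluster manager: two hostPath volumes from the spark conf.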
(
spark_run.CLUSTER_MANAGER_K8S,
{
"spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly": "true",
"spark.kubernetes.executor.volumes.hostPath.0.mount.path": "/nail/bulkdata",
"spark.kubernetes.executor.volumes.hostPath.0.options.path": "/k8s/volume0",
"spark.kubernetes.executor.volumes.hostPath.1.mount.readOnly": "false",
"spark.kubernetes.executor.volumes.hostPath.1.mount.path": "/k8s/volume1",
"spark.kubernetes.executor.volumes.hostPath.1.options.path": "/k8s/volume1",
},
[
"/k8s/volume0:/k8s/volume0:ro",
"/k8s/volume1:/k8s/volume1:rw",
"/nail/bulkdata:/nail/bulkdata:ro",
],
),
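            # Local cluster manager: same spark-conf volumes and expected mounts.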
(
spark_run.CLUSTER_MANAGER_LOCAL,
{
"spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly": "true",
"spark.kubernetes.executor.volumes.hostPath.0.mount.path": "/k8s/volume0",
"spark.kubernetes.executor.volumes.hostPath.0.options.path": "/k8s/volume0",
"spark.kubernetes.executor.volumes.hostPath.1.mount.readOnly": "false",
"spark.kubernetes.executor.volumes.hostPath.1.mount.path": "/k8s/volume1",
"spark.kubernetes.executor.volumes.hostPath.1.options.path": "/k8s/volume1",
},
[
"/k8s/volume0:/k8s/volume0:ro",
"/k8s/volume1:/k8s/volume1:rw",
"/nail/bulkdata:/nail/bulkdata:ro",
],
),
],
)
def test_configure_and_run_docker_container_with_uses_bulkdata_and_bulkdata_volume(
self,
mock_create_spark_config_str,
mock_get_docker_cmd,
mock_get_webui_url,
mock_run_docker_container,
mock_get_username,
cluster_manager,
spark_args_volumes,
expected_volumes,
):
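        """Volumes from the spark conf hostPath entries and the bulkdata mount
        should both be passed to the docker container, along with the driver
        defaults."""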
mock_get_username.return_value = "fake_user"
spark_conf = {
"spark.app.name": "fake_app",
"spark.ui.port": "1234",
**spark_args_volumes,
}
mock_create_spark_config_str.return_value = "testing spark opts string"
mock_run_docker_container.return_value = 0

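        # Minimal CLI args for a dry-run `paasta spark-run` invocation.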
args = mock.MagicMock()
args.aws_region = "fake_region"
args.cluster = "fake_cluster"
args.cmd = "pyspark"
args.work_dir = "/fake_dir:/spark_driver"
args.dry_run = True
args.mrjob = False
args.nvidia = False
args.enable_compact_bin_packing = False
args.cluster_manager = cluster_manager
args.docker_cpu_limit = False
args.docker_memory_limit = False
args.docker_shm_size = False
args.tronfig = None
args.job_id = None
args.use_service_auth_token = False
args.uses_bulkdata = True

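        # The instance opts into bulkdata and declares one extra read-only volume.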
instance_config = InstanceConfig(
cluster="fake_cluster",
instance="fake_instance",
service="fake_service",
config_dict=InstanceConfigDict(
{
"uses_bulkdata": True,
"extra_volumes": [
{"hostPath": "/h1", "containerPath": "/c1", "mode": "RO"}
],
}
),
branch_dict=BranchDictV2({"docker_image": "fake_service:fake_sha"}),
)
with mock.patch.object(
instance_config, "get_env_dictionary", return_value={"env1": "val1"}
):
retcode = configure_and_run_docker_container(
args=args,
docker_img="fake-registry/fake-service",
instance_config=instance_config,
system_paasta_config=self.system_paasta_config,
aws_creds=("id", "secret", "token"),
spark_conf=spark_conf,
cluster_manager=cluster_manager,
pod_template_path="unique-run",
)
assert retcode == 0
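        # The spark opts string should be built exactly once from the spark conf.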
mock_create_spark_config_str.assert_called_once_with(
spark_config_dict=spark_conf,
is_mrjob=args.mrjob,
)
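        # The container mounts the parametrized volumes plus the driver defaults:
        # the work dir, /nail/home, the pod template, and the kube config (ro).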
mock_run_docker_container.assert_called_once_with(
container_name="fake_app",
volumes=(
expected_volumes
+ [
"/fake_dir:/spark_driver:rw",
"/nail/home:/nail/home:rw",
"unique-run:unique-run:rw",
"/etc/kubernetes/spark.conf:/etc/kubernetes/spark.conf:ro",
]
),
environment={
"env1": "val1",
"AWS_ACCESS_KEY_ID": "id",
"AWS_SECRET_ACCESS_KEY": "secret",
"AWS_SESSION_TOKEN": "token",
"AWS_DEFAULT_REGION": "fake_region",
"SPARK_OPTS": "testing spark opts string",
"SPARK_USER": "root",
"PAASTA_INSTANCE_TYPE": "spark",
"PAASTA_LAUNCHED_BY": mock.ANY,
"KUBECONFIG": "/etc/kubernetes/spark.conf",
},
docker_img="fake-registry/fake-service",
docker_cmd=mock_get_docker_cmd.return_value,
dry_run=True,
nvidia=False,
docker_memory_limit="2g",
docker_shm_size=DEFAULT_DOCKER_SHM_SIZE,
docker_cpu_limit="1",
)

@pytest.mark.parametrize("uses_bulkdata", [True, False])
@pytest.mark.parametrize(
["cluster_manager", "spark_args_volumes", "expected_volumes"],
