Skip to content

Commit

Permalink
[Bug][Seatunnel-web] Job update API fails when tasks are added or rem…
Browse files Browse the repository at this point in the history
…oved (#249)
  • Loading branch information
arshadmohammad authored Dec 31, 2024
1 parent 937b3d6 commit cf1f6c9
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
public class JobServiceImpl implements IJobService {
Expand All @@ -61,14 +64,31 @@ public long createJob(int userId, JobCreateReq jobCreateRequest)
throws JsonProcessingException {
JobReq jobDefinition = getJobDefinition(jobCreateRequest.getJobConfig());
long jobId = jobService.createJob(userId, jobDefinition);
createTasks(userId, jobCreateRequest, jobId);
return jobId;
}

private void createTasks(int userId, JobCreateReq jobCreateRequest, long jobId)
throws JsonProcessingException {
List<PluginConfig> pluginConfig = jobCreateRequest.getPluginConfigs();
Set<String> edgeIds =
jobCreateRequest.getJobDAG().getEdges().stream()
.flatMap(
edge ->
Stream.of(
edge.getInputPluginId(), edge.getTargetPluginId()))
.collect(Collectors.toSet());
Map<String, String> pluginNameVsPluginId = new HashMap<>();
if (pluginConfig != null) {
for (PluginConfig config : pluginConfig) {
String pluginId = String.valueOf(CodeGenerateUtils.getInstance().genCode());
config.setPluginId(pluginId);
String pluginIdKey =
edgeIds.contains(config.getName())
? config.getName()
: config.getPluginId();
String newPluginId = String.valueOf(CodeGenerateUtils.getInstance().genCode());
config.setPluginId(newPluginId);
jobTaskService.saveSingleTask(jobId, config);
pluginNameVsPluginId.put(config.getName(), pluginId);
pluginNameVsPluginId.put(pluginIdKey, newPluginId);
}
}
jobConfigService.updateJobConfig(userId, jobId, jobCreateRequest.getJobConfig());
Expand All @@ -80,7 +100,6 @@ public long createJob(int userId, JobCreateReq jobCreateRequest)
edge.setTargetPluginId(pluginNameVsPluginId.get(edge.getTargetPluginId()));
}
jobTaskService.saveJobDAG(jobId, jobDAG);
return jobId;
}

private JobReq getJobDefinition(JobConfig jobConfig) {
Expand Down Expand Up @@ -118,14 +137,8 @@ private JobReq getJobDefinition(JobConfig jobConfig) {
@Override
public void updateJob(Integer userId, long jobVersionId, JobCreateReq jobCreateReq)
throws JsonProcessingException {
jobConfigService.updateJobConfig(userId, jobVersionId, jobCreateReq.getJobConfig());
List<PluginConfig> pluginConfigs = jobCreateReq.getPluginConfigs();
if (pluginConfigs != null) {
for (PluginConfig pluginConfig : pluginConfigs) {
jobTaskService.saveSingleTask(jobVersionId, pluginConfig);
}
}
jobTaskService.saveJobDAG(jobVersionId, jobCreateReq.getJobDAG());
jobTaskService.deleteTaskByVersionId(jobVersionId);
createTasks(userId, jobCreateReq, jobVersionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
import org.apache.seatunnel.app.domain.request.connector.SceneMode;
import org.apache.seatunnel.app.domain.request.job.Edge;
import org.apache.seatunnel.app.domain.request.job.JobConfig;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobDAG;
Expand All @@ -29,14 +31,19 @@
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.utils.JobTestingUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -187,6 +194,93 @@ public void testUpdateJob_ForValidAndInvalidScenarios() {
assertEquals(SeatunnelErrorEnum.ERROR_CONFIG.getCode(), jobUpdateResult.getCode());
}

@Test
public void testUpdateJob_AddNewTask() {
String jobName = "updateJob_single_api_add_task" + uniqueId;
JobCreateReq jobCreateReq =
JobTestingUtils.populateJobCreateReqFromFile(
jobName, "fake_source_create_4" + uniqueId, "console_create_4" + uniqueId);

Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(job.isSuccess());
JobTestingUtils.executeJobAndVerifySuccess(job.getData());

Result<JobRes> getJobResponse = jobControllerWrapper.getJob(job.getData());
assertTrue(getJobResponse.isSuccess());
JobRes jobRes = getJobResponse.getData();

JobCreateReq jobUpdateReq = convertJobResToJobCreateReq(jobRes);
jobUpdateReq.getPluginConfigs().add(getCopyTransformPlugin());

List<Edge> edges = new ArrayList<>();
edges.add(new Edge("source-fake-source", "transform-replace"));
edges.add(new Edge("transform-replace", "transform-copy"));
edges.add(new Edge("transform-copy", "sink-console"));
JobDAG jobDAG = new JobDAG(edges);
jobUpdateReq.setJobDAG(jobDAG);
String jobName2 = "updateJob_single_api_add_task_up" + uniqueId;
jobUpdateReq.getJobConfig().setName(jobName2);
jobUpdateReq.getJobConfig().setDescription(jobName2 + " description");

Result<Void> jobUpdateResult = jobControllerWrapper.updateJob(job.getData(), jobUpdateReq);
assertTrue(jobUpdateResult.isSuccess());
JobTestingUtils.executeJobAndVerifySuccess(job.getData());
}

@Test
public void testUpdateJob_RemoveTask() {
String jobName = "updateJob_single_api_remove_task" + uniqueId;
JobCreateReq jobCreateReq =
JobTestingUtils.populateJobCreateReqFromFile(
jobName, "fake_source_create_5" + uniqueId, "console_create_5" + uniqueId);

Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(job.isSuccess());
JobTestingUtils.executeJobAndVerifySuccess(job.getData());

Result<JobRes> getJobResponse = jobControllerWrapper.getJob(job.getData());
assertTrue(getJobResponse.isSuccess());
JobRes jobRes = getJobResponse.getData();

JobCreateReq jobUpdateReq = convertJobResToJobCreateReq(jobRes);
jobUpdateReq
.getPluginConfigs()
.removeIf(pluginConfig -> "transform-replace".equals(pluginConfig.getName()));

List<Edge> edges = new ArrayList<>();
edges.add(new Edge("source-fake-source", "sink-console"));
JobDAG jobDAG = new JobDAG(edges);
jobUpdateReq.setJobDAG(jobDAG);

Result<Void> jobUpdateResult = jobControllerWrapper.updateJob(job.getData(), jobUpdateReq);
assertTrue(jobUpdateResult.isSuccess());
JobTestingUtils.executeJobAndVerifySuccess(job.getData());
}

private PluginConfig getCopyTransformPlugin() {
String transPluginId = "copy" + System.currentTimeMillis();
Map<String, Object> transOptions = new HashMap<>();
transOptions.put(
"copyList",
Arrays.asList(
new HashMap<String, String>() {
{
put("sourceFieldName", "name");
put("targetFieldName", "name_copy");
}
}));
return PluginConfig.builder()
.pluginId(transPluginId)
.name("transform-copy")
.type(PluginType.TRANSFORM)
.connectorType("Copy")
.transformOptions(transOptions)
.outputSchema(null)
.sceneMode(SceneMode.SINGLE_TABLE)
.config("{\"query\":\"\"}")
.build();
}

private JobCreateReq convertJobResToJobCreateReq(JobRes jobRes) {
JobCreateReq jobCreateReq = new JobCreateReq();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.app.controller.JobConfigControllerWrapper;
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper;
import org.apache.seatunnel.app.controller.JobTaskControllerWrapper;
import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
Expand All @@ -31,6 +32,7 @@
import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.core.job.JobStatus;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -39,20 +41,23 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class JobTestingUtils {
private static JobMetricsControllerWrapper jobMetricsControllerWrapper =
private static final JobMetricsControllerWrapper jobMetricsControllerWrapper =
new JobMetricsControllerWrapper();
private static JobConfigControllerWrapper jobConfigControllerWrapper =
private static final JobConfigControllerWrapper jobConfigControllerWrapper =
new JobConfigControllerWrapper();
private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper =
private static final JobDefinitionControllerWrapper jobDefinitionControllerWrapper =
new JobDefinitionControllerWrapper();
private static JobTaskControllerWrapper jobTaskControllerWrapper =
private static final JobTaskControllerWrapper jobTaskControllerWrapper =
new JobTaskControllerWrapper();
private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper =
private static final SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper =
new SeatunnelDatasourceControllerWrapper();
private static JobControllerWrapper jobControllerWrapper = new JobControllerWrapper();
private static final JobControllerWrapper jobControllerWrapper = new JobControllerWrapper();
private static final JobExecutorControllerWrapper jobExecutorControllerWrapper =
new JobExecutorControllerWrapper();
private static final long TIMEOUT = 60; // 1 minute
private static final long INTERVAL = 2; // 1 second

Expand Down Expand Up @@ -208,4 +213,21 @@ public static Long createJob(JobCreateReq jobCreateReq) {
assertTrue(jobCreateResult.isSuccess());
return jobCreateResult.getData();
}

public static void executeJobAndVerifySuccess(long jobVersionId) {
executeJobAndVerifySuccess(jobVersionId, 5, 5);
}

public static void executeJobAndVerifySuccess(
long jobVersionId, long expectedReadRowCount, long expectedWriteRowCount) {
Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
assertTrue(result.isSuccess());
assertTrue(result.getData() > 0);
Result<List<JobPipelineDetailMetricsRes>> listResult =
JobTestingUtils.waitForJobCompletion(result.getData());
assertEquals(1, listResult.getData().size());
assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus());
assertEquals(expectedReadRowCount, listResult.getData().get(0).getReadRowCount());
assertEquals(expectedWriteRowCount, listResult.getData().get(0).getWriteRowCount());
}
}

0 comments on commit cf1f6c9

Please sign in to comment.