Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
fanxishu authored Jan 10, 2025
2 parents 4af124c + 7084272 commit f474d26
Show file tree
Hide file tree
Showing 39 changed files with 667 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ public List<String> getTables(
String database,
Map<String, String> option) {
databaseCheck(database);

try (EsRestClient client =
EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
return client.listIndex();
return client.listIndex(option.get("filterName"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
Expand Down Expand Up @@ -252,27 +253,7 @@ public void close() {
}

public List<String> listIndex() {
String endpoint = "/_cat/indices?format=json";
Request request = new Request("GET", endpoint);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new ResponseException("GET " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
return JsonUtils.toList(entity, Map.class).stream()
.map(map -> map.get("index").toString())
.collect(Collectors.toList());
} else {
throw new ResponseException(
String.format(
"GET %s response status code=%d",
endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new ResponseException(ex);
}
return this.listIndex(null);
}

public void dropIndex(String tableName) {
Expand Down Expand Up @@ -365,4 +346,41 @@ private static Map<String, String> getFieldTypeMappingFromProperties(JsonNode pr
}
return mapping;
}

public List<String> listIndex(String filterName) {
String endpoint = "/_cat/indices?format=json";
Request request = new Request("GET", endpoint);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new ResponseException("GET " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
List<String> indices =
JsonUtils.toList(entity, Map.class).stream()
.map(map -> map.get("index").toString())
.collect(Collectors.toList());

if (StringUtils.isNotEmpty(filterName)) {
indices =
indices.stream()
.filter(
index ->
index.toLowerCase()
.contains(filterName.toLowerCase()))
.collect(Collectors.toList());
}

return indices;
} else {
throw new ResponseException(
String.format(
"GET %s response status code=%d",
endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new ResponseException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.app.service.IJobDefinitionService;
import org.apache.seatunnel.app.service.IJobTaskService;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;

import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
Expand Down Expand Up @@ -60,7 +61,11 @@ Result<Long> createJobDefinition(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@RequestBody JobReq jobReq)
throws CodeGenerateUtils.CodeGenerateException {
return Result.success(jobService.createJob(userId, jobReq));
if (jobService.getJob(jobReq.getName()).isEmpty()) {
return Result.success(jobService.createJob(userId, jobReq));
} else {
return Result.failure(SeatunnelErrorEnum.TASK_NAME_ALREADY_EXISTS);
}
}

@GetMapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.app.dal.entity.JobDefinition;
import org.apache.seatunnel.app.domain.response.PageInfo;
import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes;

import lombok.NonNull;

Expand All @@ -32,7 +33,8 @@ public interface IJobDefinitionDao {

void updateJob(JobDefinition jobDefinition);

PageInfo<JobDefinition> getJob(String name, Integer pageNo, Integer pageSize, String jobMode);
PageInfo<JobDefinitionRes> getJob(
String name, Integer pageNo, Integer pageSize, String jobMode);

List<JobDefinition> getJobList(@NonNull String name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.app.dal.entity.JobDefinition;
import org.apache.seatunnel.app.dal.mapper.JobMapper;
import org.apache.seatunnel.app.domain.response.PageInfo;
import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -56,9 +57,9 @@ public void updateJob(JobDefinition jobDefinition) {
}

@Override
public PageInfo<JobDefinition> getJob(
public PageInfo<JobDefinitionRes> getJob(
String searchName, Integer pageNo, Integer pageSize, String jobMode) {
IPage<JobDefinition> jobDefinitionIPage;
IPage<JobDefinitionRes> jobDefinitionIPage;
if (StringUtils.isEmpty(jobMode)) {
jobDefinitionIPage =
jobMapper.queryJobListPaging(new Page<>(pageNo, pageSize), searchName);
Expand All @@ -67,7 +68,7 @@ public PageInfo<JobDefinition> getJob(
jobMapper.queryJobListPagingWithJobMode(
new Page<>(pageNo, pageSize), searchName, jobMode);
}
PageInfo<JobDefinition> jobs = new PageInfo<>();
PageInfo<JobDefinitionRes> jobs = new PageInfo<>();
jobs.setData(jobDefinitionIPage.getRecords());
jobs.setPageSize(pageSize);
jobs.setPageNo(pageNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.app.dal.mapper;

import org.apache.seatunnel.app.dal.entity.JobDefinition;
import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes;

import org.apache.ibatis.annotations.Param;

Expand All @@ -28,10 +29,10 @@

public interface JobMapper extends BaseMapper<JobDefinition> {

IPage<JobDefinition> queryJobListPaging(
IPage<JobDefinitionRes> queryJobListPaging(
IPage<JobDefinition> page, @Param("searchName") String searchName);

IPage<JobDefinition> queryJobListPagingWithJobMode(
IPage<JobDefinitionRes> queryJobListPagingWithJobMode(
IPage<JobDefinition> page,
@Param("searchName") String searchName,
@Param("jobMode") String jobMode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.domain.request.job.transform;

import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.List;

@Data
@EqualsAndHashCode(callSuper = true)
public class JsonPath extends TransformOption {

private List<JsonPathColumn> columns;
}

@Data
class JsonPathColumn {
private String src_field;
private String path;
private String destField;
private String destType;
private String columnErrorHandleWay;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.domain.request.job.transform;

import lombok.Data;

@Data
public class JsonPathTransformOptions implements TransformOptions {

private JsonPath jsonPath;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ public enum Transform {
FIELDMAPPER,
FILTERROWKIND,
SPLIT,
SQL
SQL,
JSONPATH
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public List<ConnectorInfo> listTransformsForJob(Long jobId) {
.toUpperCase());

if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {

return connectorCache.getTransform().stream()
.filter(
connectorInfo -> {
Expand All @@ -133,7 +134,8 @@ public List<ConnectorInfo> listTransformsForJob(Long jobId) {
|| pluginName.equals("Replace")
|| pluginName.equals("Copy")
|| pluginName.equals("MultiFieldSplit")
|| pluginName.equals("Sql");
|| pluginName.equals("Sql")
|| pluginName.equals("JsonPath");
})
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,36 +119,7 @@ public PageInfo<JobDefinitionRes> getJob(
SeatunnelErrorEnum.ILLEGAL_STATE, "Unsupported JobMode");
}
}
PageInfo<JobDefinition> jobDefinitionPageInfo =
jobDefinitionDao.getJob(searchName, pageNo, pageSize, jobMode);
List<Integer> userIds =
jobDefinitionPageInfo.getData().stream()
.map(JobDefinition::getCreateUserId)
.collect(Collectors.toList());
userIds.addAll(
jobDefinitionPageInfo.getData().stream()
.map(JobDefinition::getUpdateUserId)
.collect(Collectors.toList()));
List<JobDefinitionRes> jobDefinitionResList = new ArrayList<>();
for (int i = 0; i < jobDefinitionPageInfo.getData().size(); i++) {
JobDefinition jobDefinition = jobDefinitionPageInfo.getData().get(i);
JobDefinitionRes jobDefinitionRes = new JobDefinitionRes();
jobDefinitionRes.setId(jobDefinition.getId());
jobDefinitionRes.setName(jobDefinition.getName());
jobDefinitionRes.setDescription(jobDefinition.getDescription());
jobDefinitionRes.setJobType(jobDefinition.getJobType());
jobDefinitionRes.setCreateUserId(jobDefinition.getCreateUserId());
jobDefinitionRes.setUpdateUserId(jobDefinitionRes.getUpdateUserId());
jobDefinitionRes.setCreateTime(jobDefinition.getCreateTime());
jobDefinitionRes.setUpdateTime(jobDefinition.getUpdateTime());
jobDefinitionResList.add(jobDefinitionRes);
}
PageInfo<JobDefinitionRes> pageInfo = new PageInfo<>();
pageInfo.setPageNo(jobDefinitionPageInfo.getPageNo());
pageInfo.setPageSize(jobDefinitionPageInfo.getPageSize());
pageInfo.setTotalCount(jobDefinitionPageInfo.getTotalCount());
pageInfo.setData(jobDefinitionResList);
return pageInfo;
return jobDefinitionDao.getJob(searchName, pageNo, pageSize, jobMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
public class JobServiceImpl implements IJobService {
Expand All @@ -61,14 +64,31 @@ public long createJob(int userId, JobCreateReq jobCreateRequest)
throws JsonProcessingException {
JobReq jobDefinition = getJobDefinition(jobCreateRequest.getJobConfig());
long jobId = jobService.createJob(userId, jobDefinition);
createTasks(userId, jobCreateRequest, jobId);
return jobId;
}

private void createTasks(int userId, JobCreateReq jobCreateRequest, long jobId)
throws JsonProcessingException {
List<PluginConfig> pluginConfig = jobCreateRequest.getPluginConfigs();
Set<String> edgeIds =
jobCreateRequest.getJobDAG().getEdges().stream()
.flatMap(
edge ->
Stream.of(
edge.getInputPluginId(), edge.getTargetPluginId()))
.collect(Collectors.toSet());
Map<String, String> pluginNameVsPluginId = new HashMap<>();
if (pluginConfig != null) {
for (PluginConfig config : pluginConfig) {
String pluginId = String.valueOf(CodeGenerateUtils.getInstance().genCode());
config.setPluginId(pluginId);
String pluginIdKey =
edgeIds.contains(config.getName())
? config.getName()
: config.getPluginId();
String newPluginId = String.valueOf(CodeGenerateUtils.getInstance().genCode());
config.setPluginId(newPluginId);
jobTaskService.saveSingleTask(jobId, config);
pluginNameVsPluginId.put(config.getName(), pluginId);
pluginNameVsPluginId.put(pluginIdKey, newPluginId);
}
}
jobConfigService.updateJobConfig(userId, jobId, jobCreateRequest.getJobConfig());
Expand All @@ -80,7 +100,6 @@ public long createJob(int userId, JobCreateReq jobCreateRequest)
edge.setTargetPluginId(pluginNameVsPluginId.get(edge.getTargetPluginId()));
}
jobTaskService.saveJobDAG(jobId, jobDAG);
return jobId;
}

private JobReq getJobDefinition(JobConfig jobConfig) {
Expand Down Expand Up @@ -118,14 +137,8 @@ private JobReq getJobDefinition(JobConfig jobConfig) {
@Override
public void updateJob(Integer userId, long jobVersionId, JobCreateReq jobCreateReq)
throws JsonProcessingException {
jobConfigService.updateJobConfig(userId, jobVersionId, jobCreateReq.getJobConfig());
List<PluginConfig> pluginConfigs = jobCreateReq.getPluginConfigs();
if (pluginConfigs != null) {
for (PluginConfig pluginConfig : pluginConfigs) {
jobTaskService.saveSingleTask(jobVersionId, pluginConfig);
}
}
jobTaskService.saveJobDAG(jobVersionId, jobCreateReq.getJobDAG());
jobTaskService.deleteTaskByVersionId(jobVersionId);
createTasks(userId, jobCreateReq, jobVersionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ private JobTaskCheckRes checkNextTaskSchema(
Collections.singletonList(sqlTransformOptions.getSql()));
}
break;
case JSONPATH:
break;
case FILTERROWKIND:
case REPLACE:
default:
Expand Down
Loading

0 comments on commit f474d26

Please sign in to comment.