Commit df190a1

Merge branch 'apache:main' into main
fanxishu authored Jan 17, 2025
2 parents 4ee2810 + 2c309aa commit df190a1
Showing 7 changed files with 50 additions and 41 deletions.

File 1 of 7:
@@ -73,10 +73,8 @@ public IPage<SeaTunnelJobInstanceDto> queryJobInstanceListPaging(
Date endTime,
String jobDefineName,
JobMode jobMode) {
IPage<SeaTunnelJobInstanceDto> jobInstanceIPage =
jobInstanceMapper.queryJobInstanceListPaging(
page, startTime, endTime, jobDefineName, jobMode);
return jobInstanceIPage;
return jobInstanceMapper.queryJobInstanceListPaging(
page, startTime, endTime, jobDefineName, jobMode);
}

@Override

File 2 of 7:
@@ -30,4 +30,6 @@ public class SeaTunnelJobInstanceDto extends JobInstance {
private long writeRowCount;

private Long runningTime;

private String username;
}

File 3 of 7:
@@ -177,7 +177,11 @@ public String generateJobConfig(
Map<String, List<Config>> sinkMap = new LinkedHashMap<>();
Map<String, JobLine> inputLines =
lines.stream()
.collect(Collectors.toMap(JobLine::getInputPluginId, Function.identity()));
.collect(
Collectors.toMap(
JobLine::getInputPluginId,
Function.identity(),
(existing, replacement) -> existing));
Map<String, JobLine> targetLines =
lines.stream()
.collect(Collectors.toMap(JobLine::getTargetPluginId, Function.identity()));
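
The change above adds a merge function to Collectors.toMap. Without one, toMap throws IllegalStateException as soon as two JobLine entries share the same input plugin id (a source feeding more than one line); the third argument (existing, replacement) -> existing resolves the collision by keeping the first mapping. A minimal, self-contained sketch of that behavior, with a Line record standing in for JobLine:

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class ToMapMergeDemo {
        record Line(String inputPluginId, String targetPluginId) {}

        public static void main(String[] args) {
            List<Line> lines = List.of(
                    new Line("source-1", "sink-1"),
                    new Line("source-1", "sink-2")); // duplicate key "source-1"

            // With only two arguments, toMap would throw IllegalStateException here.
            // The merge function keeps the entry that was seen first.
            Map<String, Line> byInput = lines.stream()
                    .collect(Collectors.toMap(
                            Line::inputPluginId,
                            Function.identity(),
                            (existing, replacement) -> existing));

            System.out.println(byInput.get("source-1").targetPluginId()); // sink-1
        }
    }
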
@@ -484,7 +488,7 @@ private String getConnectorConfig(Map<String, List<Config>> connectorMap) {

private Config addTableName(String tableName, JobLine jobLine, Config config) {
return config.withValue(
tableName, ConfigValueFactory.fromAnyRef("Table" + jobLine.getId()));
tableName, ConfigValueFactory.fromAnyRef("Table" + jobLine.getInputPluginId()));
}

private Config filterEmptyValue(Config config) {
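
In addTableName above, the generated table name is now derived from the line's input plugin id rather than from the line's own id. config.withValue comes from the Typesafe Config API (which SeaTunnel uses, possibly through a shaded package): it returns a new immutable Config with the given path set. A small sketch, where the key result_table_name and the id input-plugin-1 are illustrative placeholders rather than the values the generator actually uses:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import com.typesafe.config.ConfigValueFactory;

    public class WithValueDemo {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString("connector = FakeSource");
            // withValue does not mutate config; it returns a copy with the extra key.
            Config withTable = config.withValue(
                    "result_table_name",
                    ConfigValueFactory.fromAnyRef("Table" + "input-plugin-1"));
            System.out.println(withTable.root().render());
        }
    }
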

File 4 of 7:
@@ -289,15 +289,25 @@ private JobTaskCheckRes checkPluginSchemaIntegrity(JobTaskInfo taskInfo) throws
Map<String, PluginConfig> pluginMap =
taskInfo.getPlugins().stream()
.collect(Collectors.toMap(PluginConfig::getPluginId, Function.identity()));
Map<String, String> edgeMap =
Map<String, List<String>> edgeMap =
taskInfo.getEdges().stream()
.collect(Collectors.toMap(Edge::getInputPluginId, Edge::getTargetPluginId));
.collect(
Collectors.groupingBy(
Edge::getInputPluginId,
Collectors.mapping(
Edge::getTargetPluginId, Collectors.toList())));

for (PluginConfig config : source) {
PluginConfig nextConfig = pluginMap.get(edgeMap.get(config.getPluginId()));
JobTaskCheckRes res = checkNextTaskSchema(config, nextConfig, pluginMap, edgeMap);
if (res != null) {
return res;
List<String> nextConfigs = edgeMap.get(config.getPluginId());
if (nextConfigs != null) {
for (String nextConfigId : nextConfigs) {
PluginConfig nextConfig = pluginMap.get(nextConfigId);
JobTaskCheckRes res =
checkNextTaskSchema(config, nextConfig, pluginMap, edgeMap);
if (res != null) {
return res;
}
}
}
}
return null;
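
Above, the edge map changes from a Map<String, String> built with toMap, which can hold only one target per source and throws on duplicate keys, to a Map<String, List<String>> built with groupingBy plus mapping, so a plugin that fans out to several downstream plugins keeps all of its edges and the loop checks every branch. A minimal sketch, with an Edge record standing in for the real Edge entity:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class GroupingByDemo {
        record Edge(String inputPluginId, String targetPluginId) {}

        public static void main(String[] args) {
            List<Edge> edges = List.of(
                    new Edge("source-1", "transform-1"),
                    new Edge("source-1", "sink-1"), // same source, second target
                    new Edge("transform-1", "sink-2"));

            // groupingBy keeps every edge for a given input plugin id;
            // mapping reduces each Edge to just its target id.
            Map<String, List<String>> edgeMap = edges.stream()
                    .collect(Collectors.groupingBy(
                            Edge::inputPluginId,
                            Collectors.mapping(Edge::targetPluginId, Collectors.toList())));

            System.out.println(edgeMap.get("source-1")); // [transform-1, sink-1]
        }
    }
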
@@ -307,7 +317,7 @@ private JobTaskCheckRes checkNextTaskSchema(
PluginConfig config,
PluginConfig nextConfig,
Map<String, PluginConfig> pluginMap,
Map<String, String> edgeMap)
Map<String, List<String>> edgeMap)
throws IOException {
Map<String, Object> options = nextConfig.getTransformOptions();
if (options != null && !options.isEmpty()) {
@@ -384,12 +394,16 @@ }
}
}
}
if (edgeMap.containsKey(nextConfig.getPluginId())) {
return checkNextTaskSchema(
nextConfig,
pluginMap.get(edgeMap.get(nextConfig.getPluginId())),
pluginMap,
edgeMap);
List<String> nextConfigIds = edgeMap.get(nextConfig.getPluginId());
if (nextConfigIds != null) {
for (String nextConfigId : nextConfigIds) {
JobTaskCheckRes res =
checkNextTaskSchema(
nextConfig, pluginMap.get(nextConfigId), pluginMap, edgeMap);
if (res != null) {
return res;
}
}
}
return null;
}
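
With the list-valued edge map, the recursion above now visits every downstream id of the current plugin and returns the first non-null check result, instead of following a single successor. A sketch of that traversal pattern in isolation, using plain strings for plugin ids and a precomputed failures map in place of the real schema check (all names here are illustrative):

    import java.util.List;
    import java.util.Map;

    public class FirstFailureDemo {
        // Returns the first failure found along any downstream path, or null if
        // every reachable node passes; the shape mirrors the early-return,
        // depth-first fan-out recursion in checkNextTaskSchema.
        static String firstFailure(String node, Map<String, List<String>> edges,
                                   Map<String, String> failures) {
            String failure = failures.get(node);
            if (failure != null) {
                return failure;
            }
            List<String> next = edges.get(node);
            if (next == null) {
                return null;
            }
            for (String child : next) {
                String res = firstFailure(child, edges, failures);
                if (res != null) {
                    return res; // stop at the first failing branch
                }
            }
            return null; // all branches passed
        }

        public static void main(String[] args) {
            Map<String, List<String>> edges = Map.of(
                    "source-1", List.of("transform-1", "sink-1"),
                    "transform-1", List.of("sink-2"));
            Map<String, String> failures = Map.of("sink-2", "schema mismatch");
            System.out.println(firstFailure("source-1", edges, failures)); // schema mismatch
        }
    }
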

File 5 of 7:
@@ -21,7 +21,6 @@
import org.apache.seatunnel.app.common.Status;
import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao;
import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.entity.JobDefinition;
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
@@ -93,7 +92,8 @@ public Result<PageInfo<SeaTunnelJobInstanceDto>> getSyncTaskInstancePaging(
if (CollectionUtils.isEmpty(records)) {
return result;
}
populateExecutionMetricsData(userId, jobMode, records);
addRunningTimeToResult(records);
jobPipelineSummaryMetrics(records, jobMode, userId);
pageInfo.setTotal((int) jobInstanceIPage.getTotal());
pageInfo.setTotalList(records);
result.setData(pageInfo);
@@ -102,7 +102,6 @@

private void populateExecutionMetricsData(
Integer userId, JobMode jobMode, List<SeaTunnelJobInstanceDto> records) {
addJobDefineNameToResult(records);
addRunningTimeToResult(records);
jobPipelineSummaryMetrics(records, jobMode, userId);
}
@@ -126,16 +125,6 @@ private void addRunningTimeToResult(List<SeaTunnelJobInstanceDto> records) {
}
}

private void addJobDefineNameToResult(List<SeaTunnelJobInstanceDto> records) {
for (SeaTunnelJobInstanceDto jobInstanceDto : records) {
JobDefinition jobDefinition =
jobDefinitionService.getJobDefinitionByJobId(jobInstanceDto.getJobDefineId());
if (jobDefinition != null) {
jobInstanceDto.setJobDefineName(jobDefinition.getName());
}
}
}
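
The helper removed above filled jobDefineName by calling jobDefinitionService.getJobDefinitionByJobId once per returned record, an N+1 lookup pattern. With the mapper change below, the paging query joins t_st_job_definition and user directly, so jobDefineName and the new username field arrive already populated, and both this helper and the JobDefinition import become unnecessary.
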

public Date dateConverter(String time) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


File 6 of 7:
@@ -32,26 +32,28 @@
</sql>

<select id="queryJobInstanceListPaging" resultType="org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto">
SELECT *
FROM t_st_job_instance
SELECT ji.*, jd.name AS jobDefineName, cu.username AS username
FROM t_st_job_instance ji
LEFT JOIN t_st_job_definition jd ON ji.job_define_id = jd.id
LEFT JOIN user cu ON ji.create_user_id = cu.id
<where>
<if test="startTime != null">
AND create_time <![CDATA[ >=]]> #{startTime}
AND ji.create_time <![CDATA[ >=]]> #{startTime}
</if>
<if test="endTime == null">
AND (end_time is null or end_time <![CDATA[ <=]]> #{endTime})
AND (ji.end_time is null or ji.end_time <![CDATA[ <=]]> #{endTime})
</if>
<if test="endTime != null">
AND (end_time <![CDATA[ <=]]> #{endTime} or end_time is null)
AND (ji.end_time <![CDATA[ <=]]> #{endTime} or ji.end_time is null)
</if>
<if test="jobDefineName != null">
AND job_define_id in (select x.id from t_st_job_definition x where x.name LIKE concat('%', #{jobDefineName}, '%'))
AND jd.name LIKE concat('%', #{jobDefineName}, '%')
</if>
<if test="jobMode != null">
AND job_type = #{jobMode}
AND ji.job_type = #{jobMode}
</if>
</where>
ORDER BY create_time DESC
ORDER BY ji.create_time DESC
</select>
<select id="getJobExecutionStatus" resultType="org.apache.seatunnel.app.dal.entity.JobInstance">
SELECT `job_status`, `error_message`

File 7 of 7:
@@ -138,7 +138,7 @@ export function useSyncTask(syncTaskType = 'BATCH') {
},
{
title: t('project.synchronization_instance.execute_user'),
key: 'createUserId',
key: 'username',
...COLUMN_WIDTH_CONFIG['state']
},
{