From 2c309aa6bbae71ee97e6c8156f5406d6a2c16af9 Mon Sep 17 00:00:00 2001 From: Jast Date: Fri, 17 Jan 2025 02:45:20 +0800 Subject: [PATCH] [Feature] Support multi-table sink (#265) --- .../service/impl/JobInstanceServiceImpl.java | 8 +++- .../app/service/impl/JobTaskServiceImpl.java | 40 +++++++++++++------ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 8d0b79579..6b1e97af3 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -177,7 +177,11 @@ public String generateJobConfig( Map> sinkMap = new LinkedHashMap<>(); Map inputLines = lines.stream() - .collect(Collectors.toMap(JobLine::getInputPluginId, Function.identity())); + .collect( + Collectors.toMap( + JobLine::getInputPluginId, + Function.identity(), + (existing, replacement) -> existing)); Map targetLines = lines.stream() .collect(Collectors.toMap(JobLine::getTargetPluginId, Function.identity())); @@ -484,7 +488,7 @@ private String getConnectorConfig(Map> connectorMap) { private Config addTableName(String tableName, JobLine jobLine, Config config) { return config.withValue( - tableName, ConfigValueFactory.fromAnyRef("Table" + jobLine.getId())); + tableName, ConfigValueFactory.fromAnyRef("Table" + jobLine.getInputPluginId())); } private Config filterEmptyValue(Config config) { diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java index c5cef3173..26ac6bcb5 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java @@ -289,15 +289,25 @@ private JobTaskCheckRes checkPluginSchemaIntegrity(JobTaskInfo taskInfo) throws Map pluginMap = taskInfo.getPlugins().stream() .collect(Collectors.toMap(PluginConfig::getPluginId, Function.identity())); - Map edgeMap = + Map> edgeMap = taskInfo.getEdges().stream() - .collect(Collectors.toMap(Edge::getInputPluginId, Edge::getTargetPluginId)); + .collect( + Collectors.groupingBy( + Edge::getInputPluginId, + Collectors.mapping( + Edge::getTargetPluginId, Collectors.toList()))); for (PluginConfig config : source) { - PluginConfig nextConfig = pluginMap.get(edgeMap.get(config.getPluginId())); - JobTaskCheckRes res = checkNextTaskSchema(config, nextConfig, pluginMap, edgeMap); - if (res != null) { - return res; + List nextConfigs = edgeMap.get(config.getPluginId()); + if (nextConfigs != null) { + for (String nextConfigId : nextConfigs) { + PluginConfig nextConfig = pluginMap.get(nextConfigId); + JobTaskCheckRes res = + checkNextTaskSchema(config, nextConfig, pluginMap, edgeMap); + if (res != null) { + return res; + } + } } } return null; @@ -307,7 +317,7 @@ private JobTaskCheckRes checkNextTaskSchema( PluginConfig config, PluginConfig nextConfig, Map pluginMap, - Map edgeMap) + Map> edgeMap) throws IOException { Map options = nextConfig.getTransformOptions(); if (options != null && !options.isEmpty()) { @@ -384,12 +394,16 @@ private JobTaskCheckRes checkNextTaskSchema( } } } - if (edgeMap.containsKey(nextConfig.getPluginId())) { - return checkNextTaskSchema( - nextConfig, - pluginMap.get(edgeMap.get(nextConfig.getPluginId())), - pluginMap, - edgeMap); + List nextConfigIds = edgeMap.get(nextConfig.getPluginId()); + if (nextConfigIds != null) { + for (String nextConfigId : nextConfigIds) { + JobTaskCheckRes res = + checkNextTaskSchema( + nextConfig, pluginMap.get(nextConfigId), pluginMap, edgeMap); + if (res != null) { + return res; + } + } } return null; }