Skip to content

Commit

Permalink
[Feature] Support multi-table sink (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshenghang authored Jan 16, 2025
1 parent 40d894f commit 2c309aa
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ public String generateJobConfig(
Map<String, List<Config>> sinkMap = new LinkedHashMap<>();
Map<String, JobLine> inputLines =
lines.stream()
.collect(Collectors.toMap(JobLine::getInputPluginId, Function.identity()));
.collect(
Collectors.toMap(
JobLine::getInputPluginId,
Function.identity(),
(existing, replacement) -> existing));
Map<String, JobLine> targetLines =
lines.stream()
.collect(Collectors.toMap(JobLine::getTargetPluginId, Function.identity()));
Expand Down Expand Up @@ -484,7 +488,7 @@ private String getConnectorConfig(Map<String, List<Config>> connectorMap) {

private Config addTableName(String tableName, JobLine jobLine, Config config) {
return config.withValue(
tableName, ConfigValueFactory.fromAnyRef("Table" + jobLine.getId()));
tableName, ConfigValueFactory.fromAnyRef("Table" + jobLine.getInputPluginId()));
}

private Config filterEmptyValue(Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,25 @@ private JobTaskCheckRes checkPluginSchemaIntegrity(JobTaskInfo taskInfo) throws
Map<String, PluginConfig> pluginMap =
taskInfo.getPlugins().stream()
.collect(Collectors.toMap(PluginConfig::getPluginId, Function.identity()));
Map<String, String> edgeMap =
Map<String, List<String>> edgeMap =
taskInfo.getEdges().stream()
.collect(Collectors.toMap(Edge::getInputPluginId, Edge::getTargetPluginId));
.collect(
Collectors.groupingBy(
Edge::getInputPluginId,
Collectors.mapping(
Edge::getTargetPluginId, Collectors.toList())));

for (PluginConfig config : source) {
PluginConfig nextConfig = pluginMap.get(edgeMap.get(config.getPluginId()));
JobTaskCheckRes res = checkNextTaskSchema(config, nextConfig, pluginMap, edgeMap);
if (res != null) {
return res;
List<String> nextConfigs = edgeMap.get(config.getPluginId());
if (nextConfigs != null) {
for (String nextConfigId : nextConfigs) {
PluginConfig nextConfig = pluginMap.get(nextConfigId);
JobTaskCheckRes res =
checkNextTaskSchema(config, nextConfig, pluginMap, edgeMap);
if (res != null) {
return res;
}
}
}
}
return null;
Expand All @@ -307,7 +317,7 @@ private JobTaskCheckRes checkNextTaskSchema(
PluginConfig config,
PluginConfig nextConfig,
Map<String, PluginConfig> pluginMap,
Map<String, String> edgeMap)
Map<String, List<String>> edgeMap)
throws IOException {
Map<String, Object> options = nextConfig.getTransformOptions();
if (options != null && !options.isEmpty()) {
Expand Down Expand Up @@ -384,12 +394,16 @@ private JobTaskCheckRes checkNextTaskSchema(
}
}
}
if (edgeMap.containsKey(nextConfig.getPluginId())) {
return checkNextTaskSchema(
nextConfig,
pluginMap.get(edgeMap.get(nextConfig.getPluginId())),
pluginMap,
edgeMap);
List<String> nextConfigIds = edgeMap.get(nextConfig.getPluginId());
if (nextConfigIds != null) {
for (String nextConfigId : nextConfigIds) {
JobTaskCheckRes res =
checkNextTaskSchema(
nextConfig, pluginMap.get(nextConfigId), pluginMap, edgeMap);
if (res != null) {
return res;
}
}
}
return null;
}
Expand Down

0 comments on commit 2c309aa

Please sign in to comment.