From cfc58703bbd85ea869cb2023e5a12f5699b45c14 Mon Sep 17 00:00:00 2001 From: XiaoJiang521 <131635688+XiaoJiang521@users.noreply.github.com> Date: Thu, 26 Oct 2023 17:51:31 +0800 Subject: [PATCH 1/4] [bugfix] virtual update datasource (#139) --- .../seatunnel/app/service/impl/VirtualTableServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java index 8a9c4f39b..50303c459 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java @@ -154,7 +154,7 @@ public Boolean updateVirtualTable( VirtualTable virtualTable = VirtualTable.builder() .id(Long.valueOf(tableId)) - .datasourceId(originalTable.getDatasourceId()) + .datasourceId(Long.parseLong(req.getDatasourceId())) .virtualDatabaseName(req.getDatabaseName()) .virtualTableName(req.getTableName()) .description(req.getDescription()) From 8d0c436e4fbcae90c5b3f3b5f7d88104766635b1 Mon Sep 17 00:00:00 2001 From: dbac <642826683@qq.com> Date: Sat, 28 Oct 2023 00:24:51 +0800 Subject: [PATCH 2/4] [Hotfix][APP] Fix job config file generation path error in windows (#140) * [fit][seatunnel-app] Modify configuration file generation path rules to adapt to windows and linux * [refactor][APP] Modify exception logs --- .../app/service/impl/JobExecutorServiceImpl.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index 145e25a9c..947519f1f 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; @@ -76,7 +77,8 @@ public Result jobExecute(Integer userId, Long jobDefineId) { public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) { String projectRoot = System.getProperty("user.dir"); - String filePath = projectRoot + "\\profile\\" + jobDefineId + ".conf"; + String filePath = + projectRoot + File.separator + "profile" + File.separator + jobDefineId + ".conf"; try { File file = new File(filePath); if (!file.exists()) { @@ -120,6 +122,7 @@ public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInsta }); } catch (ExecutionException | InterruptedException e) { + ExceptionUtils.getMessage(e); throw new RuntimeException(e); } return jobInstanceId; @@ -189,8 +192,14 @@ public Result jobStore(Integer userId, Long jobInstanceId) { JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); String projectRoot = System.getProperty("user.dir"); - String filePath = projectRoot + "\\profile\\" + jobInstance.getJobDefineId() + ".conf"; - + String filePath = + projectRoot + + File.separator + + "profile" + + File.separator + + jobInstance.getJobDefineId() + + ".conf"; + log.info("jobStore filePath:{}", filePath); SeaTunnelEngineProxy.getInstance() .restoreJob(filePath, jobInstanceId, Long.valueOf(jobInstance.getJobEngineId())); return Result.success(); From 220391ad6595c879eeb64a7860b9839825761d6d Mon Sep 17 00:00:00 2001 From: ChunFuWu <319355703@qq.com> Date: Fri, 3 Nov 2023 21:38:48 +0800 Subject: [PATCH 3/4] [Improve][UI] Translate Route Names (#144) --- seatunnel-ui/src/router/sync-task.ts | 8 ++++---- seatunnel-ui/src/router/virtual-tables.ts | 4 ++-- seatunnel-ui/src/service/sync-task-definition/index.ts | 3 +-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/seatunnel-ui/src/router/sync-task.ts b/seatunnel-ui/src/router/sync-task.ts index 910e18773..4cb00f269 100644 --- a/seatunnel-ui/src/router/sync-task.ts +++ b/seatunnel-ui/src/router/sync-task.ts @@ -35,7 +35,7 @@ export default { name: 'synchronization-definition', component: components['projects-task-synchronization-definition'], meta: { - title: '同步任务定义', + title: 'synchronization-definition', activeMenu: 'projects', showSide: true // auth: 'project:seatunnel-task:view' @@ -46,7 +46,7 @@ export default { name: 'synchronization-definition-dag', component: components['projects-task-synchronization-definition-dag'], meta: { - title: '同步任务定义画布', + title: 'synchronization-definition-dag', activeMenu: 'projects', activeSide: '/task/synchronization-definition', showSide: true, @@ -58,7 +58,7 @@ export default { name: 'synchronization-instance', component: components['projects-task-synchronization-instance'], meta: { - title: '同步任务实例', + title: 'synchronization-instance', activeMenu: 'projects', showSide: true // auth: 'project:seatunnel-task-instance:view' @@ -69,7 +69,7 @@ export default { name: 'synchronization-instance-detail', component: components['projects-task-synchronization-instance-detail'], meta: { - title: '同步任务实例详情', + title: 'synchronization-instance-detail', activeMenu: 'projects', activeSide: '/task/synchronization-instance', showSide: true, diff --git a/seatunnel-ui/src/router/virtual-tables.ts b/seatunnel-ui/src/router/virtual-tables.ts index a097f2c0b..82378424c 100644 --- a/seatunnel-ui/src/router/virtual-tables.ts +++ b/seatunnel-ui/src/router/virtual-tables.ts @@ -44,7 +44,7 @@ export default { name: 'virtual-tables-create', component: components['virtual-tables-detail'], meta: { - title: '虚拟表创建', + title: 'virtual-tables-create', activeMenu: 'virtual-tables', } }, @@ -53,7 +53,7 @@ export default { name: 'virtual-tables-editor', component: components['virtual-tables-detail'], meta: { - title: '虚拟表编辑', + title: 'virtual-tables-editor', activeMenu: 'virtual-tables', } } diff --git a/seatunnel-ui/src/service/sync-task-definition/index.ts b/seatunnel-ui/src/service/sync-task-definition/index.ts index ca970265f..3abcc37aa 100644 --- a/seatunnel-ui/src/service/sync-task-definition/index.ts +++ b/seatunnel-ui/src/service/sync-task-definition/index.ts @@ -145,7 +145,6 @@ export function queryTaskDetail(jobCode: string, taskCode: string): any { }) } -// source类型,获取源名称 export function listSourceName( jobId: string, sceneMode: string, @@ -161,7 +160,7 @@ export function listSourceName( } }) } -// sink类型,获取源名称 + export function findSink( jobId: string, status: 'DOWNLOADED' | 'NOT_DOWNLOAD' | 'ALL' = 'DOWNLOADED' From 4c6b067dfefa42a00e69bbb09f94736f371ad81d Mon Sep 17 00:00:00 2001 From: XiaoJiang521 <131635688+XiaoJiang521@users.noreply.github.com> Date: Tue, 7 Nov 2023 14:45:29 +0800 Subject: [PATCH 4/4] [feature][datasource] Add Mongodb datasource --- .../classloader/DatasourceLoadConfig.java | 6 +- .../datasource-mongodb/pom.xml | 64 ++++++++++ .../mongodb/MongoDataSoueceChannel.java | 106 ++++++++++++++++ .../mongodb/MongoDataSourceFactory.java | 57 +++++++++ .../plugin/mongodb/MongoOptionRule.java | 51 ++++++++ .../mongodb/MongoRequestParamsUtils.java | 33 +++++ .../plugin/api/DataSourceChannel.java | 14 ++- .../seatunnel-datasource-plugins/pom.xml | 1 + .../src/main/bin/download_datasource.sh | 1 + .../impl/MongoDBDataSourceConfigSwitcher.java | 115 ++++++++++++++++++ .../connector-datasource-mapper.yaml | 19 +++ seatunnel-web-dist/pom.xml | 13 ++ 12 files changed, 477 insertions(+), 3 deletions(-) create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java index ef3dc0d77..a8b480786 100644 --- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java +++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java @@ -92,6 +92,8 @@ public class DatasourceLoadConfig { classLoaderFactoryName.put( "JDBC-STARROCKS", "org.apache.seatunnel.datasource.plugin.starrocks.jdbc.StarRocksJdbcDataSourceFactory"); + classLoaderFactoryName.put( + "MONGODB", "com.apache.seatunnel.datasource.plugin.mongodb.MongoDataSourceFactory"); classLoaderJarName.put("JDBC-ORACLE", "datasource-jdbc-oracle-"); classLoaderJarName.put("JDBC-CLICKHOUSE", "datasource-jdbc-clickhouse-"); @@ -111,6 +113,7 @@ public class DatasourceLoadConfig { classLoaderJarName.put("STARROCKS", "datasource-starrocks-"); classLoaderJarName.put("S3-REDSHIFT", "datasource-s3redshift-"); classLoaderJarName.put("JDBC-STARROCKS", "datasource-jdbc-starrocks-"); + classLoaderJarName.put("MONGODB", "datasource-mongodb-"); } public static final Set pluginSet = @@ -127,7 +130,8 @@ public class DatasourceLoadConfig { "MySQL-CDC", "S3", "SqlServer-CDC", - "StarRocks"); + "StarRocks", + "MongoDB"); public static Map datasourceClassLoaders = new HashMap<>(); diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml new file mode 100644 index 000000000..8bbf4dcf7 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml @@ -0,0 +1,64 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-datasource-plugins + ${revision} + + + datasource-mongodb + + + + org.apache.seatunnel + datasource-plugins-api + ${project.version} + provided + + + + org.mongodb + mongodb-driver-sync + 4.7.1 + + + + org.apache.commons + commons-lang3 + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + ${e2e.dependency.skip} + true + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java new file mode 100644 index 000000000..55ba487d1 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.seatunnel.datasource.plugin.mongodb; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; +import org.apache.seatunnel.datasource.plugin.api.model.TableField; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.collect.ImmutableList; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoIterable; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class MongoDataSoueceChannel implements DataSourceChannel { + + private static final String DATABASE = "default"; + + @Override + public OptionRule getDataSourceOptions(@NonNull String pluginName) { + return MongoOptionRule.optionRule(); + } + + @Override + public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) { + return MongoOptionRule.metadataRule(); + } + + public List getTables( + @NonNull String pluginName, + Map requestParams, + String database, + Map options) { + checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default"); + + return Collections.emptyList(); + } + + @Override + public List getDatabases( + @NonNull String pluginName, @NonNull Map requestParams) { + return ImmutableList.of(DATABASE); + } + + @Override + public List getTableFields( + @NonNull String pluginName, + @NonNull Map requestParams, + @NonNull String database, + @NonNull String table) { + checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default"); + return Collections.emptyList(); + } + + @Override + public boolean checkDataSourceConnectivity( + @NonNull String pluginName, @NonNull Map requestParams) { + + try (MongoClient mongoClient = createMongoClient(requestParams)) { + // Verify if the connection to mongodb was successful + MongoIterable databaseNames = mongoClient.listDatabaseNames(); + if (databaseNames.iterator().hasNext()) { + log.info("mongoDB connection successful"); + return true; + } else { + return false; + } + } catch (Exception e) { + throw new DataSourcePluginException( + "check MongoDB connectivity failed, " + e.getMessage(), e); + } + } + + // Resolve the URI in requestParams of Map type + private MongoClient createMongoClient(Map requestParams) { + + return MongoClients.create( + MongoRequestParamsUtils.parseStringFromRequestParams(requestParams)); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java new file mode 100644 index 000000000..b08f5b305 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.seatunnel.datasource.plugin.mongodb; + +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; +import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum; + +import com.google.auto.service.AutoService; +import com.google.common.collect.Sets; + +import java.util.Set; + +@AutoService(DataSourceFactory.class) +public class MongoDataSourceFactory implements DataSourceFactory { + public static final String MONGO_PLUGIN_NAME = "MongoDB"; + public static final String MONGO_PLUGIN_ICON = "MongoDB"; + public static final String MONGO_PLUGIN_VERSION = "1.0.0"; + + @Override + public String factoryIdentifier() { + return MONGO_PLUGIN_NAME; + } + + @Override + public Set supportedDataSources() { + return Sets.newHashSet( + DataSourcePluginInfo.builder() + .name(MONGO_PLUGIN_NAME) + .icon(MONGO_PLUGIN_ICON) + .version(MONGO_PLUGIN_VERSION) + .supportVirtualTables(true) + .type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode()) + .build()); + } + + @Override + public DataSourceChannel createChannel() { + return new MongoDataSoueceChannel(); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java new file mode 100644 index 000000000..cde0f89fd --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.seatunnel.datasource.plugin.mongodb; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.util.OptionRule; + +public class MongoOptionRule { + + public static final Option URI = + Options.key("uri") + .stringType() + .noDefaultValue() + .withDescription("The MongoDB connection uri."); + + public static final Option DATABASE = + Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("The name of MongoDB database to read or write."); + + public static final Option COLLECTION = + Options.key("collection") + .stringType() + .noDefaultValue() + .withDescription("The name of MongoDB collection to read or write."); + + public static OptionRule optionRule() { + return OptionRule.builder().required(URI).build(); + } + + public static OptionRule metadataRule() { + return OptionRule.builder().required(DATABASE, COLLECTION).build(); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java new file mode 100644 index 000000000..eb342acff --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.seatunnel.datasource.plugin.mongodb; + +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; + +public class MongoRequestParamsUtils { + + public static String parseStringFromRequestParams(Map requestParams) { + checkArgument( + requestParams.containsKey(MongoOptionRule.URI.key()), + String.format("Missing %s in requestParams", MongoOptionRule.URI.key())); + + return requestParams.get(MongoOptionRule.URI.key()); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java index 72a2ea208..8fb645d4a 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java @@ -28,6 +28,8 @@ import java.sql.Connection; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; public interface DataSourceChannel { @@ -71,11 +73,19 @@ List getTableFields( @NonNull String database, @NonNull String table); - Map> getTableFields( + default Map> getTableFields( @NonNull String pluginName, @NonNull Map requestParams, @NonNull String database, - @NonNull List tables); + @NonNull List tables) { + return tables.parallelStream() + .collect( + Collectors.toMap( + Function.identity(), + table -> + getTableFields( + pluginName, requestParams, database, table))); + } /** * just check metadata field is right and used by virtual table diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml index 5f2296bb3..44c6ac6cb 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml +++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml @@ -44,6 +44,7 @@ datasource-s3 datasource-sqlserver-cdc datasource-jdbc-tidb + datasource-mongodb diff --git a/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh b/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh index 1c09af2f7..3ea36ab6d 100644 --- a/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh +++ b/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh @@ -43,6 +43,7 @@ datasource_list=( "datasource-s3" "datasource-sqlserver-cdc" "datasource-starrocks" + "datasource-mongodb" ) # the datasource default version is 1.0.0, you can also choose a custom version. eg: 1.1.2: sh install-datasource.sh 2.1.2 diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java new file mode 100644 index 000000000..2153c52e7 --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.thirdparty.datasource.impl; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.configuration.util.RequiredOption; +import org.apache.seatunnel.app.domain.request.connector.BusinessMode; +import org.apache.seatunnel.app.domain.request.job.DataSourceOption; +import org.apache.seatunnel.app.domain.request.job.SelectTableFields; +import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes; +import org.apache.seatunnel.app.dynamicforms.FormStructure; +import org.apache.seatunnel.app.thirdparty.datasource.AbstractDataSourceConfigSwitcher; +import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcher; +import org.apache.seatunnel.common.constants.PluginType; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@AutoService(DataSourceConfigSwitcher.class) +@Slf4j +public class MongoDBDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher { + private static final String DATABASE = "database"; + private static final String COLLECTION = "collection"; + private static final String SCHEMA = "schema"; + + @Override + public String getDataSourceName() { + return "MONGODB"; + } + + @Override + public FormStructure filterOptionRule( + String connectorName, + OptionRule dataSourceOptionRule, + OptionRule virtualTableOptionRule, + BusinessMode businessMode, + PluginType pluginType, + OptionRule connectorOptionRule, + List addRequiredOptions, + List> addOptionalOptions, + List excludedKeys) { + excludedKeys.add(SCHEMA); + return super.filterOptionRule( + connectorName, + dataSourceOptionRule, + virtualTableOptionRule, + businessMode, + pluginType, + connectorOptionRule, + addRequiredOptions, + addOptionalOptions, + excludedKeys); + } + + @Override + public Config mergeDatasourceConfig( + Config dataSourceInstanceConfig, + VirtualTableDetailRes virtualTableDetail, + DataSourceOption dataSourceOption, + SelectTableFields selectTableFields, + BusinessMode businessMode, + PluginType pluginType, + Config connectorConfig) { + // Use field to generate the schema + connectorConfig = + connectorConfig.withValue( + DATABASE, + ConfigValueFactory.fromAnyRef( + virtualTableDetail.getDatasourceProperties().get(DATABASE))); + connectorConfig = + connectorConfig.withValue( + COLLECTION, + ConfigValueFactory.fromAnyRef( + virtualTableDetail.getDatasourceProperties().get(COLLECTION))); + if (pluginType == PluginType.SOURCE) { + connectorConfig = + connectorConfig.withValue( + SCHEMA, + KafkaKingbaseDataSourceConfigSwitcher.SchemaGenerator + .generateSchemaBySelectTableFields( + virtualTableDetail, selectTableFields) + .root()); + } + + return super.mergeDatasourceConfig( + dataSourceInstanceConfig, + virtualTableDetail, + dataSourceOption, + selectTableFields, + businessMode, + pluginType, + connectorConfig); + } +} diff --git a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml index 917c677b7..d322d2e7b 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml +++ b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml @@ -54,6 +54,10 @@ connector-datasource-mapper: dataSources: - Postgres-CDC + MongoDB: + dataSources: + - MongoDB + sourceDatasourceFeatures: JDBC-Mysql: businessMode: @@ -123,6 +127,13 @@ connector-datasource-mapper: - MULTIPLE_TABLE - SPLIT_TABLE + MongoDB: + businessMode: + - DATA_INTEGRATION + sceneMode: + - SINGLE_TABLE + jobMode: + - BATCH sinkDatasourceFeatures: @@ -196,3 +207,11 @@ connector-datasource-mapper: - DATA_INTEGRATION sceneMode: - SINGLE_TABLE + + MongoDB: + businessMode: + - DATA_INTEGRATION + - DATA_REPLICA + sceneMode: + - SINGLE_TABLE + - MULTIPLE_TABLE \ No newline at end of file diff --git a/seatunnel-web-dist/pom.xml b/seatunnel-web-dist/pom.xml index 7e83b1acb..f19e379a7 100644 --- a/seatunnel-web-dist/pom.xml +++ b/seatunnel-web-dist/pom.xml @@ -350,6 +350,19 @@ + + org.apache.seatunnel + datasource-mongodb + ${project.version} + provided + + + * + * + + + + com.oracle.database.jdbc