diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java index 90673587c..fee9fb070 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java @@ -38,7 +38,16 @@ public class Db2DataSourceConfig { .build(); public static final Set DB2_SYSTEM_DATABASES = - Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys"); + Sets.newHashSet( + "SYSTOOLS", + "SYSCAT", + "SYSIBM", + "SYSIBMADM", + "SYSSTAT", + "SYSPROC", + "SYSFUN", + "SYSPUBLIC", + "SYSIBMTS"); public static final OptionRule OPTION_RULE = OptionRule.builder() diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java index 844a47563..724d428ee 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java @@ -25,14 +25,15 @@ import org.apache.commons.lang3.StringUtils; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -40,8 +41,8 @@ import static com.google.common.base.Preconditions.checkNotNull; +@Slf4j public class Db2JdbcDataSourceChannel implements DataSourceChannel { - @Override public OptionRule getDataSourceOptions(@NonNull String pluginName) { return Db2DataSourceConfig.OPTION_RULE; @@ -65,13 +66,13 @@ public List getTables( if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) { filterName = "%" + filterName + "%"; } else if (StringUtils.equals(filterName, "")) { - filterName = null; + filterName = "%"; } try (Connection connection = getConnection(requestParams); ResultSet resultSet = connection .getMetaData() - .getTables(null, null, "%", new String[] {"TABLE"})) { + .getTables(null, database, filterName, new String[] {"TABLE"})) { while (resultSet.next()) { String tableName = resultSet.getString("TABLE_NAME"); if (StringUtils.isNotBlank(tableName)) { @@ -90,11 +91,30 @@ public List getTables( @Override public List getDatabases( @NonNull String pluginName, @NonNull Map requestParams) { - // Hardcoded list of example database names - List dbNames = Arrays.asList("default"); + List dbNames = new ArrayList<>(); + try (Connection connection = getConnection(requestParams); + Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery("SELECT SCHEMANAME FROM SYSCAT.SCHEMATA"); + while (resultSet.next()) { + String dbName = resultSet.getString("SCHEMANAME"); + if (StringUtils.isBlank(dbName)) { + continue; + } + dbName = dbName.trim(); + if (isNotSystemDatabase(dbName)) { + dbNames.add(dbName); + } + } + } catch (SQLException | ClassNotFoundException e) { + throw new DataSourcePluginException("Failed to get database names", e); + } return dbNames; } + private boolean isNotSystemDatabase(String dbName) { + return !Db2DataSourceConfig.DB2_SYSTEM_DATABASES.contains(dbName.toUpperCase()); + } + @Override public boolean checkDataSourceConnectivity( @NonNull String pluginName, @NonNull Map requestParams) { @@ -119,7 +139,7 @@ public List getTableFields( String primaryKey = getPrimaryKey(metaData, database, table); // Retrieve column information - try (ResultSet resultSet = metaData.getColumns(null, null, table, null)) { + try (ResultSet resultSet = metaData.getColumns(null, database, table, null)) { while (resultSet.next()) { TableField tableField = new TableField(); @@ -140,14 +160,9 @@ public List getTableFields( tableFields.add(tableField); } } - } catch (SQLException e) { - // Log the exception and rethrow as DataSourcePluginException - System.out.println("Error while retrieving table fields: " + e); + } catch (SQLException | ClassNotFoundException e) { + log.error("Error while retrieving table fields", e); throw new DataSourcePluginException("Failed to get table fields", e); - } catch (ClassNotFoundException e) { - // Log the exception and rethrow as DataSourcePluginException - System.out.println("JDBC driver class not found" + e); - throw new DataSourcePluginException("JDBC driver class not found", e); } return tableFields; } @@ -170,7 +185,7 @@ public Map> getTableFields( private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName) throws SQLException { ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, dbName, tableName); - while (primaryKeysInfo.next()) { + if (primaryKeysInfo.next()) { return primaryKeysInfo.getString("COLUMN_NAME"); } return null; @@ -178,8 +193,15 @@ private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String ta private Connection getConnection(Map requestParams) throws SQLException, ClassNotFoundException { - // Ensure the DB2 JDBC driver is loaded - Class.forName("com.ibm.db2.jcc.DB2Driver"); + String driverClass = + requestParams.getOrDefault( + Db2OptionRule.DRIVER.key(), + Db2OptionRule.DriverType.DB2.getDriverClassName()); + try { + Class.forName(driverClass); + } catch (ClassNotFoundException e) { + throw new DataSourcePluginException("DB2 jdbc driver " + driverClass + " not found", e); + } checkNotNull(requestParams.get(Db2OptionRule.URL.key()), "Jdbc url cannot be null"); String url = requestParams.get(Db2OptionRule.URL.key()); if (requestParams.containsKey(Db2OptionRule.USER.key())) { diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java index e5e2b9b1b..4947dffd8 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java @@ -26,9 +26,7 @@ public class Db2OptionRule { Options.key("url") .stringType() .noDefaultValue() - .withDescription( - "jdbc url, eg:" - + " jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"); + .withDescription("jdbc url, eg:" + " jdbc:db2://localhost:50000/databaseName"); public static final Option USER = Options.key("user").stringType().noDefaultValue().withDescription("jdbc user"); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java index f7fae7313..8d56224ff 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java @@ -69,7 +69,11 @@ public enum DataType { T_INT_ARRAY("array", ArrayType.INT_ARRAY_TYPE), T_LONG_ARRAY("array", ArrayType.LONG_ARRAY_TYPE), T_FLOAT_ARRAY("array", ArrayType.FLOAT_ARRAY_TYPE), - T_DOUBLE_ARRAY("array", ArrayType.DOUBLE_ARRAY_TYPE); + T_DOUBLE_ARRAY("array", ArrayType.DOUBLE_ARRAY_TYPE), + T_VARCHAR("varchar", BasicType.STRING_TYPE), + T_CHAR("char", BasicType.STRING_TYPE), + T_INTEGER("integer", BasicType.INT_TYPE), + T_DECIMAL_DEFAULT("decimal", BasicType.DOUBLE_TYPE); @Getter private final String name; @Getter private final SeaTunnelDataType RawType; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java index 327f98290..022148d61 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java @@ -199,7 +199,7 @@ protected String generateSql( sb.append(", "); } } - sb.append(" FROM ").append(quoteIdentifier(table)); + sb.append(" FROM ").append(quoteIdentifier(database) + "." + quoteIdentifier(table)); return sb.toString(); } @@ -211,7 +211,9 @@ protected String quoteIdentifier(String identifier) { protected String generateSinkSql( List tableFields, String database, String schema, String table) { StringBuilder sb = new StringBuilder(); - sb.append("INSERT INTO ").append(quoteIdentifier(table)).append(" ("); + sb.append("INSERT INTO ") + .append(quoteIdentifier(database) + "." + quoteIdentifier(table)) + .append(" ("); // Append column names for (int i = 0; i < tableFields.size(); i++) {