Skip to content

Commit

Permalink
[Bug][Seatunnel-web][DB2] DB2 Datasource fails due to improper use of…
Browse files Browse the repository at this point in the history
… database and schema names. (#220)
  • Loading branch information
arshadmohammad authored Sep 29, 2024
1 parent 5302b8c commit 42e1761
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ public class Db2DataSourceConfig {
.build();

public static final Set<String> DB2_SYSTEM_DATABASES =
Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys");
Sets.newHashSet(
"SYSTOOLS",
"SYSCAT",
"SYSIBM",
"SYSIBMADM",
"SYSSTAT",
"SYSPROC",
"SYSFUN",
"SYSPUBLIC",
"SYSIBMTS");

public static final OptionRule OPTION_RULE =
OptionRule.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@
import org.apache.commons.lang3.StringUtils;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;

@Slf4j
public class Db2JdbcDataSourceChannel implements DataSourceChannel {

@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return Db2DataSourceConfig.OPTION_RULE;
Expand All @@ -65,13 +66,13 @@ public List<String> getTables(
if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
filterName = "%" + filterName + "%";
} else if (StringUtils.equals(filterName, "")) {
filterName = null;
filterName = "%";
}
try (Connection connection = getConnection(requestParams);
ResultSet resultSet =
connection
.getMetaData()
.getTables(null, null, "%", new String[] {"TABLE"})) {
.getTables(null, database, filterName, new String[] {"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
Expand All @@ -90,11 +91,30 @@ public List<String> getTables(
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
// Hardcoded list of example database names
List<String> dbNames = Arrays.asList("default");
List<String> dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery("SELECT SCHEMANAME FROM SYSCAT.SCHEMATA");
while (resultSet.next()) {
String dbName = resultSet.getString("SCHEMANAME");
if (StringUtils.isBlank(dbName)) {
continue;
}
dbName = dbName.trim();
if (isNotSystemDatabase(dbName)) {
dbNames.add(dbName);
}
}
} catch (SQLException | ClassNotFoundException e) {
throw new DataSourcePluginException("Failed to get database names", e);
}
return dbNames;
}

private boolean isNotSystemDatabase(String dbName) {
return !Db2DataSourceConfig.DB2_SYSTEM_DATABASES.contains(dbName.toUpperCase());
}

@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
Expand All @@ -119,7 +139,7 @@ public List<TableField> getTableFields(
String primaryKey = getPrimaryKey(metaData, database, table);

// Retrieve column information
try (ResultSet resultSet = metaData.getColumns(null, null, table, null)) {
try (ResultSet resultSet = metaData.getColumns(null, database, table, null)) {

while (resultSet.next()) {
TableField tableField = new TableField();
Expand All @@ -140,14 +160,9 @@ public List<TableField> getTableFields(
tableFields.add(tableField);
}
}
} catch (SQLException e) {
// Log the exception and rethrow as DataSourcePluginException
System.out.println("Error while retrieving table fields: " + e);
} catch (SQLException | ClassNotFoundException e) {
log.error("Error while retrieving table fields", e);
throw new DataSourcePluginException("Failed to get table fields", e);
} catch (ClassNotFoundException e) {
// Log the exception and rethrow as DataSourcePluginException
System.out.println("JDBC driver class not found" + e);
throw new DataSourcePluginException("JDBC driver class not found", e);
}
return tableFields;
}
Expand All @@ -170,16 +185,23 @@ public Map<String, List<TableField>> getTableFields(
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, dbName, tableName);
while (primaryKeysInfo.next()) {
if (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
}
return null;
}

private Connection getConnection(Map<String, String> requestParams)
throws SQLException, ClassNotFoundException {
// Ensure the DB2 JDBC driver is loaded
Class.forName("com.ibm.db2.jcc.DB2Driver");
String driverClass =
requestParams.getOrDefault(
Db2OptionRule.DRIVER.key(),
Db2OptionRule.DriverType.DB2.getDriverClassName());
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
throw new DataSourcePluginException("DB2 jdbc driver " + driverClass + " not found", e);
}
checkNotNull(requestParams.get(Db2OptionRule.URL.key()), "Jdbc url cannot be null");
String url = requestParams.get(Db2OptionRule.URL.key());
if (requestParams.containsKey(Db2OptionRule.USER.key())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ public class Db2OptionRule {
Options.key("url")
.stringType()
.noDefaultValue()
.withDescription(
"jdbc url, eg:"
+ " jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
.withDescription("jdbc url, eg:" + " jdbc:db2://localhost:50000/databaseName");

public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ public enum DataType {
T_INT_ARRAY("array<int>", ArrayType.INT_ARRAY_TYPE),
T_LONG_ARRAY("array<bigint>", ArrayType.LONG_ARRAY_TYPE),
T_FLOAT_ARRAY("array<float>", ArrayType.FLOAT_ARRAY_TYPE),
T_DOUBLE_ARRAY("array<double>", ArrayType.DOUBLE_ARRAY_TYPE);
T_DOUBLE_ARRAY("array<double>", ArrayType.DOUBLE_ARRAY_TYPE),
T_VARCHAR("varchar", BasicType.STRING_TYPE),
T_CHAR("char", BasicType.STRING_TYPE),
T_INTEGER("integer", BasicType.INT_TYPE),
T_DECIMAL_DEFAULT("decimal", BasicType.DOUBLE_TYPE);

@Getter private final String name;
@Getter private final SeaTunnelDataType<?> RawType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected String generateSql(
sb.append(", ");
}
}
sb.append(" FROM ").append(quoteIdentifier(table));
sb.append(" FROM ").append(quoteIdentifier(database) + "." + quoteIdentifier(table));

return sb.toString();
}
Expand All @@ -211,7 +211,9 @@ protected String quoteIdentifier(String identifier) {
protected String generateSinkSql(
List<String> tableFields, String database, String schema, String table) {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ").append(quoteIdentifier(table)).append(" (");
sb.append("INSERT INTO ")
.append(quoteIdentifier(database) + "." + quoteIdentifier(table))
.append(" (");

// Append column names
for (int i = 0; i < tableFields.size(); i++) {
Expand Down

0 comments on commit 42e1761

Please sign in to comment.