Skip to content

Commit

Permalink
web datasource-postgresql-cdc] support web add pg cdc dadasourcehttps…
Browse files Browse the repository at this point in the history
  • Loading branch information
Xishu Fan committed Jan 5, 2025
1 parent 64c8bb6 commit 80e6e15
Showing 1 changed file with 34 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.seatunnel.datasource.plugin.cdc.postgresql;

import org.apache.seatunnel.api.configuration.util.OptionRule;
Expand All @@ -24,6 +6,7 @@
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -106,7 +89,7 @@ public Map<String, List<TableField>> getTableFields(

protected boolean checkJdbcConnectivity(Map<String, String> requestParams) {
try (Connection connection = init(requestParams);
Statement statement = connection.createStatement()) {
Statement statement = connection.createStatement()) {

try (ResultSet resultSet = statement.executeQuery("SELECT 1")) {
return resultSet.next();
Expand Down Expand Up @@ -136,10 +119,10 @@ protected Connection init(Map<String, String> requestParams) throws SQLException
protected List<String> getDataBaseNames(Map<String, String> requestParams) throws SQLException {
List<String> dbNames = new ArrayList<>();
try (Connection connection = init(requestParams);
PreparedStatement statement =
connection.prepareStatement(
"SELECT datname FROM pg_database WHERE datistemplate = false;");
ResultSet re = statement.executeQuery()) {
PreparedStatement statement =
connection.prepareStatement(
"SELECT datname FROM pg_database WHERE datistemplate = false;");
ResultSet re = statement.executeQuery()) {
while (re.next()) {
String dbName = re.getString("datname");
if (StringUtils.isNotBlank(dbName)) {
Expand Down Expand Up @@ -192,7 +175,7 @@ protected List<String> getTableNames(
requestParams.get(PostgresCDCOptionRule.BASE_URL.key()), dbName));
try (Connection connection = init(requestParams)) {
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(queryWhere.toString())) {
ResultSet resultSet = statement.executeQuery(queryWhere.toString())) {
while (resultSet.next()) {
String schemaName = resultSet.getString("table_schema");
String tableName = resultSet.getString("table_name");
Expand Down Expand Up @@ -232,18 +215,21 @@ protected List<TableField> getTableFields(
List<TableField> tableFields = new ArrayList<>();
try (Connection connection = init(requestParams)) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, dbName, tableName);
String[] split = tableName.split("\\.");
if (split.length != 2) {
throw new DataSourcePluginException(
"Postgresql tableName should composed by schemaName.tableName");
}
String schemaName = split[0];
String table = split[1];
List<String> primaryKey = getPrimaryKey(metaData, dbName, table, schemaName);

try (ResultSet resultSet = metaData.getColumns(dbName, split[0], split[1], null)) {
while (resultSet.next()) {
TableField tableField = new TableField();
String columnName = resultSet.getString("COLUMN_NAME");
tableField.setPrimaryKey(false);
if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) {
if (CollectionUtils.isNotEmpty(primaryKey) && primaryKey.contains(columnName)) {
tableField.setPrimaryKey(true);
}
tableField.setName(columnName);
Expand All @@ -260,13 +246,30 @@ protected List<TableField> getTableFields(
return tableFields;
}

private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
private List<String> getPrimaryKey(
DatabaseMetaData metaData, String dbName, String tableName, String schemaName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
List<String> keyList = new ArrayList<>();

// schemaName 为 null 时,获取当前连接的 Schema
if (schemaName == null || schemaName.isEmpty()) {
try (ResultSet schemas = metaData.getSchemas()) {
while (schemas.next()) {
if (schemas.getString("TABLE_SCHEM")
.equalsIgnoreCase(metaData.getConnection().getSchema())) {
schemaName = schemas.getString("TABLE_SCHEM");
break;
}
}
}
}

try (ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, schemaName, tableName)) {
while (primaryKeysInfo.next()) {
keyList.add(primaryKeysInfo.getString("COLUMN_NAME"));
}
}
return null;
return keyList;
}

private boolean convertToBoolean(Object value) {
Expand Down

0 comments on commit 80e6e15

Please sign in to comment.