From 80e6e15ab445fcb4bc9f9e5bf13a940ead30640b Mon Sep 17 00:00:00 2001 From: Xishu Fan Date: Sun, 5 Jan 2025 14:28:42 +0800 Subject: [PATCH] web datasource-postgresql-cdc] support web add pg cdc dadasourcehttps://github.com/apache/seatunnel/issues/7437 --- .../PostgresCDCDataSourceChannel.java | 65 ++++++++++--------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-postgresql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/postgresql/PostgresCDCDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-postgresql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/postgresql/PostgresCDCDataSourceChannel.java index bb254d80d..7328c05c4 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-postgresql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/postgresql/PostgresCDCDataSourceChannel.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-postgresql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/postgresql/PostgresCDCDataSourceChannel.java @@ -1,21 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ package org.apache.seatunnel.datasource.plugin.cdc.postgresql; import org.apache.seatunnel.api.configuration.util.OptionRule; @@ -24,6 +6,7 @@ import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j; @@ -106,7 +89,7 @@ public Map> getTableFields( protected boolean checkJdbcConnectivity(Map requestParams) { try (Connection connection = init(requestParams); - Statement statement = connection.createStatement()) { + Statement statement = connection.createStatement()) { try (ResultSet resultSet = statement.executeQuery("SELECT 1")) { return resultSet.next(); @@ -136,10 +119,10 @@ protected Connection init(Map requestParams) throws SQLException protected List getDataBaseNames(Map requestParams) throws SQLException { List dbNames = new ArrayList<>(); try (Connection connection = init(requestParams); - PreparedStatement statement = - connection.prepareStatement( - "SELECT datname FROM pg_database WHERE datistemplate = false;"); - ResultSet re = statement.executeQuery()) { + PreparedStatement statement = + connection.prepareStatement( + "SELECT datname FROM pg_database WHERE datistemplate = false;"); + ResultSet re = statement.executeQuery()) { while (re.next()) { String dbName = re.getString("datname"); if (StringUtils.isNotBlank(dbName)) { @@ -192,7 +175,7 @@ protected List getTableNames( requestParams.get(PostgresCDCOptionRule.BASE_URL.key()), dbName)); try (Connection connection = init(requestParams)) { try (Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(queryWhere.toString())) { + ResultSet resultSet = statement.executeQuery(queryWhere.toString())) { while (resultSet.next()) { String schemaName = resultSet.getString("table_schema"); String tableName = resultSet.getString("table_name"); @@ -232,18 +215,21 @@ protected List getTableFields( List tableFields = new ArrayList<>(); try (Connection connection = init(requestParams)) { DatabaseMetaData metaData = connection.getMetaData(); - String primaryKey = getPrimaryKey(metaData, dbName, tableName); String[] split = tableName.split("\\."); if (split.length != 2) { throw new DataSourcePluginException( "Postgresql tableName should composed by schemaName.tableName"); } + String schemaName = split[0]; + String table = split[1]; + List primaryKey = getPrimaryKey(metaData, dbName, table, schemaName); + try (ResultSet resultSet = metaData.getColumns(dbName, split[0], split[1], null)) { while (resultSet.next()) { TableField tableField = new TableField(); String columnName = resultSet.getString("COLUMN_NAME"); tableField.setPrimaryKey(false); - if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) { + if (CollectionUtils.isNotEmpty(primaryKey) && primaryKey.contains(columnName)) { tableField.setPrimaryKey(true); } tableField.setName(columnName); @@ -260,13 +246,30 @@ protected List getTableFields( return tableFields; } - private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName) + private List getPrimaryKey( + DatabaseMetaData metaData, String dbName, String tableName, String schemaName) throws SQLException { - ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName); - while (primaryKeysInfo.next()) { - return primaryKeysInfo.getString("COLUMN_NAME"); + List keyList = new ArrayList<>(); + + // schemaName 为 null 时,获取当前连接的 Schema + if (schemaName == null || schemaName.isEmpty()) { + try (ResultSet schemas = metaData.getSchemas()) { + while (schemas.next()) { + if (schemas.getString("TABLE_SCHEM") + .equalsIgnoreCase(metaData.getConnection().getSchema())) { + schemaName = schemas.getString("TABLE_SCHEM"); + break; + } + } + } + } + + try (ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, schemaName, tableName)) { + while (primaryKeysInfo.next()) { + keyList.add(primaryKeysInfo.getString("COLUMN_NAME")); + } } - return null; + return keyList; } private boolean convertToBoolean(Object value) {