Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7969 Remove configuration toggle
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros committed Jun 26, 2024
1 parent 64d12e6 commit f338f14
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,16 +320,6 @@ public class JdbcSinkConnectorConfig {
.withDescription(
"A reduction buffer consolidates the execution of SQL statements by primary key to reduce the SQL load on the target database. When set to false (the default), each incoming event is applied as a logical SQL change. When set to true, incoming events that refer to the same row will be reduced to a single logical change based on the most recent row state.");

public static final Field COLUMN_TYPE_RESOLUTION_MODE_FIELD = Field.create(COLUMN_TYPE_RESOLUTION_MODE)
.withDisplayName("Specifies how a column's type is resolved")
.withEnum(ColumnTypeResolutionMode.class, ColumnTypeResolutionMode.LEGACY)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Controls how the column type is resolved when no length is specified in the event metadata. " +
"legacy: Uses the legacy behavior where non-length character and binary types prefer CLOB, TEXT, LONGTEXT, BLOB types, " +
"improved: Uses an improved behavior to prefer character-based texts like VARCHAR but with the dialect's maximum length instead of CLOB, TEXT, or LONGTEXT types.");

protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
CONNECTION_URL_FIELD,
Expand All @@ -355,8 +345,7 @@ public class JdbcSinkConnectorConfig {
SQLSERVER_IDENTITY_INSERT_FIELD,
BATCH_SIZE_FIELD,
FIELD_INCLUDE_LIST_FIELD,
FIELD_EXCLUDE_LIST_FIELD,
COLUMN_TYPE_RESOLUTION_MODE_FIELD)
FIELD_EXCLUDE_LIST_FIELD)
.create();

/**
Expand Down Expand Up @@ -511,39 +500,6 @@ public String getValue() {
}
}

public enum ColumnTypeResolutionMode implements EnumeratedValue {
/**
* The legacy column type resolution that is based on Debezium 2.x behavior.
*/
LEGACY("legacy"),

/**
* An improved mode that favors using maximum length character-based data types over the
* use of large object types such as CLOB, TEXT, LONGTEXT, etc.
*/
IMPROVED("improved");

private String mode;

ColumnTypeResolutionMode(String mode) {
this.mode = mode;
}

public static ColumnTypeResolutionMode parse(String value) {
for (ColumnTypeResolutionMode option : ColumnTypeResolutionMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return ColumnTypeResolutionMode.LEGACY;
}

@Override
public String getValue() {
return mode;
}
}

private final Configuration config;

private final InsertMode insertMode;
Expand All @@ -562,7 +518,6 @@ public String getValue() {
private final boolean sqlServerIdentityInsert;
private final long batchSize;
private final boolean useReductionBuffer;
private final ColumnTypeResolutionMode columnTypeResolutionMode;
private FieldNameFilter fieldsFilter;

public JdbcSinkConnectorConfig(Map<String, String> props) {
Expand All @@ -583,7 +538,6 @@ public JdbcSinkConnectorConfig(Map<String, String> props) {
this.sqlServerIdentityInsert = config.getBoolean(SQLSERVER_IDENTITY_INSERT_FIELD);
this.batchSize = config.getLong(BATCH_SIZE_FIELD);
this.useReductionBuffer = config.getBoolean(USE_REDUCTION_BUFFER_FIELD);
this.columnTypeResolutionMode = ColumnTypeResolutionMode.parse(config.getString(COLUMN_TYPE_RESOLUTION_MODE_FIELD));

String fieldExcludeList = config.getString(FIELD_EXCLUDE_LIST);
String fieldIncludeList = config.getString(FIELD_INCLUDE_LIST);
Expand Down Expand Up @@ -685,10 +639,6 @@ public String getPostgresPostgisSchema() {
return postgresPostgisSchema;
}

public ColumnTypeResolutionMode getColumnTypeResolutionMode() {
return columnTypeResolutionMode;
}

/** makes {@link org.hibernate.cfg.Configuration} from connector config
*
* @return {@link org.hibernate.cfg.Configuration}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package io.debezium.connector.jdbc.dialect;

import static io.debezium.connector.jdbc.JdbcSinkConnectorConfig.ColumnTypeResolutionMode;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
Expand Down Expand Up @@ -511,29 +509,15 @@ public String getTypeName(int jdbcType) {
// for us as we were relying on Hibernate for column type resolution, and now column types
// are being resolved differently. This code aims to retain the Debezium 2.x resolution
// functionality.
if (ColumnTypeResolutionMode.LEGACY.equals(connectorConfig.getColumnTypeResolutionMode())) {
switch (jdbcType) {
case Types.VARCHAR:
return getTypeName(Types.LONGVARCHAR);
case Types.NVARCHAR:
return getTypeName(Types.LONGNVARCHAR);
case Types.VARBINARY:
return getTypeName(Types.LONGVARBINARY);
default:
return ddlTypeRegistry.getTypeName(jdbcType, dialect);
}
}
else {
switch (jdbcType) {
case Types.VARCHAR:
return getTypeName(jdbcType, dialect.getMaxVarcharLength());
case Types.NVARCHAR:
return getTypeName(jdbcType, dialect.getMaxNVarcharLength());
case Types.VARBINARY:
return getTypeName(jdbcType, dialect.getMaxVarbinaryLength());
default:
return ddlTypeRegistry.getTypeName(jdbcType, dialect);
}
switch (jdbcType) {
case Types.VARCHAR:
return getTypeName(Types.LONGVARCHAR);
case Types.NVARCHAR:
return getTypeName(Types.LONGNVARCHAR);
case Types.VARBINARY:
return getTypeName(Types.LONGVARBINARY);
default:
return ddlTypeRegistry.getTypeName(jdbcType, dialect);
}
}

Expand Down

0 comments on commit f338f14

Please sign in to comment.