autoformatting
sfc-gh-rcheng committed Oct 24, 2023
1 parent 39565cc commit b0556b0
Showing 3 changed files with 16 additions and 38 deletions.
File 1 of 3
@@ -33,14 +33,11 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
import net.snowflake.ingest.internal.apache.commons.math3.analysis.function.Sin;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
@@ -270,7 +267,9 @@ public InsertValidationResponse insertRecord(SinkRecord kafkaSinkRecord) {
if (shouldInsertRecord(kafkaSinkRecord, currentOffsetPersistedInSnowflake)) {
try {
// try insert with fallback
finalResponse = insertRowsWithFallback(this.transformData(kafkaSinkRecord), kafkaSinkRecord.kafkaOffset());
finalResponse =
insertRowsWithFallback(
this.transformData(kafkaSinkRecord), kafkaSinkRecord.kafkaOffset());
LOGGER.info(
"Successfully called insertRows for channel:{}, insertResponseHasErrors:{}",
this.getChannelName(),
@@ -284,18 +283,20 @@ public InsertValidationResponse insertRecord(SinkRecord kafkaSinkRecord) {
// Suppressing the exception because other channels might still continue to ingest
LOGGER.warn(
String.format(
"[INSERT_RECORDS] Failure inserting rows for channel:%s",
this.getChannelName()),
"[INSERT_RECORDS] Failure inserting rows for channel:%s", this.getChannelName()),
ex);
}
}

return finalResponse;
}

public boolean shouldInsertRecord(SinkRecord kafkaSinkRecord, long currentOffsetPersistedInSnowflake) {
public boolean shouldInsertRecord(
SinkRecord kafkaSinkRecord, long currentOffsetPersistedInSnowflake) {
if (currentOffsetPersistedInSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
LOGGER.debug("Insert record because there is no offsetPersistedInSnowflake for channel:{}", this.getChannelName());
LOGGER.debug(
"Insert record because there is no offsetPersistedInSnowflake for channel:{}",
this.getChannelName());
return true;
}

@@ -457,8 +458,7 @@ private InsertValidationResponse insertRowsWithFallback(Map<String, Object> reco
.onFailedAttempt(
event ->
LOGGER.warn(
String.format(
"Failed Attempt to invoke the insertRows API"),
String.format("Failed Attempt to invoke the insertRows API"),
event.getLastException()))
.onFailure(
event ->
@@ -471,9 +471,7 @@ private InsertValidationResponse insertRowsWithFallback(Map<String, Object> reco
.build();

return Failsafe.with(reopenChannelFallbackExecutorForInsertRows)
.get(
new InsertRowsApiResponseSupplier(
this.channel, record, offset));
.get(new InsertRowsApiResponseSupplier(this.channel, record, offset));
}

/** Invokes the API given the channel and streaming Buffer. */
@@ -494,9 +492,7 @@ private InsertRowsApiResponseSupplier(

@Override
public InsertValidationResponse get() throws Throwable {
LOGGER.debug(
"Invoking insertRows API for channel:{}",
this.channel.getFullyQualifiedName());
LOGGER.debug("Invoking insertRows API for channel:{}", this.channel.getFullyQualifiedName());
return this.channel.insertRow(this.record, Long.toString(this.offset));
}
}
@@ -530,20 +526,15 @@ private void insertRowsFallbackSupplier(Throwable ex)
* @param insertErrors errors from validation response. (Only if it has errors)
*/
private void handleInsertRowsFailures(
List<InsertValidationResponse.InsertError> insertErrors,
SinkRecord sinkRecord) {
List<InsertValidationResponse.InsertError> insertErrors, SinkRecord sinkRecord) {
// evolve schema if needed
if (enableSchemaEvolution) {
InsertValidationResponse.InsertError insertError = insertErrors.get(0);
List<String> extraColNames = insertError.getExtraColNames();
List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
if (extraColNames != null || nonNullableColumns != null) {
SchematizationUtils.evolveSchemaIfNeeded(
this.conn,
this.channel.getTableName(),
nonNullableColumns,
extraColNames,
sinkRecord);
this.conn, this.channel.getTableName(), nonNullableColumns, extraColNames, sinkRecord);
}
}

@@ -562,8 +553,7 @@ private void handleInsertRowsFailures(
} else {
for (InsertValidationResponse.InsertError insertError : insertErrors) {
// Map error row number to index in sinkRecords list.
this.kafkaRecordErrorReporter.reportError(sinkRecord,
insertError.getException());
this.kafkaRecordErrorReporter.reportError(sinkRecord, insertError.getException());
}
}
} else {
@@ -886,8 +876,6 @@ protected SnowflakeTelemetryChannelStatus getSnowflakeTelemetryChannelStatus() {
return this.snowflakeTelemetryChannelStatus;
}



/**
* Enum representing which Streaming API is invoking the fallback supplier. ({@link
* #streamingApiFallbackSupplier(StreamingApiFallbackInvoker)})
File 2 of 3
@@ -4,16 +4,13 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.TestUtils.createBigAvroRecords;
import static com.snowflake.kafka.connector.internal.TestUtils.createNativeJsonSinkRecords;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.MAX_GET_OFFSET_TOKEN_RETRIES;
import static org.mockito.ArgumentMatchers.eq;

import com.codahale.metrics.MetricRegistry;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.BufferThreshold;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
@@ -688,7 +685,6 @@ public void testInsertRows_ValidationResponseHasErrors_ErrorTolerance_ALL_LogEna

List<SinkRecord> records = TestUtils.createJsonStringSinkRecords(0, 1, TOPIC, PARTITION);


assert topicPartitionChannel.insertRecord(records.get(0)).hasErrors();

assert kafkaRecordErrorReporter.getReportedRecords().size() == 1;
File 3 of 3
@@ -72,13 +72,7 @@ public void testInvalidJmxReporter() {
// invalid jmx reporter should not error out
SnowflakeTelemetryChannelStatus snowflakeTelemetryChannelStatus =
new SnowflakeTelemetryChannelStatus(
tableName,
connectorName,
channelName,
1234,
true,
null,
new AtomicLong(-1));
tableName, connectorName, channelName, 1234, true, null, new AtomicLong(-1));
snowflakeTelemetryChannelStatus.tryUnregisterChannelJMXMetrics();
}
}
