From e7dcbcbe32bcc7eab227ad7cc05da1f341647a6a Mon Sep 17 00:00:00 2001
From: Greg Jachimko
Date: Mon, 15 Jul 2024 18:17:39 +0200
Subject: [PATCH 1/4] SNOW-1061855 Fix the E2E test for SchemaEvolutionDropTable with a single buffer

---
 .../DirectTopicPartitionChannel.java          | 40 ++++++++++++-------
 ...s_correct_schema_evolution_drop_table.json |  2 +-
 ...hema_evolution_multi_topic_drop_table.json |  2 +-
 .../test_confluent_protobuf_protobuf.py       |  4 +-
 4 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
index 0959ce984..ae8429ec1 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
@@ -80,6 +80,11 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
   private final AtomicLong processedOffset =
       new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
 
+  // This offset would not be required for buffer-less channel, but we add it to keep buffered and non-buffered
+  // channel versions compatible.
+  private final AtomicLong latestConsumerOffset =
+          new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
+
   // Indicates whether we need to skip and discard any leftover rows in the current batch, this
   // could happen when the channel gets invalidated and reset, then anything left in the buffer
   // should be skipped
@@ -258,7 +263,7 @@ public DirectTopicPartitionChannel(
         metricsJmxReporter,
         this.offsetPersistedInSnowflake,
         this.processedOffset,
-        new AtomicLong(0));
+        this.latestConsumerOffset);
 
     this.telemetryServiceV2.reportKafkaPartitionStart(
         new SnowflakeTelemetryChannelCreation(this.tableName, this.channelNameFormatV1, startTime));
@@ -303,6 +308,11 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit
     final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get();
     final long currentProcessedOffset = this.processedOffset.get();
 
+    // for backwards compatibility - set the consumer offset to be the first one received from kafka
+    if (latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
+      this.latestConsumerOffset.set(kafkaSinkRecord.kafkaOffset());
+    }
+
     // Reset the value if it's a new batch
     if (isFirstRowPerPartitionInBatch) {
       needToSkipCurrentBatch = false;
@@ -689,19 +699,17 @@ private void resetChannelMetadataAfterRecovery(
       SnowflakeStreamingIngestChannel newChannel) {
     if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
       LOGGER.info(
-          "{} Channel:{}, offset token is NULL",
+          "{} Channel:{}, offset token is NULL, will attempt to use offset managed by the connector" +
+              ", consumer offset: {}",
           streamingApiFallbackInvoker,
-          this.getChannelNameFormatV1());
+          this.getChannelNameFormatV1(),
+          this.latestConsumerOffset.get());
     }
 
-    final long offsetToResetInKafka = offsetRecoveredFromSnowflake + 1L;
-    if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
-      LOGGER.info(
-          "There is no offset registered for {} channel in Snowflake. Stop recovering the channel"
-              + " metadata.",
-          this.getChannelNameFormatV1());
-      return;
-    }
+    final long offsetToResetInKafka =
+            offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
+            ? latestConsumerOffset.get()
+            : offsetRecoveredFromSnowflake + 1L;
 
     // Reset Offset in kafka for this topic partition.
     this.sinkTaskContext.offset(this.topicPartition, offsetToResetInKafka);
@@ -917,9 +925,8 @@ public long getProcessedOffset() {
 
   @Override
   @VisibleForTesting
-  @Deprecated
   public long getLatestConsumerOffset() {
-    return 0;
+    return this.latestConsumerOffset.get();
   }
 
   @Override
@@ -948,8 +955,11 @@ public SnowflakeTelemetryChannelStatus getSnowflakeTelemetryChannelStatus() {
   }
 
   @Override
-  @Deprecated
-  public void setLatestConsumerOffset(long consumerOffset) {}
+  public void setLatestConsumerOffset(long consumerOffset) {
+    if (consumerOffset > this.latestConsumerOffset.get()) {
+      this.latestConsumerOffset.set(consumerOffset);
+    }
+  }
 
   /**
    * Converts the original kafka sink record into a Json Record. i.e key and values are converted
diff --git a/test/rest_request_template/travis_correct_schema_evolution_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_drop_table.json
index 0c6523573..6c2ea6ea6 100644
--- a/test/rest_request_template/travis_correct_schema_evolution_drop_table.json
+++ b/test/rest_request_template/travis_correct_schema_evolution_drop_table.json
@@ -15,7 +15,7 @@
     "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
     "snowflake.role.name": "SNOWFLAKE_ROLE",
     "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
-    "snowflake.streaming.enable.single.buffer": "false",
+    "snowflake.streaming.enable.single.buffer": "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER",
     "snowflake.streaming.closeChannelsInParallel.enabled": true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
diff --git a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
index ad9fdc941..c3c516d72 100644
--- a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
+++ b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
@@ -15,7 +15,7 @@
     "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
     "snowflake.role.name": "SNOWFLAKE_ROLE",
     "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
-    "snowflake.streaming.enable.single.buffer": "false",
+    "snowflake.streaming.enable.single.buffer": "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER",
     "snowflake.streaming.closeChannelsInParallel.enabled": true,
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
diff --git a/test/test_suit/test_confluent_protobuf_protobuf.py b/test/test_suit/test_confluent_protobuf_protobuf.py
index fe168063f..aecfad7e6 100644
--- a/test/test_suit/test_confluent_protobuf_protobuf.py
+++ b/test/test_suit/test_confluent_protobuf_protobuf.py
@@ -29,8 +29,8 @@ def __init__(self, driver, nameSalt):
         self.sensor.uint64_val = (1 << 64) - 1
 
         self.schema_registry_client = SchemaRegistryClient({'url': driver.schemaRegistryAddress})
-        self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
-        self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
+        self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
+        self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
         producer_conf = {
             'bootstrap.servers': driver.kafkaAddress,
             'key.serializer': self.keyProtobufSerializer,
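For readers of this series: the recovery change in PATCH 1/4 reduces to a single decision rule. When Snowflake returns no offset token for the channel, seek Kafka back to the connector-tracked consumer offset; otherwise resume from the offset right after the last one persisted in Snowflake. Below is a minimal standalone sketch of that rule, assuming the connector's -1L sentinel for NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; the class and method names are illustrative, not part of the connector.

import java.util.concurrent.atomic.AtomicLong;

public class OffsetRecoverySketch {
  // Assumed sentinel value, mirroring NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE.
  static final long NO_OFFSET_TOKEN = -1L;

  // Last consumer offset seen by the channel; stays -1L until the first record arrives.
  final AtomicLong latestConsumerOffset = new AtomicLong(NO_OFFSET_TOKEN);

  // Mirrors the new ternary in resetChannelMetadataAfterRecovery: no token in
  // Snowflake -> fall back to the tracked consumer offset; token present ->
  // resume from the offset after the last persisted one.
  long offsetToResetInKafka(long offsetRecoveredFromSnowflake) {
    return offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN
        ? latestConsumerOffset.get()
        : offsetRecoveredFromSnowflake + 1L;
  }

  public static void main(String[] args) {
    OffsetRecoverySketch sketch = new OffsetRecoverySketch();
    sketch.latestConsumerOffset.set(42L);
    // No token in Snowflake -> fall back to the tracked consumer offset.
    System.out.println(sketch.offsetToResetInKafka(NO_OFFSET_TOKEN)); // 42
    // Token present (e.g. 99) -> resume from the next offset.
    System.out.println(sketch.offsetToResetInKafka(99L)); // 100
  }
}

Note that with this patch alone the fallback can itself still be -1L when no record has been consumed yet; PATCH 4/4 below adds a guard that skips the Kafka seek in exactly that case.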
From 530b919191c302aa25529880f94cc90fc5faf836 Mon Sep 17 00:00:00 2001
From: Greg Jachimko
Date: Tue, 16 Jul 2024 08:01:54 +0200
Subject: [PATCH 2/4] SNOW-1061855 format code

---
 .../streaming/DirectTopicPartitionChannel.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
index ae8429ec1..e6468159b 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
@@ -80,10 +80,11 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
   private final AtomicLong processedOffset =
       new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
 
-  // This offset would not be required for buffer-less channel, but we add it to keep buffered and non-buffered
+  // This offset would not be required for buffer-less channel, but we add it to keep buffered
+  // and non-buffered
   // channel versions compatible.
   private final AtomicLong latestConsumerOffset =
-          new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
+      new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
 
   // Indicates whether we need to skip and discard any leftover rows in the current batch, this
   // could happen when the channel gets invalidated and reset, then anything left in the buffer
@@ -699,15 +700,15 @@ private void resetChannelMetadataAfterRecovery(
       SnowflakeStreamingIngestChannel newChannel) {
     if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
       LOGGER.info(
-          "{} Channel:{}, offset token is NULL, will attempt to use offset managed by the connector" +
-              ", consumer offset: {}",
+          "{} Channel:{}, offset token is NULL, will attempt to use offset managed by the connector"
+              + ", consumer offset: {}",
           streamingApiFallbackInvoker,
           this.getChannelNameFormatV1(),
           this.latestConsumerOffset.get());
     }
 
     final long offsetToResetInKafka =
-            offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
+        offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
             ? latestConsumerOffset.get()
             : offsetRecoveredFromSnowflake + 1L;
 
From 62209fdc019d3df2af9e7c4c3630b31c6900d64c Mon Sep 17 00:00:00 2001
From: Greg Jachimko
Date: Tue, 16 Jul 2024 08:14:56 +0200
Subject: [PATCH 3/4] SNOW-1061855 reverted protobuf ctor change

---
 test/test_suit/test_confluent_protobuf_protobuf.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/test/test_suit/test_confluent_protobuf_protobuf.py b/test/test_suit/test_confluent_protobuf_protobuf.py
index aecfad7e6..a34e5560d 100644
--- a/test/test_suit/test_confluent_protobuf_protobuf.py
+++ b/test/test_suit/test_confluent_protobuf_protobuf.py
@@ -29,8 +29,11 @@ def __init__(self, driver, nameSalt):
         self.sensor.uint64_val = (1 << 64) - 1
 
         self.schema_registry_client = SchemaRegistryClient({'url': driver.schemaRegistryAddress})
-        self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
-        self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
+        #uncomment for local tests
+        #self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
+        #self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
+        self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
+        self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
         producer_conf = {
             'bootstrap.servers': driver.kafkaAddress,
             'key.serializer': self.keyProtobufSerializer,
From 3872b13f5b630dd25b552d9d2103ad870a648021 Mon Sep 17 00:00:00 2001
From: Greg Jachimko
Date: Tue, 16 Jul 2024 10:48:48 +0200
Subject: [PATCH 4/4] NOSNOW: review fixes

---
 .../DirectTopicPartitionChannel.java | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
index e6468159b..64cf6327d 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
@@ -83,7 +83,7 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
   // This offset would not be required for buffer-less channel, but we add it to keep buffered
   // and non-buffered
   // channel versions compatible.
- private final AtomicLong latestConsumerOffset = + private final AtomicLong currentConsumerGroupOffset = new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE); // Indicates whether we need to skip and discard any leftover rows in the current batch, this @@ -264,7 +264,7 @@ public DirectTopicPartitionChannel( metricsJmxReporter, this.offsetPersistedInSnowflake, this.processedOffset, - this.latestConsumerOffset); + this.currentConsumerGroupOffset); this.telemetryServiceV2.reportKafkaPartitionStart( new SnowflakeTelemetryChannelCreation(this.tableName, this.channelNameFormatV1, startTime)); @@ -310,8 +310,8 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit final long currentProcessedOffset = this.processedOffset.get(); // for backwards compatibility - set the consumer offset to be the first one received from kafka - if (latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { - this.latestConsumerOffset.set(kafkaSinkRecord.kafkaOffset()); + if (currentConsumerGroupOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + this.currentConsumerGroupOffset.set(kafkaSinkRecord.kafkaOffset()); } // Reset the value if it's a new batch @@ -704,13 +704,16 @@ private void resetChannelMetadataAfterRecovery( + ", consumer offset: {}", streamingApiFallbackInvoker, this.getChannelNameFormatV1(), - this.latestConsumerOffset.get()); + this.currentConsumerGroupOffset.get()); } final long offsetToResetInKafka = offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE - ? latestConsumerOffset.get() + ? currentConsumerGroupOffset.get() : offsetRecoveredFromSnowflake + 1L; + if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + return; + } // Reset Offset in kafka for this topic partition. this.sinkTaskContext.offset(this.topicPartition, offsetToResetInKafka); @@ -927,7 +930,7 @@ public long getProcessedOffset() { @Override @VisibleForTesting public long getLatestConsumerOffset() { - return this.latestConsumerOffset.get(); + return this.currentConsumerGroupOffset.get(); } @Override @@ -957,8 +960,8 @@ public SnowflakeTelemetryChannelStatus getSnowflakeTelemetryChannelStatus() { @Override public void setLatestConsumerOffset(long consumerOffset) { - if (consumerOffset > this.latestConsumerOffset.get()) { - this.latestConsumerOffset.set(consumerOffset); + if (consumerOffset > this.currentConsumerGroupOffset.get()) { + this.currentConsumerGroupOffset.set(consumerOffset); } }
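Taken together, the series leaves the buffer-less channel tracking the consumer group offset itself: currentConsumerGroupOffset is seeded from the first record seen in insertRecord, only ever moves forward in setLatestConsumerOffset, and serves as the seek target when Snowflake holds no offset token. Below is a compact sketch of those semantics; it is a standalone approximation with illustrative names, not the connector class itself, and assumes the -1L sentinel.

import java.util.concurrent.atomic.AtomicLong;

public class ConsumerOffsetTrackerSketch {
  // Assumed sentinel, mirroring NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE.
  static final long NO_OFFSET = -1L;

  private final AtomicLong currentConsumerGroupOffset = new AtomicLong(NO_OFFSET);

  // Mirrors the insertRecord change: only the first record seeds the offset.
  void onRecord(long kafkaOffset) {
    if (currentConsumerGroupOffset.get() == NO_OFFSET) {
      currentConsumerGroupOffset.set(kafkaOffset);
    }
  }

  // Mirrors setLatestConsumerOffset: updates are monotonic, stale values are ignored.
  void setLatestConsumerOffset(long consumerOffset) {
    if (consumerOffset > currentConsumerGroupOffset.get()) {
      currentConsumerGroupOffset.set(consumerOffset);
    }
  }

  long get() {
    return currentConsumerGroupOffset.get();
  }

  public static void main(String[] args) {
    ConsumerOffsetTrackerSketch tracker = new ConsumerOffsetTrackerSketch();
    tracker.onRecord(10L);                // first record seeds the offset
    tracker.onRecord(11L);                // later records do not re-seed it
    tracker.setLatestConsumerOffset(15L); // moves the offset forward
    tracker.setLatestConsumerOffset(12L); // stale update is ignored
    System.out.println(tracker.get());    // prints 15
  }
}

As in the patch, the check-then-set pair is not a single atomic operation; that is presumably acceptable because Kafka Connect drives each sink task, and thus each channel, from a single thread.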