diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
index 0959ce984..64cf6327d 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java
@@ -80,6 +80,12 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
   private final AtomicLong processedOffset =
       new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

+  // This offset would not be required for a buffer-less channel,
+  // but we add it to keep the buffered and non-buffered
+  // channel versions compatible.
+  private final AtomicLong currentConsumerGroupOffset =
+      new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
+
   // Indicates whether we need to skip and discard any leftover rows in the current batch, this
   // could happen when the channel gets invalidated and reset, then anything left in the buffer
   // should be skipped
@@ -258,7 +264,7 @@ public DirectTopicPartitionChannel(
             metricsJmxReporter,
             this.offsetPersistedInSnowflake,
             this.processedOffset,
-            new AtomicLong(0));
+            this.currentConsumerGroupOffset);

     this.telemetryServiceV2.reportKafkaPartitionStart(
         new SnowflakeTelemetryChannelCreation(this.tableName, this.channelNameFormatV1, startTime));
@@ -303,6 +309,11 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit
     final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get();
     final long currentProcessedOffset = this.processedOffset.get();

+    // For backwards compatibility, set the consumer offset to the first one received from Kafka.
+    if (currentConsumerGroupOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
+      this.currentConsumerGroupOffset.set(kafkaSinkRecord.kafkaOffset());
+    }
+
     // Reset the value if it's a new batch
     if (isFirstRowPerPartitionInBatch) {
       needToSkipCurrentBatch = false;
@@ -689,17 +700,18 @@ private void resetChannelMetadataAfterRecovery(
       SnowflakeStreamingIngestChannel newChannel) {
     if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
       LOGGER.info(
-          "{} Channel:{}, offset token is NULL",
+          "{} Channel:{}, offset token is NULL, will attempt to use offset managed by the connector"
+              + ", consumer offset: {}",
           streamingApiFallbackInvoker,
-          this.getChannelNameFormatV1());
+          this.getChannelNameFormatV1(),
+          this.currentConsumerGroupOffset.get());
     }

-    final long offsetToResetInKafka = offsetRecoveredFromSnowflake + 1L;
+    final long offsetToResetInKafka =
+        offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
+            ? currentConsumerGroupOffset.get()
+            : offsetRecoveredFromSnowflake + 1L;
     if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
-      LOGGER.info(
-          "There is no offset registered for {} channel in Snowflake. Stop recovering the channel"
-              + " metadata.",
-          this.getChannelNameFormatV1());
       return;
     }

@@ -917,9 +929,8 @@ public long getProcessedOffset() {

   @Override
   @VisibleForTesting
-  @Deprecated
   public long getLatestConsumerOffset() {
-    return 0;
+    return this.currentConsumerGroupOffset.get();
   }

   @Override
@@ -948,8 +959,11 @@ public SnowflakeTelemetryChannelStatus getSnowflakeTelemetryChannelStatus() {
   }

   @Override
-  @Deprecated
-  public void setLatestConsumerOffset(long consumerOffset) {}
+  public void setLatestConsumerOffset(long consumerOffset) {
+    if (consumerOffset > this.currentConsumerGroupOffset.get()) {
+      this.currentConsumerGroupOffset.set(consumerOffset);
+    }
+  }

   /**
    * Converts the original kafka sink record into a Json Record. i.e key and values are converted
diff --git a/test/rest_request_template/travis_correct_schema_evolution_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_drop_table.json
index 0c6523573..6c2ea6ea6 100644
--- a/test/rest_request_template/travis_correct_schema_evolution_drop_table.json
+++ b/test/rest_request_template/travis_correct_schema_evolution_drop_table.json
@@ -15,7 +15,7 @@
     "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
     "snowflake.role.name": "SNOWFLAKE_ROLE",
     "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
-    "snowflake.streaming.enable.single.buffer": "false",
+    "snowflake.streaming.enable.single.buffer": "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER",
     "snowflake.streaming.closeChannelsInParallel.enabled": true,
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
diff --git a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
index ad9fdc941..c3c516d72 100644
--- a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
+++ b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
@@ -15,7 +15,7 @@
     "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
     "snowflake.role.name": "SNOWFLAKE_ROLE",
     "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
-    "snowflake.streaming.enable.single.buffer": "false",
+    "snowflake.streaming.enable.single.buffer": "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER",
     "snowflake.streaming.closeChannelsInParallel.enabled": true,
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
diff --git a/test/test_suit/test_confluent_protobuf_protobuf.py b/test/test_suit/test_confluent_protobuf_protobuf.py
index fe168063f..a34e5560d 100644
--- a/test/test_suit/test_confluent_protobuf_protobuf.py
+++ b/test/test_suit/test_confluent_protobuf_protobuf.py
@@ -29,6 +29,9 @@ def __init__(self, driver, nameSalt):
         self.sensor.uint64_val = (1 << 64) - 1

         self.schema_registry_client = SchemaRegistryClient({'url': driver.schemaRegistryAddress})
+        # Uncomment for local tests:
+        # self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
+        # self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
         self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
         self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
         producer_conf = {
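
The heart of the Java change is the new fallback in resetChannelMetadataAfterRecovery: when the offset token recovered from Snowflake is NULL, the connector now restarts the Kafka consumer from the consumer group offset it has been tracking itself, instead of logging and giving up. The sketch below is a minimal, standalone illustration of that logic under stated assumptions: the OffsetFallbackSketch class name and the main demo are invented here, and the sentinel is assumed to be -1L (as the NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE constant name implies); only the field and method names mirror the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of the offset-recovery fallback; not the connector's actual class.
public class OffsetFallbackSketch {
  // Assumed sentinel value; the real constant lives in the connector's code.
  static final long NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE = -1L;

  // Latest offset reported by the Kafka consumer for this partition.
  private final AtomicLong currentConsumerGroupOffset =
      new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

  // Track the consumer group offset monotonically (mirrors setLatestConsumerOffset).
  public void setLatestConsumerOffset(long consumerOffset) {
    // updateAndGet makes the "keep the maximum" update atomic; the diff uses a plain
    // check-then-set, which is adequate when only the single task thread calls it.
    currentConsumerGroupOffset.updateAndGet(prev -> Math.max(prev, consumerOffset));
  }

  // Decide where to restart the Kafka consumer after a channel reset: one past the
  // recovered Snowflake offset when a token exists, otherwise the connector-managed
  // consumer offset (which may itself still be the sentinel).
  public long offsetToResetInKafka(long offsetRecoveredFromSnowflake) {
    return offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
        ? currentConsumerGroupOffset.get()
        : offsetRecoveredFromSnowflake + 1L;
  }

  public static void main(String[] args) {
    OffsetFallbackSketch channel = new OffsetFallbackSketch();

    // No token in Snowflake and no consumer offset seen yet: nothing to reset to.
    System.out.println(channel.offsetToResetInKafka(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE)); // -1

    // After the consumer reports offset 42, a NULL token falls back to 42.
    channel.setLatestConsumerOffset(42L);
    System.out.println(channel.offsetToResetInKafka(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE)); // 42

    // With a recovered token, restart one past the last offset persisted in Snowflake.
    System.out.println(channel.offsetToResetInKafka(100L)); // 101
  }
}
```

This also explains why the diff drops the old "Stop recovering the channel metadata" log and keeps only the bare return: with the fallback in place, the early return now fires only when neither Snowflake nor the connector has seen any offset yet, so there is genuinely nothing to recover.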