[SNOW-943288] Do not skip records when we're expecting the offset to be reset #729

Merged: 22 commits, Nov 9, 2023
Changes from 10 commits
@@ -112,7 +112,9 @@ public class TopicPartitionChannel {
* <li>If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to
* fetch offset from Snowflake again
* <li>If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the
* channel and try to fetch offset from Snowflake again
* channel and try to fetch offset from Snowflake again. Schematization purposefully fails
* the first buffer insert in order to alter the table, and then expects Kafka to resend
* data
* </ol>
*
* <p>In both cases above, we ask Kafka to send back offsets, strictly from offset number after
@@ -124,7 +126,7 @@ public class TopicPartitionChannel {
* <p>This boolean is used to indicate that we reset offset in kafka and we will only buffer once
* we see the offset which is one more than an offset present in Snowflake.
*/
private boolean isOffsetResetInKafka;
private boolean isOffsetResetInKafka = false; // TODO @rcheng question: atomic?

private final SnowflakeStreamingIngestClient streamingIngestClient;

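The Javadoc above condenses into a small standalone sketch. Everything here except the `isOffsetResetInKafka` field name is a hypothetical stand-in, not the connector's actual API:

```java
// Minimal sketch of the recovery flow described in the Javadoc above.
// Only the isOffsetResetInKafka field name comes from the diff; the class,
// method names, and parameters are illustrative assumptions.
final class OffsetResetFlowSketch {
  private boolean isOffsetResetInKafka = false;

  /**
   * Models what happens after a failed offsetToken fetch or a failed buffer insert:
   * the channel has been reopened and the committed offset re-fetched from Snowflake,
   * so Kafka is asked to resend records strictly after that offset.
   *
   * @param committedOffsetInSnowflake latest offset committed in Snowflake
   * @return the offset Kafka should resend from
   */
  long onIngestFailure(long committedOffsetInSnowflake) {
    isOffsetResetInKafka = true; // buffer nothing until the expected offset reappears
    return committedOffsetInSnowflake + 1;
  }

  /** The expected offset (committed + 1) showed up again: resume normal buffering. */
  void onExpectedOffsetSeen() {
    isOffsetResetInKafka = false;
  }
}
```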
@@ -364,7 +366,6 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
} finally {
bufferLock.unlock();
}

// If we found reaching buffer size threshold or count based threshold, we will immediately
// flush (Insert them)
if (copiedStreamingBuffer != null) {
@@ -391,14 +392,19 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
*
* @param kafkaSinkRecord Record to check for above condition only in case of failures
* (isOffsetResetInKafka = true)
* @param currentProcessedOffset The current processed offset
* @return true if this record can be skipped to add into buffer, false otherwise.
*/
private boolean shouldIgnoreAddingRecordToBuffer(
SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
// Don't skip rows if there is no offset reset or there is no offset token information in the
SinkRecord kafkaSinkRecord, final long currentProcessedOffset) {
// Don't skip rows if there is no offset reset and there is no offset token information in the
// channel
if (!isOffsetResetInKafka
|| currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
sfc-gh-rcheng (collaborator, author) commented: crux of the change is removing this OR

&& currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
LOGGER.debug(
"No offset registered in Snowflake and offset is not being reset, we can add this offset"
+ " to buffer for channel:{}",
currentProcessedOffset);
return false;
}

@@ -397,15 +397,24 @@ public void testFetchOffsetTokenWithRetry_RuntimeException() {
/* Only SFExceptions goes into fallback -> reopens channel, fetch offsetToken and throws Appropriate exception */
@Test
public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
final int noOfRecords = 5;
int expectedInsertRowsCount = 0;
int expectedOpenChannelCount = 0;
int expectedGetOffsetCount = 0;

// setup mocks to fail first insert and return two null snowflake offsets (open channel and
// failed insert) before succeeding
Mockito.when(
mockStreamingChannel.insertRows(
ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
.thenThrow(SF_EXCEPTION);

// get null from snowflake first time it is called and null for second time too since insert
// rows was failure
Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()).thenReturn(null);
.thenThrow(SF_EXCEPTION)
.thenReturn(new InsertValidationResponse());
Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
.thenReturn(null)
.thenReturn(null)
.thenReturn(Long.toString(noOfRecords - 1));

// create tpchannel
TopicPartitionChannel topicPartitionChannel =
new TopicPartitionChannel(
mockStreamingClient,
@@ -417,37 +426,47 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
mockKafkaRecordErrorReporter,
mockSinkTaskContext,
mockTelemetryService);
final int noOfRecords = 5;
// Since record 0 was not able to ingest, all records in this batch will not be added into the
// buffer.
expectedOpenChannelCount++;
expectedGetOffsetCount++;

// verify initial mock counts after tpchannel creation
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
.insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
.openChannel(ArgumentMatchers.any());
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
.getLatestCommittedOffsetToken();

// Test inserting record 0, which should fail to ingest so the other records are ignored
List<SinkRecord> records =
TestUtils.createJsonStringSinkRecords(0, noOfRecords, TOPIC, PARTITION);

records.forEach(topicPartitionChannel::insertRecordToBuffer);
expectedInsertRowsCount++;
expectedOpenChannelCount++;
expectedGetOffsetCount++;

Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords))
// verify mocks only tried ingesting once
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
.insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
Mockito.verify(mockStreamingClient, Mockito.times(noOfRecords + 1))
Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
.openChannel(ArgumentMatchers.any());
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords + 1))
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
.getLatestCommittedOffsetToken();

// Now, it should be successful
Mockito.when(
mockStreamingChannel.insertRows(
ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
.thenReturn(new InsertValidationResponse());

Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
.thenReturn(Long.toString(noOfRecords - 1));

// Retry the insert again, now everything should be ingested and the offset token should be
// noOfRecords-1
records.forEach(topicPartitionChannel::insertRecordToBuffer);
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords * 2))
.insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));

Assert.assertEquals(noOfRecords - 1, topicPartitionChannel.fetchOffsetTokenWithRetry());
expectedInsertRowsCount += noOfRecords;
expectedGetOffsetCount++;

// verify mocks ingested each record
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
.insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
.openChannel(ArgumentMatchers.any());
Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
.getLatestCommittedOffsetToken();
}

@Test

This file was deleted.

This file was deleted.

@@ -5,8 +5,8 @@
"topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1",
"snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.flush.time": "60",
"buffer.count.records": "300",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
@@ -5,8 +5,8 @@
"topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1",
"snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.flush.time": "60",
"buffer.count.records": "300",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
88 changes: 0 additions & 88 deletions test/test_suit/test_schema_evolution_drop_table.py

This file was deleted.
