-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SNOW-943288] Do not skip records when we're expecting the offset to be reset #729
Conversation
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
|
||
# send second batch that should flush | ||
value = [] | ||
for _ in range(self.flushRecordCount): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
confirmed that these tests repro the issue without the changes
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
I will take a look but can we postpone merging this fix until we have a new release? |
@@ -488,7 +488,8 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) { | |||
public void appendColumnsToTable(String tableName, Map<String, String> columnToType) { | |||
checkConnection(); | |||
InternalUtils.assertNotEmpty("tableName", tableName); | |||
StringBuilder appendColumnQuery = new StringBuilder("alter table identifier(?) add column if not exists "); | |||
StringBuilder appendColumnQuery = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
required for formatter to pass for some reason. not relevant to this PR, just a small format change
b254308
to
e0ea3f0
Compare
// Don't skip rows if there is no offset reset or there is no offset token information in the | ||
// channel | ||
if (!isOffsetResetInKafka | ||
|| currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
crux of the change is removing this OR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, otherwise LGTM!
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
This is a behavior change, should we follow the new BCR release process for drivers/SDKs? cc: @sfc-gh-xhuang |
raise NonRetryableError("Column {} was not created".format(columnName)) | ||
|
||
res = self.driver.snowflake_conn.cursor().execute( | ||
"SELECT count(*) FROM {}".format(self.table)).fetchone()[0] | ||
if res != self.recordNum * len(self.topics): | ||
print("Number of record expected: {}, got: {}".format(self.recordNum * len(self.topics), res)) | ||
if res != len(self.topics) * self.recordNum: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great test, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
Thanks for the work.. Talked offline and I think this is a good short term solution. Please see if we can re-add those tests as Toby mentioned. For long term:
- We do need to remove buffer in KC.
- if we cant remove bufffer from KC, we should have an interface between KC buffer and the logic which returns the insertRowsResponse. Here we can discard the batch only after the bad row and not necessarily entire batch. Of course this needs to be thought through since we are talking about discarding the incoming batch from Kafka which spans across multiple partitions. (but you can route those records to individual partitions, and discard partition batches) but I do feel this is a good short term solution not relying on removal of buffer..
Let's discuss more on whether this is a behavior change or a bug fix but following the BCR process for drivers to simple enough too. |
https://snowflakecomputing.atlassian.net/browse/SNOW-943288
Do not skip files when there is no kafka offset reset, remove the processedOffset check
Context
Customer noticed first 500 rows are missing when using schema evolution. This is because we do not reset the kafka offset correctly when we fail in the middle of a KC batch.
Current Behavior, with buffer flush size 500:
a. 464 (offset 36-500) records inserted into buffer and triggers flush, schema alter column, reopen channel and reset kafka offset
b. Remaining 36 records (offset 500-535) inserted into buffer <- These records should be skipped
So this PR alters the logic to skip the remaining 36 records (offset 500-535)