Skip to content

Commit

Permalink
Fix IndexOutOfBoundException thrown when offsets are not continous du…
Browse files Browse the repository at this point in the history
…ring schema-evolution (#1043)
  • Loading branch information
sfc-gh-mbobowski authored Jan 16, 2025
1 parent d02c39a commit 0244792
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,10 +641,10 @@ public InsertRowsResponse get() throws Throwable {
"Invoking insertRows API for channel:{}, streamingBuffer:{}",
this.channel.getFullyQualifiedName(),
this.insertRowsStreamingBuffer);
Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
Pair<List<Map<String, Object>>, List<SinkRecord>> recordsAndOriginalSinkRecords =
this.insertRowsStreamingBuffer.getData();
List<Map<String, Object>> records = recordsAndOffsets.getKey();
List<Long> offsets = recordsAndOffsets.getValue();
List<Map<String, Object>> records = recordsAndOriginalSinkRecords.getKey();
List<SinkRecord> originalSinkRecords = recordsAndOriginalSinkRecords.getValue();
InsertValidationResponse finalResponse = new InsertValidationResponse();
boolean needToResetOffset = false;
if (!enableSchemaEvolution) {
Expand All @@ -658,16 +658,19 @@ public InsertRowsResponse get() throws Throwable {
// For schema evolution, we need to call the insertRows API row by row in order to
// preserve the original order, for anything after the first schema mismatch error we will
// retry after the evolution
SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
InsertValidationResponse response =
this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
this.channel.insertRow(
records.get(idx), Long.toString(originalSinkRecord.kafkaOffset()));
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(
insertError, this.channel.getTableName());

// TODO : originalSinkRecordIdx can be replaced by idx
long originalSinkRecordIdx =
offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();
originalSinkRecord.kafkaOffset() - this.insertRowsStreamingBuffer.getFirstOffset();

if (!schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
InsertValidationResponse.InsertError newInsertError =
Expand All @@ -683,9 +686,7 @@ public InsertRowsResponse get() throws Throwable {
} else {
LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems,
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx),
channel.getTableSchema());
schemaEvolutionTargetItems, originalSinkRecord, channel.getTableSchema());
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
break;
Expand Down Expand Up @@ -1282,7 +1283,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
* before calling insertRows API.
*/
@VisibleForTesting
class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<Long>>> {
class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<SinkRecord>>> {
// Records coming from Kafka
private final List<SinkRecord> sinkRecords;

Expand All @@ -1307,22 +1308,21 @@ public void insert(SinkRecord kafkaSinkRecord) {
}

/**
* Get all rows and their offsets. Each map corresponds to one row whose keys are column names
* and values are corresponding data in that column.
* Get all rows and corresponding SinkRecords. Each map corresponds to one row whose keys are
* column names and values are corresponding data in that column.
*
* <p>This goes over through all buffered kafka records and transforms into JsonSchema and
* JsonNode Check {@link #handleNativeRecord(SinkRecord, boolean)}
*
* @return A pair that contains the records and their corresponding offsets
* @return A pair that contains the records and their corresponding original sinkRecords
*/
@Override
public Pair<List<Map<String, Object>>, List<Long>> getData() {
public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
final List<Map<String, Object>> records = new ArrayList<>();
final List<Long> offsets = new ArrayList<>();
final List<SinkRecord> filteredOriginalSinkRecords = new ArrayList<>();

for (SinkRecord kafkaSinkRecord : sinkRecords) {
SinkRecord snowflakeRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);

// broken record
if (isRecordBroken(snowflakeRecord)) {
// check for error tolerance and log tolerance values
Expand All @@ -1338,14 +1338,13 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
&& snowflakeRecord.timestampType() != NO_TIMESTAMP_TYPE) {
// TODO:SNOW-529751 telemetry
}

// Convert this records into Json Schema which has content and metadata, add it to DLQ if
// there is an exception
try {
Map<String, Object> tableRow =
recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
records.add(tableRow);
offsets.add(snowflakeRecord.kafkaOffset());
filteredOriginalSinkRecords.add(kafkaSinkRecord);
} catch (JsonProcessingException e) {
LOGGER.warn(
"Record has JsonProcessingException offset:{}, topic:{}",
Expand All @@ -1371,7 +1370,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
getBufferSizeBytes(),
getFirstOffset(),
getLastOffset());
return new Pair<>(records, offsets);
return new Pair<>(records, filteredOriginalSinkRecords);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,22 +722,48 @@ public static List<SinkRecord> createJsonStringSinkRecords(
return records;
}

/* Generate (noOfRecords - startOffset) blank records for a given topic and partition. */
public static List<SinkRecord> createBlankJsonSinkRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo) {
return createJsonRecords(
startOffset,
noOfRecords,
topicName,
partitionNo,
null,
Collections.singletonMap("schemas.enable", Boolean.toString(false)));
}

/* Generate (noOfRecords - startOffset) for a given topic and partition. */
public static List<SinkRecord> createNativeJsonSinkRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo) {
ArrayList<SinkRecord> records = new ArrayList<>();
return createJsonRecords(
startOffset,
noOfRecords,
topicName,
partitionNo,
TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
Collections.singletonMap("schemas.enable", Boolean.toString(true)));
}

private static List<SinkRecord> createJsonRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo,
byte[] value,
Map<String, String> converterConfig) {
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "true");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue =
converter.toConnectData(
"test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
SchemaAndValue schemaInputValue = converter.toConnectData("test", value);

ArrayList<SinkRecord> records = new ArrayList<>();
for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
records.add(
new SinkRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -510,24 +507,8 @@ public void testPartialBatchChannelInvalidationIngestion_schematization(boolean
final long secondBatchCount = 500;

// create 18 blank records that do not kick off schematization
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue = converter.toConnectData("test", null);

List<SinkRecord> firstBatch = new ArrayList<>();
for (int i = 0; i < firstBatchCount; i++) {
firstBatch.add(
new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
schemaInputValue.schema(),
schemaInputValue.value(),
i));
}
List<SinkRecord> firstBatch =
TestUtils.createBlankJsonSinkRecords(0, firstBatchCount, topic, PARTITION);

service.insert(firstBatch);

Expand Down Expand Up @@ -764,48 +745,30 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl
// create tpChannel
SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setRecordNumber(4)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(testTableName, topicPartition)
.build();

// insert blank records that do not evolve schema: 0, 1
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
List<SinkRecord> blankRecords = new ArrayList<>();
for (int i = 0; i < 2; i++) {
blankRecords.add(
new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
schemaInputValue.schema(),
schemaInputValue.value(),
i));
}

service.insert(blankRecords);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);

// Insert another two records with offset gap that requires evolution: 3, 4
List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(2, 3, topic, PARTITION);
gapRecords.remove(0);
service.insert(gapRecords);
// Insert another two records with offset gap that requires evolution: 300, 301
List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION);

List<SinkRecord> mergedList = new ArrayList<>(blankRecords);
mergedList.addAll(gapRecords);
// mergedList' offsets -> [0, 1, 300, 301]
service.insert(mergedList);
// With schematization, we need to resend a new batch should succeed even if there is an offset
// gap from the previous committed offset
if (withSchematization) {
service.insert(gapRecords);
service.insert(mergedList);
}

TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 5, 20, 5);
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302, 20, 5);

assert TestUtils.tableSize(testTableName) == 4
: "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
Expand Down

0 comments on commit 0244792

Please sign in to comment.