From c60bd1daa49e35d72f542ea0961d1b9e3990d99d Mon Sep 17 00:00:00 2001 From: ramyogi7283 Date: Mon, 11 Sep 2023 14:31:03 -0400 Subject: [PATCH] UPSERT mode for DELETE operation not send external version --- .../io/confluent/connect/elasticsearch/DataConverter.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java index 53085bc0a..8fba8f1ed 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java @@ -52,6 +52,8 @@ import java.util.List; import java.util.Map; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.UPSERT; + public class DataConverter { private static final Logger log = LoggerFactory.getLogger(DataConverter.class); @@ -161,7 +163,11 @@ public DocWriteRequest convertRecord(SinkRecord record, String index) { // delete if (record.value() == null) { - return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record); + if (config.writeMethod().name().equals(UPSERT.name())) { + return new DeleteRequest(index).id(id); + } else { + return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record); + } } String payload = getPayload(record);