From 50f145b5fe9e8f7842344495e8b3bbf455cc571a Mon Sep 17 00:00:00 2001 From: Nicholas Cole Date: Mon, 11 Mar 2024 15:00:59 -0500 Subject: [PATCH] Porting from https://github.com/confluentinc/kafka-connect-elasticsearch/pull/759 --- .../connect/elasticsearch/DataConverter.java | 18 +++ .../ElasticsearchSinkConnectorConfig.java | 71 ++++++++++++------ .../elasticsearch/util/ScriptParser.java | 46 ++++++++++ .../validator/ScriptValidator.java | 74 +++++++++++++++++++ .../elasticsearch/DataConverterTest.java | 28 +++++++ .../connect/elasticsearch/ValidatorTest.java | 37 ++++++++++ 6 files changed, 253 insertions(+), 21 deletions(-) create mode 100644 src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java create mode 100644 src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java index c5599cc31..3cd83e2f9 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java @@ -20,6 +20,8 @@ +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; +import io.confluent.connect.elasticsearch.util.ScriptParser; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -44,6 +46,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.script.Script; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,6 +189,21 @@ public DocWriteRequest convertRecord(SinkRecord record, String index) { new IndexRequest(index).id(id).source(payload, 
XContentType.JSON).opType(opType), record ); + case SCRIPTED_UPSERT: + Script script = null; + + try { + script = ScriptParser.parseScript(config.getScript()); + } catch (JsonProcessingException jsonProcessingException) { + throw new RuntimeException(jsonProcessingException); + } + + return new UpdateRequest(index, id) + .doc(payload, XContentType.JSON) + .upsert(payload, XContentType.JSON) + .retryOnConflict(Math.min(config.maxInFlightRequests(), 5)) + .script(script) + .scriptedUpsert(true); default: return null; // shouldn't happen } diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index c9df10d9e..89dc00423 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.concurrent.TimeUnit; + +import io.confluent.connect.elasticsearch.validator.ScriptValidator; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -277,6 +279,16 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { ); private static final String WRITE_METHOD_DISPLAY = "Write Method"; private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name(); + + public static final String UPSERT_SCRIPT_CONFIG = "upsert.script"; + + private static final String UPSERT_SCRIPT_DOC = "Script used for" + + " upserting data to Elasticsearch. This script allows for" + + " customizable behavior upon upserting a document. 
Please refer to" + + " Elasticsearch scripted upsert documentation"; + + private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script"; + private static final String UPSERT_SCRIPT_DEFAULT = ""; public static final String LOG_SENSITIVE_DATA_CONFIG = "log.sensitive.data"; private static final String LOG_SENSITIVE_DATA_DISPLAY = "Log Sensitive data"; private static final String LOG_SENSITIVE_DATA_DOC = "If true, logs sensitive data " @@ -408,7 +420,8 @@ public enum SecurityProtocol { public enum WriteMethod { INSERT, - UPSERT + UPSERT, + SCRIPTED_UPSERT } protected static ConfigDef baseConfigDef() { @@ -622,8 +635,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - IGNORE_KEY_DISPLAY - ).define( + IGNORE_KEY_DISPLAY) + .define( IGNORE_SCHEMA_CONFIG, Type.BOOLEAN, IGNORE_SCHEMA_DEFAULT, @@ -632,8 +645,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - IGNORE_SCHEMA_DISPLAY - ).define( + IGNORE_SCHEMA_DISPLAY) + .define( COMPACT_MAP_ENTRIES_CONFIG, Type.BOOLEAN, COMPACT_MAP_ENTRIES_DEFAULT, @@ -642,8 +655,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - COMPACT_MAP_ENTRIES_DISPLAY - ).define( + COMPACT_MAP_ENTRIES_DISPLAY) + .define( IGNORE_KEY_TOPICS_CONFIG, Type.LIST, IGNORE_KEY_TOPICS_DEFAULT, @@ -652,8 +665,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - IGNORE_KEY_TOPICS_DISPLAY - ).define( + IGNORE_KEY_TOPICS_DISPLAY) + .define( IGNORE_SCHEMA_TOPICS_CONFIG, Type.LIST, IGNORE_SCHEMA_TOPICS_DEFAULT, @@ -662,8 +675,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - IGNORE_SCHEMA_TOPICS_DISPLAY - ).define( + IGNORE_SCHEMA_TOPICS_DISPLAY) + .define( DROP_INVALID_MESSAGE_CONFIG, Type.BOOLEAN, DROP_INVALID_MESSAGE_DEFAULT, @@ -672,8 +685,8 @@ 
private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - DROP_INVALID_MESSAGE_DISPLAY - ).define( + DROP_INVALID_MESSAGE_DISPLAY) + .define( BEHAVIOR_ON_NULL_VALUES_CONFIG, Type.STRING, BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(), @@ -684,8 +697,8 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, BEHAVIOR_ON_NULL_VALUES_DISPLAY, - new EnumRecommender<>(BehaviorOnNullValues.class) - ).define( + new EnumRecommender<>(BehaviorOnNullValues.class)) + .define( BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, Type.STRING, BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(), @@ -696,8 +709,8 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY, - new EnumRecommender<>(BehaviorOnMalformedDoc.class) - ).define( + new EnumRecommender<>(BehaviorOnMalformedDoc.class)) + .define( EXTERNAL_VERSION_HEADER_CONFIG, Type.STRING, EXTERNAL_VERSION_HEADER_DEFAULT, @@ -706,8 +719,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - EXTERNAL_VERSION_HEADER_DISPLAY - ).define( + EXTERNAL_VERSION_HEADER_DISPLAY) + .define( WRITE_METHOD_CONFIG, Type.STRING, WRITE_METHOD_DEFAULT, @@ -718,8 +731,20 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, WRITE_METHOD_DISPLAY, - new EnumRecommender<>(WriteMethod.class) - ); + new EnumRecommender<>(WriteMethod.class)) + .define( + UPSERT_SCRIPT_CONFIG, + Type.STRING, + null, + new ScriptValidator(), + Importance.LOW, + UPSERT_SCRIPT_DOC, + DATA_CONVERSION_GROUP, + ++order, + Width.SHORT, + UPSERT_SCRIPT_DISPLAY, + new ScriptValidator()); + ; } private static void addProxyConfigs(ConfigDef configDef) { @@ -1078,6 +1103,10 @@ public WriteMethod writeMethod() { return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase()); } + public String getScript() { + return getString(UPSERT_SCRIPT_CONFIG); + } + private 
static class DataStreamDatasetValidator implements Validator { @Override diff --git a/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java new file mode 100644 index 000000000..f173f15b7 --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.elasticsearch.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.elasticsearch.script.Script; + +import java.util.Map; + +public class ScriptParser { + + public static Script parseScript(String scriptJson) throws JsonProcessingException { + + Map map = ScriptParser.parseSchemaStringAsJson(scriptJson); + + return Script.parse(map); + } + + private static Map parseSchemaStringAsJson(String scriptJson) + throws JsonProcessingException { + + ObjectMapper objectMapper = new ObjectMapper(); + + Map scriptConverted; + + scriptConverted = objectMapper.readValue( + scriptJson, new TypeReference>(){}); + + return scriptConverted; + } +} diff --git a/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java new file mode 100644 index 
000000000..760beab4b --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.elasticsearch.validator; + +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.confluent.connect.elasticsearch.util.ScriptParser; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.elasticsearch.script.Script; + +public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender { + + @Override + @SuppressWarnings("unchecked") + public void ensureValid(String name, Object value) { + + if (value == null) { + return; + } + + String script = (String) value; + + try { + Script parsedScript = ScriptParser.parseScript(script); + + if (parsedScript.getIdOrCode() == null) { + throw new ConfigException(name, script, "The specified script is missing code"); + } else if (parsedScript.getLang() == null) { + throw new ConfigException(name, script, "The specified script is missing lang"); + } + + } catch (JsonProcessingException 
jsonProcessingException) { + throw new ConfigException( + name, script, "The specified script is not a valid Elasticsearch painless script"); + } + } + + @Override + public String toString() { + return "A valid script that is able to be parsed"; + } + + @Override + public List validValues(String name, Map parsedConfig) { + if (!parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name())) { + return new ArrayList<>(); + } + return null; + } + + @Override + public boolean visible(String name, Map parsedConfig) { + return parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name()); + } +} diff --git a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java index 03e1b092b..132b0c62d 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.index.VersionType; import org.junit.Before; import org.junit.Test; @@ -39,8 +40,10 @@ import static io.confluent.connect.elasticsearch.DataConverter.TIMESTAMP_FIELD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class DataConverterTest { @@ -572,6 +575,31 @@ public void testDoNotAddExternalVersioningIfDataStream() { assertEquals(VersionType.INTERNAL, actualRecord.versionType()); } + @Test + public void upsertScript() { + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + 
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name()); + props.put( + ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); + props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name()); + converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + Schema preProcessedSchema = converter.preProcessSchema(schema); + Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); + SinkRecord sinkRecord = createSinkRecordWithValue(struct); + + UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index); + + assertNotNull(actualRecord.script()); + assertEquals("if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}", actualRecord.script().getIdOrCode()); + assertEquals("painless", actualRecord.script().getLang()); + assertEquals(4, actualRecord.script().getParams().getOrDefault("count", 0)); + assertTrue(actualRecord.scriptedUpsert()); + assertNotNull(actualRecord.doc()); + } + private void configureDataStream() { props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG, "logs"); props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG, "dataset"); diff --git a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java index 3dba9ec8a..ae98cb573 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java @@ -39,6 +39,7 @@ 
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -51,8 +52,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.SslConfigs; @@ -106,6 +109,40 @@ public void testValidUpsertDeleteOnDefaultConfig() { assertNoErrors(result); } + @Test + public void testValidScriptedUpsert() { + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name()); + props.put( + UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); + validator = new Validator(props, () -> mockClient); + Config result = validator.validate(); + + Map map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x)); + + assertNoErrors(result); + 
assertTrue(map.get(UPSERT_SCRIPT_CONFIG).visible()); + } + + @Test + public void testInvalidScriptedUpsert() { + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); + props.put(WRITE_METHOD_CONFIG, "upsert"); + props.put( + UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); + validator = new Validator(props, () -> mockClient); + Config result = validator.validate(); + Map map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x)); + + assertNoErrors(result); + assertFalse(map.get(UPSERT_SCRIPT_CONFIG).visible()); + } + @Test public void testInvalidCredentials() { props.put(CONNECTION_USERNAME_CONFIG, "username");