diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 00000000..eea6de55 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,9 @@ +#!/usr/bin/env groovy +/* + * Copyright [2021 - 2021] Confluent Inc. + */ +common { + slackChannel = '#connect-warn' + nodeLabel = 'docker-debian-jdk8' + downStreamValidate = false +} diff --git a/pom.xml b/pom.xml index 064c78a4..8f59f5ea 100644 --- a/pom.xml +++ b/pom.xml @@ -3,17 +3,21 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 + + io.confluent + connect-plugins-parent + 0.9.23 + com.github.splunk.kafka.connect splunk-kafka-connect - v2.0.5 + v2.0.5.1 splunk-kafka-connect UTF-8 1.8 1.8 - 1.8 4.13.2 5.3.2 5.3.2 @@ -32,7 +36,7 @@ com.fasterxml.jackson.core jackson-databind - 2.10.5.1 + 2.13.5 compile @@ -152,6 +156,12 @@ 3.7 compile + + org.mockito + mockito-core + 2.23.4 + test + @@ -202,19 +212,25 @@ org.apache.maven.plugins maven-checkstyle-plugin - 2.17 - - - validate - validate - - google_checks.xml - - - check - - - + + true + + + + com.github.spotbugs + spotbugs-maven-plugin + + true + + + + com.mycila + license-maven-plugin + + + ** + + diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index e0c5b98a..4f3c662b 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -274,6 +274,9 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { return new HttpClientBuilder().setDisableSSLCertVerification(config.getDisableSSLCertVerification()) .setMaxConnectionPoolSizePerDestination(poolSizePerDest) .setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size()) + .setSocketTimeout(config.getSocketTimeout()) + .setConnectionTimeout(config.getConnectionTimeout()) + .setConnectionRequestTimeout(config.getConnectionRequestTimeout()) .build(); } @@ -286,6 +289,9 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { .setMaxConnectionPoolSizePerDestination(poolSizePerDest) .setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size()) .setSslContext(context) + .setSocketTimeout(config.getSocketTimeout()) + .setConnectionTimeout(config.getConnectionTimeout()) + .setConnectionRequestTimeout(config.getConnectionRequestTimeout()) .build(); } else { diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java index a0931bc3..2ee3f527 100644 --- a/src/main/java/com/splunk/hecclient/HecAckPoller.java +++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java @@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) { } if (resp.getText() == "Invalid data format") { - log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString()); + log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} batch={}", channel, channel.getIndexer(), batch.getUUID()); batch.commit(); List committedBatches = new ArrayList<>(); committedBatches.add(batch); @@ -316,7 +316,7 @@ private void findAndRemoveTimedoutBatches(Map batches, List batches) { - log.debug("received acked event batches={}", batches); + log.debug("received acked event batches={}", batches.size()); /* Loop all *assigned* partitions to find the lowest consecutive * HEC-commited offsets. A batch could contain events coming from a * variety of topic/partitions, and scanning those events coulb be diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index 70865868..ee30ec4f 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -15,21 +15,36 @@ */ package com.splunk.kafka.connect; -import com.splunk.kafka.connect.VersionUtils; +import com.splunk.hecclient.Hec; +import com.splunk.hecclient.HecConfig; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class SplunkSinkConnector extends SinkConnector { +public class SplunkSinkConnector extends SinkConnector { private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class); private Map taskConfig; + final private String COLLECTOR_ENDPOINT = "/services/collector"; + final private String HEALTH_CHECK_ENDPOINT = COLLECTOR_ENDPOINT + "/health"; @Override public void start(Map taskConfig) { @@ -66,4 +81,166 @@ public String version() { public ConfigDef config() { return SplunkSinkConnectorConfig.conf(); } + + @Override + public Config validate(Map connectorConfigs) { + Config config = super.validate(connectorConfigs); + SplunkSinkConnectorConfig connectorConfig; + try { + connectorConfig = new SplunkSinkConnectorConfig(connectorConfigs); + } catch (Exception e) { + log.warn("Validating configuration caught an exception", e); + return config; + } + + Map configValues = + config.configValues() + .stream() + .collect(Collectors.toMap( + ConfigValue::name, + Function.identity())); + + HecConfig hecConfig = connectorConfig.getHecConfig(); + try (CloseableHttpClient httpClient = createHttpClient(hecConfig)) { + + if (!validateCollector(httpClient, hecConfig, configValues)) { + return config; + } + + validateAccess(httpClient, hecConfig, configValues); + + } catch (IOException e) { + log.error("Configuration validation error", e); + recordErrors( + configValues, + "Configuration validation error: " + e.getMessage(), + SplunkSinkConnectorConfig.TOKEN_CONF + ); + } + + return config; + } + + /** + * We validate the collector by querying the HEC collector health. + * + * For a valid collector endpoint, this returns a HTTP 200 OK with the payload: + * {"text":"HEC is healthy","code":17} + * For an invalid hostname and other errors, the Java UnknownHostException and other similar + * Exceptions are thrown. + * + * @param httpClient HEC HTTP Client + * @param hecConfig The HEC configuration + * @param configValues The configuration ConfigValues + * @return Whether the validation was successful + */ + private boolean validateCollector( + CloseableHttpClient httpClient, + HecConfig hecConfig, + Map configValues + ) { + List uris = hecConfig.getUris(); + + Map connectionFailedCollectors = new LinkedHashMap<>(); + + for (String uri : uris) { + log.trace("Connecting to " + uri); + HttpGet request = new HttpGet(uri + HEALTH_CHECK_ENDPOINT); + + int status = -1; + try (CloseableHttpResponse response = httpClient.execute(request)) { + status = response.getStatusLine().getStatusCode(); + if (status == 200) { + log.trace("Connection succeeded for collector {}", uri); + } else { + log.trace("Connection failed for collector {}", uri); + connectionFailedCollectors.put(uri, response.getStatusLine().toString()); + } + } catch (Exception e) { + log.error("Caught exception while connecting", e); + connectionFailedCollectors.put(uri, e.getMessage()); + } + } + + if (!connectionFailedCollectors.isEmpty()) { + log.trace("Connection failed: " + connectionFailedCollectors); + recordErrors( + configValues, + "Connection Failed: " + connectionFailedCollectors, + SplunkSinkConnectorConfig.URI_CONF + ); + return false; + } + + return true; + } + + /** + * We validate access by posting an empty payload to the Splunk endpoint. + * + * For a valid endpoint and a valid token, this returns a HTTP 400 Bad Request with the + * payload: {"text":"No data","code":5} + * For a valid endpoint and an invalid token, this returns a HTTP 403 Forbidden with the + * payload: {"text":"Invalid token","code":4} + * For an invalid hostname and other errors, the Java UnknownHostException and other similar + * Exceptions are thrown. + * + * @param httpClient HEC HTTP Client + * @param hecConfig The HEC configuration + * @param configValues The configuration ConfigValues + */ + private void validateAccess( + CloseableHttpClient httpClient, + HecConfig hecConfig, + Map configValues + ) throws UnsupportedEncodingException { + List uris = hecConfig.getUris(); + + Map accessFailedCollectors = new LinkedHashMap<>(); + + for (String uri : uris) { + log.trace("Validating " + uri); + HttpPost request = new HttpPost(uri + COLLECTOR_ENDPOINT); + request.setEntity(new StringEntity("")); + + request.addHeader(HttpHeaders.AUTHORIZATION, String.format("Splunk %s", hecConfig.getToken())); + + int status = -1; + try (CloseableHttpResponse response = httpClient.execute(request)) { + status = response.getStatusLine().getStatusCode(); + if (status == 400 || status == 200) { + log.trace("Validation succeeded for collector {}", uri); + } else if (status == 403) { + log.trace("Invalid HEC token for collector {}", uri); + accessFailedCollectors.put(uri, response.getStatusLine().toString()); + } else { + log.trace("Validation failed for {}", uri); + accessFailedCollectors.put(uri, response.getStatusLine().toString()); + } + } catch (Exception e) { + log.error("Caught exception while validating", e); + accessFailedCollectors.put(uri, e.getMessage()); + } + } + + if (!accessFailedCollectors.isEmpty()) { + log.trace("Validation failed: " + accessFailedCollectors); + recordErrors( + configValues, + "Validation Failed: " + accessFailedCollectors, + SplunkSinkConnectorConfig.TOKEN_CONF + ); + } + } + + // Enables mocking during testing + CloseableHttpClient createHttpClient(final HecConfig config) { + return Hec.createHttpClient(config); + } + + void recordErrors(Map configValues, String message, String...keys) { + for (String key : keys) { + configValues.get(key).addErrorMessage(message); + } + } } diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 585e07ce..e85409b8 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -47,6 +47,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive"; static final String HEC_THREDS_CONF = "splunk.hec.threads"; static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds + static final String CONNECTION_TIMEOUT_CONF = "splunk.hec.connection.timeout"; // seconds + static final String CONNECTION_REQUEST_TIMEOUT_CONF = "splunk.hec.connection.request.timeout"; // seconds static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs"; static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression"; // Acknowledgement Parameters @@ -193,6 +195,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final boolean httpKeepAlive; final int numberOfThreads; final int socketTimeout; + final int connectionTimeout; + final int connectionRequestTimeout; final boolean validateCertificates; final boolean enableCompression; final int lbPollInterval; @@ -247,6 +251,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { flushWindow = getInt(FLUSH_WINDOW_CONF); totalHecChannels = getInt(TOTAL_HEC_CHANNEL_CONF); socketTimeout = getInt(SOCKET_TIMEOUT_CONF); + connectionTimeout = getInt(CONNECTION_TIMEOUT_CONF); + connectionRequestTimeout = getInt(CONNECTION_REQUEST_TIMEOUT_CONF); enrichments = parseEnrichments(getString(ENRICHMENT_CONF)); trackData = getBoolean(TRACK_DATA_CONF); useRecordTimestamp = getBoolean(USE_RECORD_TIMESTAMP_CONF); @@ -288,6 +294,8 @@ public static ConfigDef conf() { .define(FLUSH_WINDOW_CONF, ConfigDef.Type.INT, 30, ConfigDef.Importance.LOW, FLUSH_WINDOW_DOC) .define(TOTAL_HEC_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.HIGH, TOTAL_HEC_CHANNEL_DOC) .define(SOCKET_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC) + .define(CONNECTION_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC) + .define(CONNECTION_REQUEST_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC) .define(ENRICHMENT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, ENRICHMENT_DOC) .define(TRACK_DATA_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, TRACK_DATA_DOC) .define(USE_RECORD_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, USE_RECORD_TIMESTAMP_DOC) @@ -315,6 +323,8 @@ public HecConfig getHecConfig() { HecConfig config = new HecConfig(Arrays.asList(splunkURI.split(",")), splunkToken); config.setDisableSSLCertVerification(!validateCertificates) .setSocketTimeout(socketTimeout) + .setConnectionTimeout(connectionTimeout) + .setConnectionRequestTimeout(connectionRequestTimeout) .setMaxHttpConnectionPerChannel(maxHttpConnPerChannel) .setTotalChannels(totalHecChannels) .setEventBatchTimeout(eventBatchTimeout) @@ -349,6 +359,8 @@ public String toString() { + "validateCertificates:" + validateCertificates + ", " + "trustStorePath:" + trustStorePath + ", " + "socketTimeout:" + socketTimeout + ", " + + "connectionTimeout:" + connectionTimeout + ", " + + "connectionRequestTimeout:" + connectionRequestTimeout + ", " + "eventBatchTimeout:" + eventBatchTimeout + ", " + "ackPollInterval:" + ackPollInterval + ", " + "ackPollThreads:" + ackPollThreads + ", " diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 82307fb5..407c2c97 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -65,8 +65,6 @@ public void start(Map taskConfig) { if(connectorConfig.flushWindow > 0) { flushWindow = connectorConfig.flushWindow * 1000; // Flush window set to user configured value (Multiply by 1000 as all the calculations are done in milliseconds) } - - log.info("kafka-connect-splunk task starts with config={}", connectorConfig); } @Override @@ -366,7 +364,6 @@ public void stop() { if (hec != null) { hec.close(); } - log.info("kafka-connect-splunk task ends with config={}", connectorConfig); } @Override @@ -404,7 +401,7 @@ private Event createHecEventFrom(final SinkRecord record) { event.setTied(record); event.addFields(connectorConfig.enrichments); } catch(Exception e) { - log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString()); + log.trace("event does not follow correct HEC pre-formatted format for record having offset: {}, topic Name: {} and topic Partition: {}", record.kafkaOffset(), record.topic(), record.kafkaPartition()); event = createHECEventNonFormatted(record); } } else { diff --git a/src/main/resources/version.properties b/src/main/resources/version.properties index bff7da4f..23ee6de7 100644 --- a/src/main/resources/version.properties +++ b/src/main/resources/version.properties @@ -1,3 +1,3 @@ githash= -gitbranch=release/2.0.x -gitversion=v2.0.5 +gitbranch=2.0.5.x +gitversion=v2.0.5.2 diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java index 666f3b37..f14673b2 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java @@ -15,17 +15,90 @@ */ package com.splunk.kafka.connect; +import org.apache.http.HttpStatus; +import org.apache.http.HttpVersion; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.message.BasicStatusLine; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; -import java.util.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; class SplunkSinkConnecterTest { + final private String SPLUNK_URI_HOST1 = "https://www.host1.com:1111/"; + + @Mock + CloseableHttpResponse okHttpResponse; + + @Mock + CloseableHttpResponse badRequestHttpResponse; + + @Mock + CloseableHttpResponse forbiddenHttpResponse; + + @Mock + CloseableHttpResponse notFoundHttpResponse; + + @Spy + SplunkSinkConnector connector; + + @Spy + CloseableHttpClient httpClient; + + @BeforeEach + void setUp() { + MockitoAnnotations.initMocks(this); + + doReturn(httpClient) + .when(connector) + .createHttpClient(any()); + + when(okHttpResponse.getStatusLine()). + thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_OK, "OK")); + when(okHttpResponse.getEntity()) + .thenReturn(new StringEntity("{\"text\":\"HEC is healthy\",\"code\":17}", ContentType.APPLICATION_JSON)); + + when(badRequestHttpResponse.getStatusLine()). + thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_BAD_REQUEST, "Bad Request")); + when(badRequestHttpResponse.getEntity()) + .thenReturn(new StringEntity("{\"text\":\"No data\",\"code\":5}", ContentType.APPLICATION_JSON)); + + when(forbiddenHttpResponse.getStatusLine()) + .thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_FORBIDDEN, "Forbidden")); + when(forbiddenHttpResponse.getEntity()) + .thenReturn(new StringEntity("{\"text\":\"Invalid token\",\"code\":4}", ContentType.APPLICATION_JSON)); + + when(notFoundHttpResponse.getStatusLine()). + thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_NOT_FOUND, "Not Found")); + when(notFoundHttpResponse.getEntity()) + .thenReturn(new StringEntity("Not Found", ContentType.APPLICATION_XHTML_XML)); + } + @Test void startStop() { SinkConnector connector = new SplunkSinkConnector(); @@ -55,4 +128,155 @@ public void config() { ConfigDef config = connector.config(); Assert.assertNotNull(config); } -} \ No newline at end of file + + @Test + public void testValidationEmptyConfig() { + Config config = new SplunkSinkConnector().validate(new HashMap<>()); + + Map> errorMessages = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages)); + + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty()); + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty()); + } + + @Test + public void testValidationSuccess() throws IOException { + Map connectorConfig = getConnectorConfig(); + + doReturn(okHttpResponse) + .doReturn(badRequestHttpResponse) + .when(httpClient) + .execute(any()); + + Config config = connector.validate(connectorConfig); + for (ConfigValue value : config.configValues()) { + assertTrue(value.errorMessages().isEmpty()); + } + } + + @Test + public void testValidationSuccessWithSuccessResponse() throws IOException { + Map connectorConfig = getConnectorConfig(); + + doReturn(okHttpResponse) + .doReturn(okHttpResponse) + .when(httpClient) + .execute(any()); + + Config config = connector.validate(connectorConfig); + for (ConfigValue value : config.configValues()) { + assertTrue(value.errorMessages().isEmpty()); + } + } + + @Test + public void testConnectionFailure() throws IOException { + Map connectorConfig = getConnectorConfig(); + + doThrow(new UnknownHostException("Host not found")) + .when(httpClient) + .execute(any()); + + Config config = connector.validate(connectorConfig); + + Map> errorMessages = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages)); + + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty()); + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty()); + + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(SPLUNK_URI_HOST1)); + } + + @Test + public void testAccessFailure() throws IOException { + Map connectorConfig = getConnectorConfig(); + + doReturn(okHttpResponse) + .doReturn(forbiddenHttpResponse) + .when(httpClient) + .execute(any()); + + Config config = connector.validate(connectorConfig); + + Map> errorMessages = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages)); + + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty()); + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty()); + + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(SPLUNK_URI_HOST1)); + } + + @Test + public void testMultipleConnectionFailure() throws IOException { + Map connectorConfig = getConnectorConfig(); + String host2 = "https://www.host2.com:2222/"; + String host3 = "https://www.host3.com:3333/"; + connectorConfig.put(SplunkSinkConnectorConfig.URI_CONF, + SPLUNK_URI_HOST1 + "," + host2 + "," + host3); + + doReturn(notFoundHttpResponse) + .doThrow(new UnknownHostException("Host not found")) + .doReturn(okHttpResponse) + .when(httpClient) + .execute(any()); + + Config config = connector.validate(connectorConfig); + + Map> errorMessages = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages)); + + + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty()); + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty()); + + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(SPLUNK_URI_HOST1)); + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(host2)); + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(host3)); + } + + @Test + public void testMultipleAccessFailure() throws IOException { + Map connectorConfig = getConnectorConfig(); + String host2 = "https://www.host2.com:2222/"; + String host3 = "https://www.host3.com:3333/"; + connectorConfig.put(SplunkSinkConnectorConfig.URI_CONF, + SPLUNK_URI_HOST1 + "," + host2 + "," + host3); + + doReturn(okHttpResponse) + .doReturn(okHttpResponse) + .doReturn(okHttpResponse) + .doReturn(badRequestHttpResponse) + .doThrow(new UnknownHostException("Host not found")) + .doReturn(notFoundHttpResponse) + .when(httpClient) + .execute(any()); + + Config config = connector.validate(connectorConfig); + + Map> errorMessages = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages)); + + + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty()); + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty()); + + assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(SPLUNK_URI_HOST1)); + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(host2)); + assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(host3)); + } + + private Map getConnectorConfig() { + Map connectorConfig = new HashMap<>(); + + connectorConfig.put(SinkConnector.TOPICS_CONFIG, "topic1"); + connectorConfig.put(SplunkSinkConnectorConfig.URI_CONF, SPLUNK_URI_HOST1); + connectorConfig.put(SplunkSinkConnectorConfig.TOKEN_CONF, "token1"); + connectorConfig.put(SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF, "false"); + + return connectorConfig; + } + +}