Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue 434: Add connector level proxy configurations #436

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ Use the below schema to configure Splunk Connect for Kafka
"splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>",
"splunk.hec.ssl.validate.certs": "<true|false>",
"splunk.hec.http.keepalive": "<true|false>",
"splunk.hec.http.proxy.host": "<the http proxy host name>",
"splunk.hec.http.proxy.port": "<the http proxy port number>",
"splunk.hec.max.http.connection.per.channel": "<max number of http connections per channel>",
"splunk.hec.total.channels": "<total number of channels>",
"splunk.hec.max.batch.size": "<max number of kafka records post in one batch>",
Expand Down Expand Up @@ -167,6 +169,8 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.validation.disable` | Disable validating splunk configurations before creating task. | `false` |
| `splunk.hec.ssl.validate.certs` | Valid settings are `true` or `false`. Enables or disables HTTPS certification validation. |`true`|
| `splunk.hec.http.keepalive` | Valid settings are `true` or `false`. Enables or disables HTTP connection keep-alive. |`true`|
| `splunk.hec.http.proxy.host` | This setting is the http proxy server hostname. Configure it to use connector level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS. | `""` |
| `splunk.hec.http.proxy.port` | This setting is the http proxy server port. Configure it to use connector level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS. | `0` |
| `splunk.hec.max.http.connection.per.channel` | Controls how many HTTP connections will be created and cached in the HTTP pool for one HEC channel. |`2`|
| `splunk.hec.total.channels` | Controls the total channels created to perform HEC event POSTs. See the Load balancer section for more details. |`2`|
| `splunk.hec.max.batch.size` | Maximum batch size when posting events to Splunk. The size is the actual number of Kafka events, and not byte size. |`500`|
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/splunk/hecclient/Hec.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
return new HttpClientBuilder().setDisableSSLCertVerification(config.getDisableSSLCertVerification())
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setHttpProxyHost(config.getHttpProxyHost())
.setHttpProxyPort(config.getHttpProxyPort())
.build();
}

Expand All @@ -295,6 +297,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setSslContext(context)
.setHttpProxyHost(config.getHttpProxyHost())
.setHttpProxyPort(config.getHttpProxyPort())
.build();
}
else {
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class HecConfig {
private String kerberosKeytabPath;
private int concurrentHecQueueCapacity = 100;
private Boolean autoExtractTimestamp;
private String httpProxyHost;
private int httpProxyPort = 0;

public HecConfig(List<String> uris, String token) {
this.uris = uris;
Expand All @@ -63,6 +65,14 @@ public boolean getHttpKeepAlive() {
return httpKeepAlive;
}

public String getHttpProxyHost() {
return httpProxyHost;
}

public int getHttpProxyPort() {
return httpProxyPort;
}

public int getSocketTimeout() {
return socketTimeout;
}
Expand Down Expand Up @@ -127,6 +137,16 @@ public HecConfig setHttpKeepAlive(boolean keepAlive) {
return this;
}

public HecConfig setHttpProxyHost(final String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
return this;
}

public HecConfig setHttpProxyPort(final int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
return this;
}

public HecConfig setSocketTimeout(int timeout /*seconds*/) {
socketTimeout = timeout;
return this;
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/com/splunk/hecclient/HttpClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.Principal;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
Expand Down Expand Up @@ -50,6 +53,8 @@ public final class HttpClientBuilder {
private int socketTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in bytes
private boolean disableSSLCertVerification = false;
private String httpProxyHost;
private int httpProxyPort;
private SSLContext sslContext = null;

public HttpClientBuilder setMaxConnectionPoolSizePerDestination(int connections) {
Expand Down Expand Up @@ -77,6 +82,16 @@ public HttpClientBuilder setDisableSSLCertVerification(boolean disableVerificati
return this;
}

public HttpClientBuilder setHttpProxyHost(final String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
return this;
}

public HttpClientBuilder setHttpProxyPort(final int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
return this;
}

public HttpClientBuilder setSslContext(SSLContext context) {
this.sslContext = context;
return this;
Expand All @@ -88,9 +103,12 @@ public CloseableHttpClient build() {
.setSndBufSize(socketSendBufferSize)
.setSoTimeout(socketTimeout * 1000)
.build();
RequestConfig requestConfig = RequestConfig.custom()
.setCookieSpec(CookieSpecs.STANDARD)
.build();
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setCookieSpec(CookieSpecs.STANDARD);
if (StringUtils.isNotEmpty(this.httpProxyHost) && this.httpProxyPort != 0) {
requestConfigBuilder.setProxy(new HttpHost(this.httpProxyHost, this.httpProxyPort));
}
RequestConfig requestConfig = requestConfigBuilder.build();

return HttpClients.custom()
.useSystemProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel";
static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count
static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive";
static final String HTTP_PROXY_HOST_CONF = "splunk.hec.http.proxy.host";
static final String HTTP_PROXY_PORT_CONF = "splunk.hec.http.proxy.port";
static final String HEC_THREDS_CONF = "splunk.hec.threads";
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
Expand Down Expand Up @@ -128,6 +130,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
+ "Kafka events not the byte size. By default, this is set to 100.";
static final String HTTP_KEEPALIVE_DOC = "Valid settings are true or false. Enables or disables HTTP connection "
+ "keep-alive. By default, this is set to true";
static final String HTTP_PROXY_HOST_DOC = "This setting is the http proxy server hostname. Configure it to use connector "
+ "level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS.";
static final String HTTP_PROXY_PORT_DOC = "This setting is the http proxy server port. Configure it to use connector "
+ "level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS.";
static final String HEC_THREADS_DOC = "Controls how many threads are spawned to do data injection via HEC in a single "
+ "connector task. By default, this is set to 1.";
static final String SOCKET_TIMEOUT_DOC = "Max duration in seconds to read / write data to network before internal TCP "
Expand Down Expand Up @@ -225,6 +231,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final int maxHttpConnPerChannel;
final int maxBatchSize;
final boolean httpKeepAlive;
final String httpProxyHost;
final int httpProxyPort;
final int numberOfThreads;
final int socketTimeout;
final boolean validateCertificates;
Expand Down Expand Up @@ -280,6 +288,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
sourcetypes = getString(SOURCETYPE_CONF);
sources = getString(SOURCE_CONF);
httpKeepAlive = getBoolean(HTTP_KEEPALIVE_CONF);
httpProxyHost = getString(HTTP_PROXY_HOST_CONF);
httpProxyPort = getInt(HTTP_PROXY_PORT_CONF);
validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF);
trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF);
hasTrustStorePath = StringUtils.isNotBlank(trustStorePath);
Expand Down Expand Up @@ -330,7 +340,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF);
}


public static ConfigDef conf() {
return new ConfigDef()
.define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)
Expand All @@ -341,6 +351,8 @@ public static ConfigDef conf() {
.define(SOURCETYPE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCETYPE_DOC)
.define(SOURCE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCE_DOC)
.define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC)
.define(HTTP_PROXY_HOST_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, HTTP_PROXY_HOST_DOC)
.define(HTTP_PROXY_PORT_CONF, ConfigDef.Type.INT, 0, ConfigDef.Importance.HIGH, HTTP_PROXY_PORT_DOC)
.define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
Expand Down Expand Up @@ -392,6 +404,8 @@ public HecConfig getHecConfig() {
.setTotalChannels(totalHecChannels)
.setEventBatchTimeout(eventBatchTimeout)
.setHttpKeepAlive(httpKeepAlive)
.setHttpProxyHost(httpProxyHost)
.setHttpProxyPort(httpProxyPort)
.setAckPollInterval(ackPollInterval)
.setlbPollInterval(lbPollInterval)
.setAckPollThreads(ackPollThreads)
Expand Down Expand Up @@ -424,6 +438,8 @@ public String toString() {
+ "headerSupport:" + headerSupport + ", "
+ "headerCustom:" + headerCustom + ", "
+ "httpKeepAlive:" + httpKeepAlive + ", "
+ "httpProxyHost:" + httpProxyHost + ", "
+ "httpProxyPort:" + httpProxyPort + ", "
+ "validateCertificates:" + validateCertificates + ", "
+ "trustStorePath:" + trustStorePath + ", "
+ "trustStoreType:" + trustStoreType + ", "
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/com/splunk/hecclient/HecConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public void getterSetter() {
config.setAckPollInterval(1)
.setDisableSSLCertVerification(true)
.setHttpKeepAlive(false)
.setHttpProxyHost("test.host")
.setHttpProxyPort(8080)
.setSocketSendBufferSize(2)
.setSocketTimeout(3)
.setMaxHttpConnectionPerChannel(4)
Expand All @@ -60,6 +62,8 @@ public void getterSetter() {
Assert.assertEquals(5, config.getTotalChannels());
Assert.assertEquals(6, config.getAckPollThreads());
Assert.assertEquals(7, config.getEventBatchTimeout());
Assert.assertEquals("test.host", config.getHttpProxyHost());
Assert.assertEquals(8080, config.getHttpProxyPort());
Assert.assertEquals("test", config.getTrustStorePath());
Assert.assertEquals("PKCS12", config.getTrustStoreType());
Assert.assertEquals("pass", config.getTrustStorePassword());
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ public void buildSecureDefault() {
.build();
Assert.assertNotNull(client);
}

@Test
public void buildHttpProxy() {
HttpClientBuilder builder = new HttpClientBuilder();
CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1)
.setHttpProxyHost("rest.host")
.setHttpProxyPort(8080)
.build();
Assert.assertNotNull(client);
builder = new HttpClientBuilder();
client = builder.setMaxConnectionPoolSizePerDestination(1)
.setHttpProxyPort(8080)
.build();
Assert.assertNotNull(client);
builder = new HttpClientBuilder();
client = builder.setMaxConnectionPoolSizePerDestination(1)
.setHttpProxyHost("rest.host")
.build();
Assert.assertNotNull(client);
}

@Test
public void buildSecureCustomKeystore() {
HttpClientBuilder builder = new HttpClientBuilder();
Expand Down
22 changes: 21 additions & 1 deletion src/test/java/com/splunk/kafka/connect/ConfigProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class ConfigProfile {
private String sourcetypes;
private String sources;
private boolean httpKeepAlive;
private String httpProxyHost;
private int httpProxyPort;
private boolean validateCertificates;
private boolean hasTrustStorePath;
private String trustStorePath;
Expand Down Expand Up @@ -75,6 +77,8 @@ public ConfigProfile buildProfileDefault() {
this.sourcetypes = "";
this.sources = "";
this.httpKeepAlive = true;
this.httpProxyHost = "proxy.host";
this.httpProxyPort = 8080;
this.validateCertificates = true;
this.hasTrustStorePath = true;
this.trustStorePath = "./src/test/resources/keystoretest.jks";
Expand Down Expand Up @@ -311,6 +315,22 @@ public void setHttpKeepAlive(boolean httpKeepAlive) {
this.httpKeepAlive = httpKeepAlive;
}

public int getHttpProxyPort() {
return httpProxyPort;
}

public String getHttpProxyHost() {
return httpProxyHost;
}

public void setHttpProxyHost(final String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
}

public void setHttpProxyPort(final int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
}

public boolean isValidateCertificates() {
return validateCertificates;
}
Expand Down Expand Up @@ -472,6 +492,6 @@ public void setHeaderHost(String headerHost) {
}

@Override public String toString() {
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", httpProxyHost=" + httpProxyHost + ", httpProxyPort=" + httpProxyPort + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", " + "trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,17 @@ public void testSpecialCharLineBreaker() {
Assert.assertEquals("\t", connectorConfig.lineBreaker);
}

@Test
public void testHttpProxy() {
UnitUtil uu = new UnitUtil(0);
Map<String, String> taskConfig = uu.createTaskConfig();
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
HecConfig config = connectorConfig.getHecConfig();

Assert.assertEquals("proxy.host", config.getHttpProxyHost());
Assert.assertEquals(8080, config.getHttpProxyPort());
}

@Test
public void toStr() {
UnitUtil uu = new UnitUtil(0);
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/splunk/kafka/connect/UnitUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public Map<String, String> createTaskConfig() {
config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, configProfile.getSourcetypes());
config.put(SplunkSinkConnectorConfig.SOURCE_CONF, configProfile.getSources());
config.put(SplunkSinkConnectorConfig.HTTP_KEEPALIVE_CONF, String.valueOf(configProfile.isHttpKeepAlive()));
config.put(SplunkSinkConnectorConfig.HTTP_PROXY_HOST_CONF, configProfile.getHttpProxyHost());
config.put(SplunkSinkConnectorConfig.HTTP_PROXY_PORT_CONF, String.valueOf(configProfile.getHttpProxyPort()));
config.put(SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF, String.valueOf(configProfile.isValidateCertificates()));

if(configProfile.getTrustStorePath() != null ) {
Expand Down
Loading
Loading