
Cleanup and bump to the latest version of Kafka. (#10)
jcustenborder authored Apr 21, 2017
1 parent b4123c0 commit 9c7a992
Showing 24 changed files with 258 additions and 322 deletions.
89 changes: 61 additions & 28 deletions README.md
@@ -1,14 +1,50 @@
This connector allows Kafka Connect to emulate a [Splunk Http Event Collector](http://dev.splunk.com/view/event-collector/SP-CAAAE6M). It supports both receiving data from and writing data to Splunk.

# Source Connector
# Configuration

The Splunk Source connector emulates a [Splunk Http Event Collector](http://dev.splunk.com/view/event-collector/SP-CAAAE6M) so that applications that normally log to Splunk can instead write to Kafka. The goal of this plugin is to make the change nearly transparent to the user. The plugin supports [X-Forwarded-For](https://en.wikipedia.org/wiki/X-Forwarded-For), so it sits nicely behind a load balancer.
## SplunkHttpSinkConnector

## Configuration
The Sink Connector transforms data from a Kafka topic into a batch of JSON messages that are written via HTTP to a configured [Splunk Http Event Collector](http://dev.splunk.com/view/event-collector/SP-CAAAE6M).

```properties
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.splunk.SplunkHttpSinkConnector

# Set these required values
splunk.remote.host=
splunk.auth.token=
```

| Name | Description | Type | Default | Valid Values | Importance |
|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|----------|----------|--------------|------------|
| splunk.auth.token | The authorization token to use when writing data to Splunk. | password | | | high |
| splunk.remote.host | The hostname of the remote Splunk host to write data to. | string | | | high |
| splunk.ssl.enabled | Flag to determine if the connection to Splunk should be over SSL. | boolean | true | | high |
| splunk.ssl.trust.store.password | Password for the trust store. | password | [hidden] | | high |
| splunk.ssl.trust.store.path | Path on the local disk to the certificate trust store. | string | "" | | high |
| splunk.remote.port | Port on the remote Splunk server to write to. | int | 8088 | | medium |
| splunk.ssl.validate.certs | Flag to determine if SSL connections should validate the certificate of the remote host. | boolean | true | | medium |
| splunk.connect.timeout.ms | The maximum amount of time to wait for a connection to be established. | int | 20000 | | low |
| splunk.curl.logging.enabled | Flag to determine if requests to Splunk should be logged in curl form. This will output a curl command that replicates the call to Splunk. | boolean | false | | low |
| splunk.read.timeout.ms | The timeout in milliseconds to read data from an established connection, or 0 for an infinite timeout. | int | 30000 | | low |
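
Putting the options above together, a fuller sink configuration that writes over SSL with a custom trust store might look like the sketch below (the hostname, token, topic, and paths are placeholders, not values from this repository):

```properties
name=splunk-http-sink
tasks.max=1
# Placeholder topic; any existing Kafka topic works here.
topics=splunk-events
connector.class=com.github.jcustenborder.kafka.connect.splunk.SplunkHttpSinkConnector

# Required settings (placeholder values).
splunk.remote.host=splunk.example.com
splunk.auth.token=00000000-0000-0000-0000-000000000000

# Optional SSL settings from the table above.
splunk.ssl.enabled=true
splunk.ssl.trust.store.path=/etc/security/truststore.jks
splunk.ssl.trust.store.password=changeit
```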

## SplunkHttpSourceConnector

The Splunk Source connector emulates a [Splunk Http Event Collector](http://dev.splunk.com/view/event-collector/SP-CAAAE6M) so that applications that normally log to Splunk can instead write to Kafka. The goal of this plugin is to make the change nearly transparent to the user. The plugin supports [X-Forwarded-For](https://en.wikipedia.org/wiki/X-Forwarded-For), so it sits nicely behind a load balancer.

```properties
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.splunk.SplunkHttpSourceConnector

# Set these required values
splunk.ssl.key.store.password=
splunk.collector.index.default=
splunk.ssl.key.store.path=
kafka.topic=
```

| Name | Description | Type | Default | Valid Values | Importance |
|----------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------------|--------------|------------|
@@ -24,33 +60,30 @@ it will sit behind a load balancer nicely.
| splunk.collector.url | Path fragment the servlet should respond on. | string | /services/collector/event | | low |
| splunk.ssl.renegotiation.allowed | Flag to determine if SSL renegotiation is allowed. | boolean | true | | low |

### Example Config

```properties
name=splunk-http-source
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.splunk.SplunkHttpSourceConnector
splunk.ssl.key.store.path=/etc/security/keystore.jks
splunk.ssl.key.store.password=password
splunk.collector.index.default=main
```
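
With the source connector running, an application that normally targets Splunk points its HTTP Event Collector client at the connector instead. A hedged curl sketch (the hostname, port, and token are placeholders; the endpoint path is the splunk.collector.url default from the table above, and -k is shown only because the example keystore may be self-signed):

```bash
# Post a single event to the emulated collector (placeholder host, port, and token).
curl -k "https://connector-host:8088/services/collector/event" \
  -H "Authorization: Splunk 00000000-0000-0000-0000-000000000000" \
  -d '{"event": "Hello, world!", "sourcetype": "example"}'
```
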
# Schemas

## com.github.jcustenborder.kafka.connect.splunk.EventKey

This schema represents the key for the data received from the Splunk listener.

| Name | Optional | Schema | Default Value | Documentation |
|------|----------|-------------------------------------------------------------------------------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------|
| host | false | [String](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html#STRING) | | The host value to assign to the event data. This is typically the hostname of the client from which you're sending data. |

# Sink Connector
## com.github.jcustenborder.kafka.connect.splunk.Event

The Sink Connector transforms data from a Kafka topic into a batch of JSON messages that are written via HTTP to a configured [Splunk Http Event Collector](http://dev.splunk.com/view/event-collector/SP-CAAAE6M).
This schema represents the data received from the Splunk listener.

## Configuration
| Name | Optional | Schema | Default Value | Documentation |
|------------|----------|-------------------------------------------------------------------------------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| time | true | [Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Timestamp.html) | | The event time. |
| host | true | [String](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html#STRING) | | The host value to assign to the event data. This is typically the hostname of the client from which you're sending data. |
| source | true | [String](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html#STRING) | | The source value to assign to the event data. For example, if you're sending data from an app you're developing, you could set this key to the name of the app. |
| sourcetype | true | [String](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html#STRING) | | The sourcetype value to assign to the event data. |
| index | true | [String](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html#STRING) | | The name of the index by which the event data is to be indexed. The index you specify here must be within the list of allowed indexes if the token has the indexes parameter set. |
| event | true | [String](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html#STRING) | | The event itself, in serialized JSON form. It could be an object or a string. |
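
To make the mapping concrete, a single HEC request body that populates every field of this schema might look like the following (all values are illustrative; on the wire, time is typically epoch seconds):

```json
{
  "time": 1492732800,
  "host": "webserver01",
  "source": "my-app",
  "sourcetype": "access_log",
  "index": "main",
  "event": "GET /index.html 200"
}
```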

| Name | Description | Type | Default | Valid Values | Importance |
|---------------------------------|-----------------------------------------------------------------------------------------|----------|----------|--------------|------------|
| splunk.auth.token | The authorization token to use when writing data to Splunk. | password | | | high |
| splunk.remote.host | The hostname of the remote Splunk host to write data to. | string | | | high |
| splunk.ssl.enabled | Flag to determine if the connection to Splunk should be over SSL. | boolean | true | | high |
| splunk.ssl.trust.store.password | Password for the trust store. | password | [hidden] | | high |
| splunk.ssl.trust.store.path | Path on the local disk to the certificate trust store. | string | "" | | high |
| splunk.remote.port | Port on the remote Splunk server to write to. | int | 8088 | | medium |
| splunk.ssl.validate.certs | Flag to determine if SSL connections should validate the certificate of the remote host. | boolean | true | | medium |

### Example Config

2 changes: 1 addition & 1 deletion pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
<version>0.10.1.0-cp1</version>
<version>0.10.2.0-cp1</version>
</parent>
<artifactId>kafka-connect-splunk</artifactId>
<version>0.2.0-SNAPSHOT</version>
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -34,12 +34,14 @@
class EventConverter {
public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.splunk.EventKey")
.doc("This schema represents the key for the data received from the Splunk listener.")
.field("host", SchemaBuilder.string().doc("The host value to assign to the event data. " +
"This is typically the hostname of the client from which you're sending data.").build())
.build();

public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.splunk.Event")
.doc("This schema represents the data received from the Splunk listener.")
.field("time", Timestamp.builder().optional().doc("The event time.").build())
.field("host", SchemaBuilder.string().optional().doc("The host value to assign to the event data. " +
"This is typically the hostname of the client from which you're sending data.").build())
@@ -68,11 +70,11 @@ class EventConverter {

EventConverter(SplunkHttpSourceConnectorConfig config) {
this.config = config;
this.topicPerIndex = this.config.topicPerIndex();
this.topicPrefix = this.config.topicPrefix();
this.topicPerIndex = this.config.topicPerIndex;
this.topicPrefix = this.config.topicPrefix;
this.indexToTopicLookup = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
this.topic = this.topicPerIndex ? null : this.topicPrefix;
this.defaultIndex = this.config.defaultIndex();
this.defaultIndex = this.config.defaultIndex;
}

static <T> void setFieldValue(JsonNode messageNode, Struct struct, String fieldName, Class<T> cls) {
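
The setFieldValue helper whose signature closes the hunk above is collapsed in this view. Conceptually it copies one field from the incoming JSON node into the Connect Struct, coercing the value to the requested type; the following is a rough sketch of that idea, not the committed implementation:

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Struct;

// Illustrative sketch only; the committed version may differ, e.g. in how
// it converts the "time" field to a java.util.Date.
static <T> void setFieldValue(JsonNode messageNode, Struct struct, String fieldName, Class<T> cls) {
  JsonNode valueNode = messageNode.get(fieldName);
  if (valueNode == null || valueNode.isNull()) {
    return; // leave optional fields unset
  }
  final Object value;
  if (String.class.equals(cls)) {
    // Scalar text stays as-is; objects and arrays fall back to raw JSON.
    value = valueNode.isTextual() ? valueNode.textValue() : valueNode.toString();
  } else if (Number.class.isAssignableFrom(cls)) {
    value = valueNode.numberValue();
  } else {
    value = valueNode.asText();
  }
  struct.put(fieldName, value);
}
```
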
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -48,7 +48,7 @@ public void configure(SplunkHttpSourceConnectorConfig config, JsonFactory jsonFa
this.jsonFactory = jsonFactory;
this.converter = new EventConverter(this.config);
this.recordQueue = recordQueue;
this.allowedIndexes = this.config.allowedIndexes();
this.allowedIndexes = this.config.allowedIndexes;
}

@Override
@@ -76,9 +76,7 @@ public String host(HttpServletRequest request) {

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (log.isInfoEnabled()) {
log.info("Reading message body.");
}
log.trace("Reading message body.");

response.setHeader("X-Content-Type-Options", "nosniff");
response.setHeader("X-Frame-Options", "SAMEORIGIN");
@@ -116,10 +114,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
response.setStatus(200);

} catch (Exception ex) {
if (log.isErrorEnabled()) {
log.error("Exception thrown", ex);
}

log.error("Exception thrown", ex);
response.setStatus(500);
}
}
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -43,6 +43,13 @@
class ObjectMapperFactory {

public static final ObjectMapper INSTANCE;
static final Set<String> RESERVED_METADATA = ImmutableSet.of(
"time",
"host",
"source",
"sourcetype",
"index"
);

static {
ObjectMapper mapper = new ObjectMapper();
@@ -85,14 +92,6 @@ public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider
}
}

static final Set<String> RESERVED_METADATA = ImmutableSet.of(
"time",
"host",
"source",
"sourcetype",
"index"
);

static class StructSerializer extends JsonSerializer<Struct> {

@Override
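
The RESERVED_METADATA set above hints at how the custom StructSerializer likely splits a record: fields named time, host, source, sourcetype, and index become top-level HEC metadata, and everything else nests under event. A hypothetical sketch of that split, not the connector's actual code:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

// Hypothetical illustration of the metadata/event split implied by RESERVED_METADATA.
static Map<String, Object> toHecBody(Struct struct) {
  Map<String, Object> body = new LinkedHashMap<>();
  Map<String, Object> event = new LinkedHashMap<>();
  for (Field field : struct.schema().fields()) {
    Object value = struct.get(field);
    if (value == null) {
      continue; // skip unset optional fields
    }
    if (ObjectMapperFactory.RESERVED_METADATA.contains(field.name())) {
      body.put(field.name(), value);  // reserved names become top-level metadata
    } else {
      event.put(field.name(), value); // everything else nests under "event"
    }
  }
  body.put("event", event);
  return body;
}
```
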
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,8 @@
*/
package com.github.jcustenborder.kafka.connect.splunk;

import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import com.github.jcustenborder.kafka.connect.utils.config.Description;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -25,14 +27,16 @@
import java.util.List;
import java.util.Map;

@Description("The Sink Connector will transform data from a Kafka topic into a batch of json messages that will be written via HTTP to " +
"a configured [Splunk Http Event Collector](http://dev.splunk.com/view/event-collector/SP-CAAAE6M).")
public class SplunkHttpSinkConnector extends SinkConnector {
private static Logger log = LoggerFactory.getLogger(SplunkHttpSinkConnector.class);
Map<String, String> settings;
private SplunkHttpSinkConnectorConfig config;

@Override
public String version() {
return VersionUtil.getVersion();
return VersionUtil.version(this.getClass());
}

@Override