Skip to content

Commit

Permalink
0.3.0a5 fix index creation ES 5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
pierky committed May 3, 2017
1 parent ba3d5c9 commit c062f3b
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 74 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Changelog
=========

0.3.0a5
-------

- Fix issue with index creation on ElasticSearch 5.x.

Thanks to Kristoffer Olsson and Daniel Lindberg for reporting this and for their extensive support.

- Improved template for index creation.

0.3.0a4
-------

Expand Down
26 changes: 26 additions & 0 deletions distrib/new-index-template-ES-2.x.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"mappings": {
"_default_": {
"_all" : { "enabled" : false },

"_source" : { "enabled" : true },

"dynamic_templates": [
{
"string_template" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping": { "type": "string", "index": "not_analyzed" }
}
}
],

"properties": {
"@timestamp": { "type": "date" },
"bytes": { "type": "long" },
"packets": { "type": "long" },
"flows": { "type": "long" }
}
}
}
}
42 changes: 21 additions & 21 deletions distrib/new-index-template.json
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
{
"mappings": {
"_default_": {
"_all" : { "enabled" : false },
"mappings": {
"_default_": {
"_all" : { "enabled" : false },

"_source" : { "enabled" : true },
"_source" : { "enabled" : true },

"dynamic_templates": [
{
"string_template" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping": { "type": "string", "index": "not_analyzed" }
}
}
],
"dynamic_templates": [
{
"string_template" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping": { "type": "keyword", "index": true }
}
}
],

"properties": {
"@timestamp": { "type": "date" },
"bytes": { "type": "long" },
"packets": { "type": "long" },
"flows": { "type": "long" }
}
}
}
"properties": {
"@timestamp": { "type": "date" },
"bytes": { "type": "long" },
"packets": { "type": "long" },
"flows": { "type": "long" }
}
}
}
}
98 changes: 49 additions & 49 deletions pierky/p2es/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,36 @@
# See full license in LICENSE file.

import json
import urllib2
import requests
from requests.auth import HTTPDigestAuth, HTTPBasicAuth

from errors import P2ESError

def http(CONFIG, url, method="GET", data=None):
auth = None
if CONFIG['ES_AuthType'] != 'none':
if CONFIG['ES_AuthType'] == 'basic':
auth = HTTPBasicAuth(CONFIG['ES_UserName'], CONFIG['ES_Password'])
elif CONFIG['ES_AuthType'] == 'digest':
auth = HTTPDigestAuth(CONFIG['ES_UserName'], CONFIG['ES_Password'])
else:
raise P2ESError(
'Unexpected authentication type: {}'.format(CONFIG['ES_AuthType'])
)

headers = {'Content-Type': 'application/x-ndjson'}

if method == "GET":
return requests.get(url, auth=auth, headers=headers)
elif method == "POST":
return requests.post(url, auth=auth, data=data, headers=headers)
elif method == "PUT":
return requests.put(url, auth=auth, data=data, headers=headers)
elif method == "HEAD":
return requests.head(url, auth=auth, headers=headers)
else:
raise Exception("Method unknown: {}".format(method))

# Sends data to ES.
# Raises exceptions: yes.
def send_to_es(CONFIG, index_name, data):
Expand All @@ -18,7 +44,7 @@ def send_to_es(CONFIG, index_name, data):
)

try:
http_res = urllib2.urlopen(url, data)
http_res = http(CONFIG, url, method="POST", data=data)
except Exception as e:
raise P2ESError(
'Error while executing HTTP bulk insert on {} - {}'.format(
Expand All @@ -27,27 +53,24 @@ def send_to_es(CONFIG, index_name, data):
)

# Interpreting HTTP bulk insert response

http_plaintext = http_res.read()

if(http_res.getcode() != 200):
if http_res.status_code != 200:
raise P2ESError(
'Bulk insert on {} failed - '
'HTTP status code = {} - '
'Response {}'.format(
index_name, http_res.getcode(), http_plaintext
index_name, http_res.status_code, http_res.text
)
)

try:
json_res = json.loads(http_plaintext)
json_res = http_res.json()
except Exception as e:
raise P2ESError(
'Error while decoding JSON HTTP response - '
'{} - '
'first 100 characters: {}'.format(
str(e),
http_plaintext[:100],
http_res.text[:100],
)
)

Expand All @@ -64,27 +87,19 @@ def does_index_exist(index_name, CONFIG):
url = '{}/{}'.format(CONFIG['ES_URL'], index_name)

try:
head_req = urllib2.Request(url)
head_req.get_method = lambda : 'HEAD'
http_res = urllib2.urlopen(head_req)
return http_res.getcode() == 200
except urllib2.HTTPError as err:
if err.code == 404:
status_code = http(CONFIG, url, method="HEAD").status_code
if status_code == 200:
return True
if status_code == 404:
return False
else:
raise P2ESError(
'Error while checking if {} index exists: {}'.format(
index_name, str(err)
)
)
raise Exception("Unexpected status code: {}".format(status_code))
except Exception as err:
raise P2ESError(
'Error while checking if {} index exists: {}'.format(
index_name, str(err)
)
)


# Creates index 'index_name' using template given in config.
# Raises exceptions: yes.
def create_index(index_name, CONFIG):
Expand All @@ -110,11 +125,18 @@ def create_index(index_name, CONFIG):

last_err = None
try:
http_res = urllib2.urlopen(url, tpl)
except Exception as e:
# something went wrong: does index exist anyway?
last_err = str(e)
pass
# using PUT
http_res = http(CONFIG, url, method="PUT", data=tpl)
except Exception as e1:
last_err = "Error using PUT method: {}".format(str(e1))
# trying the old way
try:
http_res = http(CONFIG, url, method="POST", data=tpl)
except Exception as e2:
# something went wrong: does index exist anyway?
last_err += " - "
last_err += "Error using old way: {}".format(str(e2))
pass

try:
if does_index_exist(index_name, CONFIG):
Expand All @@ -128,25 +150,3 @@ def create_index(index_name, CONFIG):
else:
err += "error unknown"
raise P2ESError(err.format(index_name, tpl_path))

def prepare_for_http_auth(CONFIG):
if CONFIG['ES_AuthType'] != 'none':
pwdman = urllib2.HTTPPasswordMgrWithDefaultRealm()
pwdman.add_password(
None,
CONFIG['ES_URL'],
CONFIG['ES_UserName'],
CONFIG['ES_Password']
)

if CONFIG['ES_AuthType'] == 'basic':
auth_handler = urllib2.HTTPBasicAuthHandler(pwdman)
elif CONFIG['ES_AuthType'] == 'digest':
auth_handler = urllib2.HTTPDigestAuthHandler(pwdman)
else:
raise P2ESError(
'Unexpected authentication type: {}'.format(CONFIG['ES_AuthType'])
)

opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)
2 changes: 1 addition & 1 deletion pierky/p2es/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This code is Copyright 2014-2017 by Pier Carlo Chiodi.
# See full license in LICENSE file.

__version__ = "0.3.0a4"
__version__ = "0.3.0a5"
3 changes: 0 additions & 3 deletions pierky/p2es/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ class ESWriterThread(BaseWriterThread):
def __init__(self, *args, **kwargs):
BaseWriterThread.__init__(self, *args, **kwargs)

# Preparing for HTTP authentication
prepare_for_http_auth(self.CONFIG)

# Creating index
self.index_name = datetime.datetime.now().strftime(
self.CONFIG['ES_IndexName']
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
maintainer_email="[email protected]",

install_requires=[
"requests"
],

scripts=["scripts/pmacct-to-elasticsearch"],
Expand Down

0 comments on commit c062f3b

Please sign in to comment.