diff --git a/CHANGES.rst b/CHANGES.rst index 90aa4cd..75c1dd9 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,15 @@ Changelog ========= +0.3.0a5 +------- + +- Fix issue with index creation on ElasticSearch 5.x. + + Thanks to Kristoffer Olsson and Daniel Lindberg for reporting this and for their extensive support. + +- Improved template for index creation. + 0.3.0a4 ------- diff --git a/distrib/new-index-template-ES-2.x.json b/distrib/new-index-template-ES-2.x.json new file mode 100644 index 0000000..2e7a571 --- /dev/null +++ b/distrib/new-index-template-ES-2.x.json @@ -0,0 +1,26 @@ +{ + "mappings": { + "_default_": { + "_all" : { "enabled" : false }, + + "_source" : { "enabled" : true }, + + "dynamic_templates": [ + { + "string_template" : { + "match" : "*", + "match_mapping_type" : "string", + "mapping": { "type": "string", "index": "not_analyzed" } + } + } + ], + + "properties": { + "@timestamp": { "type": "date" }, + "bytes": { "type": "long" }, + "packets": { "type": "long" }, + "flows": { "type": "long" } + } + } + } +} diff --git a/distrib/new-index-template.json b/distrib/new-index-template.json index 28eef28..052a8f5 100644 --- a/distrib/new-index-template.json +++ b/distrib/new-index-template.json @@ -1,26 +1,26 @@ { - "mappings": { - "_default_": { - "_all" : { "enabled" : false }, + "mappings": { + "_default_": { + "_all" : { "enabled" : false }, - "_source" : { "enabled" : true }, + "_source" : { "enabled" : true }, - "dynamic_templates": [ - { - "string_template" : { - "match" : "*", - "match_mapping_type" : "string", - "mapping": { "type": "string", "index": "not_analyzed" } - } - } - ], + "dynamic_templates": [ + { + "string_template" : { + "match" : "*", + "match_mapping_type" : "string", + "mapping": { "type": "keyword", "index": true } + } + } + ], - "properties": { - "@timestamp": { "type": "date" }, - "bytes": { "type": "long" }, - "packets": { "type": "long" }, - "flows": { "type": "long" } - } - } - } + "properties": { + "@timestamp": { "type": "date" }, + "bytes": { "type": "long" }, + "packets": { "type": "long" }, + "flows": { "type": "long" } + } + } + } } diff --git a/pierky/p2es/es.py b/pierky/p2es/es.py index dd0494e..dc9e3f8 100644 --- a/pierky/p2es/es.py +++ b/pierky/p2es/es.py @@ -2,10 +2,36 @@ # See full license in LICENSE file. import json -import urllib2 +import requests +from requests.auth import HTTPDigestAuth, HTTPBasicAuth from errors import P2ESError +def http(CONFIG, url, method="GET", data=None): + auth = None + if CONFIG['ES_AuthType'] != 'none': + if CONFIG['ES_AuthType'] == 'basic': + auth = HTTPBasicAuth(CONFIG['ES_UserName'], CONFIG['ES_Password']) + elif CONFIG['ES_AuthType'] == 'digest': + auth = HTTPDigestAuth(CONFIG['ES_UserName'], CONFIG['ES_Password']) + else: + raise P2ESError( + 'Unexpected authentication type: {}'.format(CONFIG['ES_AuthType']) + ) + + headers = {'Content-Type': 'application/x-ndjson'} + + if method == "GET": + return requests.get(url, auth=auth, headers=headers) + elif method == "POST": + return requests.post(url, auth=auth, data=data, headers=headers) + elif method == "PUT": + return requests.put(url, auth=auth, data=data, headers=headers) + elif method == "HEAD": + return requests.head(url, auth=auth, headers=headers) + else: + raise Exception("Method unknown: {}".format(method)) + # Sends data to ES. # Raises exceptions: yes. def send_to_es(CONFIG, index_name, data): @@ -18,7 +44,7 @@ def send_to_es(CONFIG, index_name, data): ) try: - http_res = urllib2.urlopen(url, data) + http_res = http(CONFIG, url, method="POST", data=data) except Exception as e: raise P2ESError( 'Error while executing HTTP bulk insert on {} - {}'.format( @@ -27,27 +53,24 @@ def send_to_es(CONFIG, index_name, data): ) # Interpreting HTTP bulk insert response - - http_plaintext = http_res.read() - - if(http_res.getcode() != 200): + if http_res.status_code != 200: raise P2ESError( 'Bulk insert on {} failed - ' 'HTTP status code = {} - ' 'Response {}'.format( - index_name, http_res.getcode(), http_plaintext + index_name, http_res.status_code, http_res.text ) ) try: - json_res = json.loads(http_plaintext) + json_res = http_res.json() except Exception as e: raise P2ESError( 'Error while decoding JSON HTTP response - ' '{} - ' 'first 100 characters: {}'.format( str(e), - http_plaintext[:100], + http_res.text[:100], ) ) @@ -64,19 +87,12 @@ def does_index_exist(index_name, CONFIG): url = '{}/{}'.format(CONFIG['ES_URL'], index_name) try: - head_req = urllib2.Request(url) - head_req.get_method = lambda : 'HEAD' - http_res = urllib2.urlopen(head_req) - return http_res.getcode() == 200 - except urllib2.HTTPError as err: - if err.code == 404: + status_code = http(CONFIG, url, method="HEAD").status_code + if status_code == 200: + return True + if status_code == 404: return False - else: - raise P2ESError( - 'Error while checking if {} index exists: {}'.format( - index_name, str(err) - ) - ) + raise Exception("Unexpected status code: {}".format(status_code)) except Exception as err: raise P2ESError( 'Error while checking if {} index exists: {}'.format( @@ -84,7 +100,6 @@ def does_index_exist(index_name, CONFIG): ) ) - # Creates index 'index_name' using template given in config. # Raises exceptions: yes. def create_index(index_name, CONFIG): @@ -110,11 +125,18 @@ def create_index(index_name, CONFIG): last_err = None try: - http_res = urllib2.urlopen(url, tpl) - except Exception as e: - # something went wrong: does index exist anyway? - last_err = str(e) - pass + # using PUT + http_res = http(CONFIG, url, method="PUT", data=tpl) + except Exception as e1: + last_err = "Error using PUT method: {}".format(str(e1)) + # trying the old way + try: + http_res = http(CONFIG, url, method="POST", data=tpl) + except Exception as e2: + # something went wrong: does index exist anyway? + last_err += " - " + last_err += "Error using old way: {}".format(str(e2)) + pass try: if does_index_exist(index_name, CONFIG): @@ -128,25 +150,3 @@ def create_index(index_name, CONFIG): else: err += "error unknown" raise P2ESError(err.format(index_name, tpl_path)) - -def prepare_for_http_auth(CONFIG): - if CONFIG['ES_AuthType'] != 'none': - pwdman = urllib2.HTTPPasswordMgrWithDefaultRealm() - pwdman.add_password( - None, - CONFIG['ES_URL'], - CONFIG['ES_UserName'], - CONFIG['ES_Password'] - ) - - if CONFIG['ES_AuthType'] == 'basic': - auth_handler = urllib2.HTTPBasicAuthHandler(pwdman) - elif CONFIG['ES_AuthType'] == 'digest': - auth_handler = urllib2.HTTPDigestAuthHandler(pwdman) - else: - raise P2ESError( - 'Unexpected authentication type: {}'.format(CONFIG['ES_AuthType']) - ) - - opener = urllib2.build_opener(auth_handler) - urllib2.install_opener(opener) diff --git a/pierky/p2es/version.py b/pierky/p2es/version.py index b8dc8e0..def2e8a 100644 --- a/pierky/p2es/version.py +++ b/pierky/p2es/version.py @@ -1,4 +1,4 @@ # This code is Copyright 2014-2017 by Pier Carlo Chiodi. # See full license in LICENSE file. -__version__ = "0.3.0a4" +__version__ = "0.3.0a5" diff --git a/pierky/p2es/writers.py b/pierky/p2es/writers.py index 7ef3c22..1e6c9f2 100644 --- a/pierky/p2es/writers.py +++ b/pierky/p2es/writers.py @@ -66,9 +66,6 @@ class ESWriterThread(BaseWriterThread): def __init__(self, *args, **kwargs): BaseWriterThread.__init__(self, *args, **kwargs) - # Preparing for HTTP authentication - prepare_for_http_auth(self.CONFIG) - # Creating index self.index_name = datetime.datetime.now().strftime( self.CONFIG['ES_IndexName'] diff --git a/setup.py b/setup.py index a52d5e8..c2d9bbb 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ maintainer_email="pierky@pierky.com", install_requires=[ + "requests" ], scripts=["scripts/pmacct-to-elasticsearch"],