diff --git a/.travis.yml b/.travis.yml index 7dc67e0..24846f5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,9 +8,9 @@ services: - docker env: - - ES_IMAGE="elasticsearch-oss:7.3.2" - - ES_IMAGE="elasticsearch-oss:6.2.2" - - ES_IMAGE="elasticsearch:5.6.8" ES_USER="elastic" ES_PASS="changeme" + - ES_IMAGE="elasticsearch-oss:7.3.2" ES_TYPE="" ES_TEMPLATE="new-index-template.json" + - ES_IMAGE="elasticsearch-oss:6.2.2" ES_TYPE="test_type" ES_TEMPLATE="new-index-template-ES-6.x.json" + - ES_IMAGE="elasticsearch:5.6.8" ES_TYPE="test_type" ES_TEMPLATE="new-index-template-ES-6.x.json" ES_USER="elastic" ES_PASS="changeme" script: - tests/run_tests diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 6dc2f3b..159b453 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -89,7 +89,10 @@ These files are in JSON format and contain the following keys: Default: no default provided -- **ES_Type** [required]: ElasticSearch document type (_type field) used to store +- **ES_Type** [optional]: Used only for versions of ElasticSearch prior to 7. + For versions of ES >= 7, do not set it. + + ElasticSearch document type (_type field) used to store pmacct-to-elasticsearch output. Similar to tables in relational DB. From the official reference guide @@ -237,12 +240,12 @@ argument: # cd /etc/p2es/triggers/ # ln -s default_trigger my_print - + /etc/p2es/triggers/default_trigger: - + #!/bin/sh PLUGIN_NAME=`basename $0` /usr/local/bin/pmacct-to-elasticsearch $PLUGIN_NAME & -Otherwise, remember to use the full path of pmacct-to-elasticsearch in order +Otherwise, remember to use the full path of pmacct-to-elasticsearch in order to avoid problems with a stripped version of the *PATH* environment variable. diff --git a/pierky/p2es/es.py b/pierky/p2es/es.py index 661be83..9cd70fd 100644 --- a/pierky/p2es/es.py +++ b/pierky/p2es/es.py @@ -37,11 +37,17 @@ def http(CONFIG, url, method="GET", data=None): def send_to_es(CONFIG, index_name, data): # HTTP bulk insert toward ES - url = '{}/{}/{}/_bulk'.format( - CONFIG['ES_URL'], - index_name, - CONFIG['ES_Type'] - ) + if CONFIG['ES_Type']: + url = '{}/{}/{}/_bulk'.format( + CONFIG['ES_URL'], + index_name, + CONFIG['ES_Type'] + ) + else: + url = '{}/{}/_bulk'.format( + CONFIG['ES_URL'], + index_name + ) try: http_res = http(CONFIG, url, method="POST", data=data) diff --git a/scripts/pmacct-to-elasticsearch b/scripts/pmacct-to-elasticsearch index 7a84801..7a75eb9 100755 --- a/scripts/pmacct-to-elasticsearch +++ b/scripts/pmacct-to-elasticsearch @@ -21,7 +21,7 @@ import sys from pierky.p2es.errors import P2ESError from pierky.p2es.readers import JSONReader, CSVReader from pierky.p2es.transformations import test_transformation -from pierky.p2es.version import __version__ +from pierky.p2es.version import __version__ from pierky.p2es.writers import PrintOnlyWriterThread, ESWriterThread APP_NAME = 'pmacct-to-elasticsearch' @@ -40,7 +40,7 @@ DEF_CONFIG = { 'ES_AuthType': 'none', 'ES_FlushSize': 5000, - + 'ReaderThreads': 2, 'InputFormat': 'json', @@ -55,13 +55,13 @@ CONFIG = DEF_CONFIG.copy() def expand_macros(s): if s is None: return None - + out = deepcopy(s) out = out.replace('$PluginName', CONFIG.get('PluginName') or 'default') out = out.replace('$IndexName', datetime.datetime.now().strftime( CONFIG.get('ES_IndexName') or 'default' )) - out = out.replace('$Type', CONFIG.get('ES_Type') or 'default') + out = out.replace('$Type', CONFIG.get('ES_Type') or '') return out # Checks config and logs any errors. @@ -72,10 +72,6 @@ def check_config(): log(logging.ERROR, 'ElasticSearch index name not provided') return False - if not CONFIG['ES_Type']: - log(logging.ERROR, 'ElasticSearch type not provided') - return False - if not CONFIG['ES_URL']: log(logging.ERROR, 'ElasticSearch URL not provided') return False @@ -407,7 +403,7 @@ def main(): # Command line arguments have higher priority for pair in [('log_file', 'LogFile'), ('es_url', 'ES_URL'), - ('es_indexname', 'ES_IndexName'), ('es_type', 'ES_Type'), + ('es_indexname', 'ES_IndexName'), ('es_type', 'ES_Type'), ('es_authtype', 'ES_AuthType'), ('es_username', 'ES_UserName'), ('es_password', 'ES_Password'), ('es_templatefilename', 'ES_IndexTemplateFileName'), @@ -431,13 +427,13 @@ def main(): if args.test_only: print('Configuration tested successfully') return True - + if not CONFIG['InputFile']: r, w, x = select.select([sys.stdin], [], [], 0) if not r: log(logging.ERROR, 'Error while reading input data from stdin') return False - + # Timestamp for ES indexing (UTC) ts = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') diff --git a/tests/new-index-template-ES-6.x.json b/tests/new-index-template-ES-6.x.json new file mode 120000 index 0000000..b4995c4 --- /dev/null +++ b/tests/new-index-template-ES-6.x.json @@ -0,0 +1 @@ +../distrib/new-index-template-ES-6.x.json \ No newline at end of file diff --git a/tests/run_tests b/tests/run_tests index 804a1e8..16c841f 100755 --- a/tests/run_tests +++ b/tests/run_tests @@ -31,8 +31,8 @@ fi --es-url http://localhost:9200 \ ${ARG_AUTH} \ --es-indexname test \ - --es-type test_type \ - --es-index-template-file-name new-index-template.json \ + --es-type "${ES_TYPE}" \ + --es-index-template-file-name ${ES_TEMPLATE} \ --input-file tests/data.csv \ --input-format csv \ --config-dir `pwd`/tests \