From 3a5dc06d288d97bc511e70a5b5081171909a7ec5 Mon Sep 17 00:00:00 2001 From: Pier Carlo Chiodi Date: Sat, 21 Jan 2017 16:07:37 +0100 Subject: [PATCH] refactoring 2 --- .gitignore | 92 ++++++++++++++++++ es.py | 136 ++++++++++++++++++++++++++ new | 233 +++++++++++++++++++++++++++++++++++++-------- transformations.py | 49 +++++++++- 4 files changed, 471 insertions(+), 39 deletions(-) create mode 100644 .gitignore create mode 100644 es.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..965d3f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,92 @@ +*.swp +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# dotenv +.env + +# virtualenv +.venv/ +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject diff --git a/es.py b/es.py new file mode 100644 index 0000000..5565373 --- /dev/null +++ b/es.py @@ -0,0 +1,136 @@ +import urllib2 + +# Sends data to ES. +# Raises exceptions: yes. +def send_to_es(CONFIG, index_name, data): + # HTTP bulk insert toward ES + + url = '{}/{}/{}/_bulk'.format( + CONFIG['ES_URL'], + index_name, + CONFIG['ES_Type'] + ) + + try: + http_res = urllib2.urlopen(url, data) + except Exception as e: + raise Exception( + 'Error while executing HTTP bulk insert on {} - {}'.format( + index_name, str(e) + ) + ) + + # Interpreting HTTP bulk insert response + + http_plaintext = http_res.read() + + if(http_res.getcode() != 200): + raise Exception( + 'Bulk insert on {} failed - ' + 'HTTP status code = {} - ' + 'Response {}'.format( + index_name, http_res.getcode(), http_plaintext + ) + ) + + try: + json_res = json.loads(http_plaintext) + except Exception as e: + raise Exception( + 'Error while decoding JSON HTTP response - ' + '{} - ' + 'first 100 characters: {}'.format( + str(e), + http_plaintext[:100], + ) + ) + + if json_res['errors']: + raise Exception( + 'Bulk insert on {} failed to process ' + 'one or more documents'.format(index_name) + ) + +# Checks if index_name exists. +# Returns: True | False. +# Raises exceptions: yes. +def does_index_exist(index_name, CONFIG): + url = '{}/{}'.format(CONFIG['ES_URL'], index_name) + + try: + head_req = urllib2.Request(url) + head_req.get_method = lambda : 'HEAD' + http_res = urllib2.urlopen(head_req) + return http_res.getcode() == 200 + except urllib2.HTTPError as err: + if err.code == 404: + return False + else: + raise Exception( + 'Error while checking if {} index exists: {}'.format( + index_name, str(err) + ) + ) + +# Creates index 'index_name' using template given in config. +# Raises exceptions: yes. 
+def create_index(index_name, CONF_DIR, CONFIG): + + # index already exists? + if does_index_exist(index_name, CONFIG): + return + + # index does not exist, creating it + tpl_path = '{}/{}'.format(CONF_DIR, CONFIG['ES_IndexTemplateFileName']) + + try: + with open(tpl_path, "r") as f: + tpl = f.read() + except Exception as e: + raise Exception( + 'Error while reading index template from file {}: {}'.format( + tpl_path, str(e) + ) + ) + + url = '{}/{}'.format(CONFIG['ES_URL'], index_name) + + last_err = None + try: + http_res = urllib2.urlopen(url, tpl) + except Exception as e: + # something went wrong: does index exist anyway? + last_err = str(e) + pass + + if does_index_exist(index_name, CONFIG): + return + + err = "An error occurred while creating index {} from template {}: " + if last_err: + err += last_err + else: + err += "error unknown" + raise Exception(err.format(index_name, tpl_path)) + +def prepare_for_http_auth(CONFIG): + if CONFIG['ES_AuthType'] != 'none': + pwdman = urllib2.HTTPPasswordMgrWithDefaultRealm() + pwdman.add_password( + None, + CONFIG['ES_URL'], + CONFIG['ES_UserName'], + CONFIG['ES_Password'] + ) + + if CONFIG['ES_AuthType'] == 'basic': + auth_handler = urllib2.HTTPBasicAuthHandler(pwdman) + elif CONFIG['ES_AuthType'] == 'digest': + auth_handler = urllib2.HTTPDigestAuthHandler(pwdman) + else: + raise Exception( + 'Unexpected authentication type: {}'.format(CONFIG['ES_AuthType']) + ) + + opener = urllib2.build_opener(auth_handler) + urllib2.install_opener(opener) diff --git a/new b/new index cfa47e3..2dbddaa 100755 --- a/new +++ b/new @@ -1,9 +1,17 @@ #!/usr/bin/env python import argparse +from copy import deepcopy +import datetime +import json import logging +from logging.handlers import RotatingFileHandler +import os.path +import select import sys + +from es import * from transformations import * @@ -36,11 +44,11 @@ DEF_CONFIG = { CONFIG = DEF_CONFIG.copy() -def expand_macro(s): +def expand_macros(s): if s is None: return None - out = s + out = deepcopy(s) out = out.replace('$PluginName', CONFIG.get('PluginName') or 'default') out = out.replace('$IndexName', datetime.datetime.now().strftime( CONFIG.get('ES_IndexName') or 'default' @@ -56,7 +64,95 @@ def expand_data_macros(s, dic): return res return s -def setup_logginig(baselogfile=None): +# Checks config and logs any errors. +# Returns: True | False. 
+# Raises exceptions: no +def check_config(): + if not CONFIG['ES_IndexName']: + log(logging.ERROR, 'ElasticSearch index name not provided') + return False + + if not CONFIG['ES_Type']: + log(logging.ERROR, 'ElasticSearch type not provided') + return False + + if not CONFIG['ES_URL']: + log(logging.ERROR, 'ElasticSearch URL not provided') + return False + + if not 'ES_IndexTemplateFileName' in CONFIG: + CONFIG['ES_IndexTemplateFileName'] = 'new-index-template.json' + else: + index_tpl_path = '{}/{}'.format( + CONF_DIR, CONFIG['ES_IndexTemplateFileName'] + ) + + if not os.path.isfile(index_tpl_path): + log(logging.ERROR, + 'Can\'t find index template file {}'.format(index_tpl_path)) + return False + else: + with open(index_tpl_path, "r") as f: + try: + index_tpl = json.load(f) + except Exception as e: + log(logging.ERROR, + 'Index template from {} is not ' + 'in valid JSON format: {}'.format( + index_tpl_path, str(e) + ), + exc_info=True) + return False + + if CONFIG['ES_URL'].endswith('/'): + CONFIG['ES_URL'] = CONFIG['ES_URL'][:-1] + + if CONFIG['ES_AuthType']: + if not CONFIG['ES_AuthType'] in ('none', 'basic', 'digest'): + log(logging.ERROR, + 'Authentication type must be "none" (default), ' + '"basic" or "digest"') + return False + + if CONFIG['ES_AuthType'] in ('basic', 'digest'): + if not CONFIG['ES_UserName']: + log(logging.ERROR, + 'Authentication required but username not provided') + return False + + if not CONFIG['ES_Password']: + log(logging.ERROR, 'Authentication required but password not provided') + return False + + if not 'ES_FlushSize' in CONFIG: + log(logging.ERROR, 'Flush size not provided') + return False + else: + try: + CONFIG['ES_FlushSize'] = int(CONFIG['ES_FlushSize']) + except: + log(logging.ERROR, 'Flush size must be a positive integer') + return False + + if CONFIG['ES_FlushSize'] < 0: + log(logging.ERROR, 'Flush size must be a positive integer') + return False + + if 'Transformations' in CONFIG: + for tr in CONFIG['Transformations']: + try: + test_transformation(tr) + except Exception as e: + log(logging.ERROR, + 'Invalid transformation: {}'.format(str(e))) + return False + + return True + +# Setup logging to stdout (is possible), file or stderr. +# Returns: True | False. +# Raises exceptions: no. 
+def setup_logging(baselogfile=None): if baselogfile: logfilepath = expand_macros(baselogfile) else: @@ -110,51 +206,112 @@ def main(): parser = argparse.ArgumentParser( description="pmacct-to-elasticsearch" ) + parser.add_argument( + "pluginname", + help="Plugin name.") parser.add_argument( "-p", "--print", - help="only print output to stdout " - "(does not send data to ElasticSearch)", + help="Only print output to stdout " + "(does not send data to ElasticSearch).", action="store_true", dest="print_only") - parser.add_argument( "-t", "--test", - help="only tests configuration " - "(does not send data to ElasticSearch)", + help="Only tests configuration " + "(does not send data to ElasticSearch).", action="store_true", dest="test_only") + parser.add_argument( + "--test-condition", + help="Test conditions given in FILE against --test-condition-data.", + metavar="FILE", + dest="test_condition") + parser.add_argument( + "--test-condition-data", + help="Data used to test condition given in --test-condition.", + metavar="FILE", + dest="test_condition_data") args = parser.parse_args() - Config.load(args.cfg_file) - -##Test conditions -##------------------- -# -##C = [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ] -##C = [ "OR", { "Name": "Bob" }, { "Name": "Tom" } ] -#C = [ "OR", [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ], { "Name": "Tom" }, [ { "Name": "Lisa" }, { "Age": 20, "__op__": ">=" } ] ] -##C = [ "Invalid" ] -# -#Data = [ -# { "Name": "Bob", "Age": 15 }, -# { "Name": "Bob", "Age": 16 }, -# { "Name": "Ken", "Age": 14 }, -# { "Name": "Tom", "Age": 14 }, -# { "Name": "Tom", "Age": 20 }, -# { "Name": "Lisa", "Age": 15 }, -# { "Name": "Lisa", "Age": 22 } -#] -# -#print(C) -#for Person in Data: -# try: -# if parse_conditions(C, Person): -# print( "YES - %s" % Person ) -# else: -# print( "--- - %s" % Person ) -# except Exception as e: -# print( "ParseConditions error: %s" % str(e) ) -# raise + if not setup_logging(): + return EXITCODE_Program + + if args.test_condition and args.test_condition_data: + with open(args.test_condition, "r") as f: + c = json.load(f) + with open(args.test_condition_data, "r") as f: + d = json.load(f) + print( + "Tested condition evaluated to {}".format( + parse_conditions(c, d) + ) + ) + return EXITCODE_OK + else: + if args.test_condition and not args.test_condition_data or \ + args.test_condition_data and not args.test_condition: + log(logging.ERROR, "--test-condition and --test-condition-data " + "must be used togheter.") + return EXITCODE_Program + + CONFIG["PluginName"] = args.pluginname + + # Loading configuration + new_cfg_file_name = "{}.conf".format(CONFIG["PluginName"]) + try: + with open('{}/{}'.format(CONF_DIR, new_cfg_file_name), "r") as f: + new_cfg = json.load(f) + except: + log(logging.ERROR, + 'Error loading configuration from {}/{}'.format( + CONF_DIR, new_cfg_file_name), + exc_info=True) + return EXITCODE_Program + + CONFIG.update(new_cfg) + + if 'LogFile' in CONFIG: + if not setup_logging(CONFIG['LogFile']): + return EXITCODE_Program + else: + log(logging.ERROR, 'Missing LogFile') + return EXITCODE_Program + + if not check_config(): + return EXITCODE_Program + + if args.test_only: + print('Configuration tested successfully') + return EXITCODE_OK + + if not CONFIG['InputFile']: + r, w, x = select.select([sys.stdin], [], [], 0) + if not r: + log(logging.ERROR, 'Error while reading input data from stdin') + return EXITCODE_Program + + # Preparing for HTTP authentication + try: + prepare_for_http_auth(CONFIG) + except Exception as e: + 
log(logging.ERROR, str(e)) + return EXITCODE_Program + + # Creating index + index_name = datetime.datetime.now().strftime(CONFIG['ES_IndexName']) + try: + create_index(index_name, CONF_DIR, CONFIG) + except Exception as e: + log(logging.ERROR, "Error while creating index {}: {}".format( + index_name, str(e)) + ) + return EXITCODE_Program + + # Timestamp for ES indexing (UTC) + + ts = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') + # Read pmacct's JSON output and perform transformations + print("Done!") main() diff --git a/transformations.py b/transformations.py index 36e699c..4b660cd 100644 --- a/transformations.py +++ b/transformations.py @@ -3,6 +3,9 @@ import json +# Parse list of conditions c against data d. +# Returns: True | False (conditions matched / did not match). +# Raises exceptions: yes. def parse_conditions_list(c, d): if not c: raise Exception('Empty list') @@ -39,6 +42,9 @@ def parse_conditions_list(c, d): return False return True +# Parse condition c against data d, using operator opfield. +# Returns: True | False (condition matched / did not match). +# Raises exceptions: yes. def parse_conditions_dict(c, d, opfield): op = '=' n = None @@ -85,6 +91,9 @@ def parse_conditions_dict(c, d, opfield): else: raise Exception('Operator not implemented: "{}"'.format(op)) +# Parse conditions c against data d. +# Return: True | False (conditions matched / did not match). +# Raises exception: yes. def parse_conditions(c, d, opfield='__op__'): if isinstance(c, list): return parse_conditions_list(c, d) @@ -95,7 +104,9 @@ def parse_conditions(c, d, opfield='__op__'): type(c), str(c) )) -# returns True or False +# Tests if a transformation syntax is valid. +# Returns: True | False. +# Raises exceptions: yes. def test_transformation(tr): ret = True @@ -161,3 +172,39 @@ def test_transformation(tr): tr_det, action['LookupTableFile'] ) ) + +if __name__ == '__main__': + #Test conditions + #------------------- + + #C = [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ] + #C = [ "OR", { "Name": "Bob" }, { "Name": "Tom" } ] + C = [ "OR", + [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ], + { "Name": "Tom" }, + [ { "Name": "Lisa" }, { "Age": 20, "__op__": ">=" } ] + ] + #C = [ "Invalid" ] + + Data = [ + { "Name": "Bob", "Age": 15 }, + { "Name": "Bob", "Age": 16 }, + { "Name": "Ken", "Age": 14 }, + { "Name": "Tom", "Age": 14 }, + { "Name": "Tom", "Age": 20 }, + { "Name": "Lisa", "Age": 15 }, + { "Name": "Lisa", "Age": 22 } + ] + + print(C) + for Person in Data: + try: + if parse_conditions(C, Person): + print( "YES - %s" % Person ) + else: + print( "--- - %s" % Person ) + except Exception as e: + print( "ParseConditions error: %s" % str(e) ) + raise + +
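
A minimal sketch of the loop that the trailing TODO in main() ("Read pmacct's JSON output and perform transformations") still leaves open, assuming pmacct writes one JSON record per line to stdin and that documents are sent in the standard Elasticsearch bulk format: an {"index": {}} action line followed by the document, each newline-terminated, with index and type already encoded in the URL built by send_to_es(). send_to_es, CONFIG, index_name and ts are the names introduced by this patch; the '@timestamp' field name, the ES_FlushSize-based flushing and the omission of transformation handling are assumptions for illustration only, not part of the commit.

    # Hypothetical continuation of main() - not part of this commit.
    output = ''
    count = 0
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        doc = json.loads(line)            # one pmacct JSON record per line (assumed)
        doc['@timestamp'] = ts            # UTC timestamp built earlier in main(); field name assumed
        output += '{"index":{}}\n'        # bulk action line; index/type are already in the URL
        output += json.dumps(doc) + '\n'
        count += 1
        # flush every ES_FlushSize documents (flush-on-size behaviour assumed)
        if CONFIG['ES_FlushSize'] and count >= CONFIG['ES_FlushSize']:
            send_to_es(CONFIG, index_name, output)
            output = ''
            count = 0
    if output:
        send_to_es(CONFIG, index_name, output)

Flushing in fixed-size batches keeps memory bounded when pmacct emits large purges, and the final flush after the loop catches any remainder smaller than ES_FlushSize.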