diff --git a/new b/new new file mode 100755 index 0000000..cfa47e3 --- /dev/null +++ b/new @@ -0,0 +1,160 @@ +#!/usr/bin/env python + +import argparse +import logging +import sys + +from transformations import * + + +APP_NAME = 'pmacct-to-elasticsearch' +CURRENT_RELEASE = 'v0.3.0' + +CONF_DIR = '/etc/p2es' + +EXITCODE_OK = 0 +EXITCODE_OneOrMoreErrors = 1 + +EXITCODE_Program = 2 +EXITCODE_ElasticSearch = 3 + +DEF_CONFIG = { + 'LogFile': '/var/log/{}-$PluginName.log'.format(APP_NAME), + + 'ES_URL': 'http://localhost:9200', + 'ES_IndexName': '', + 'ES_Type': '', + 'ES_AuthType': 'none', + + 'ES_FlushSize': 5000, + + 'InputFile': None, + + 'Transformations': [] +} + +CONFIG = DEF_CONFIG.copy() + + +def expand_macro(s): + if s is None: + return None + + out = s + out = out.replace('$PluginName', CONFIG.get('PluginName') or 'default') + out = out.replace('$IndexName', datetime.datetime.now().strftime( + CONFIG.get('ES_IndexName') or 'default' + )) + out = out.replace('$Type', CONFIG.get('ES_Type') or 'default') + return out + +def expand_data_macros(s, dic): + if "$" in s: + res = s + for k in dic: + res = res.replace("${}".format(k), str(dic[k])) + return res + return s + +def setup_logginig(baselogfile=None): + if baselogfile: + logfilepath = expand_macros(baselogfile) + else: + logfilepath = None + + logger = logging.getLogger(APP_NAME) + formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + logger.setLevel(logging.INFO) + + logger.handlers = [] + + if logfilepath: + # log to stdout too + if sys.stdout.isatty(): + try: + hdlr = logging.StreamHandler(sys.stdout) + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + except: + pass + try: + hdlr = logging.handlers.RotatingFileHandler(logfilepath, + maxBytes=1000000, + backupCount=3) + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + except: + log(logging.ERROR, + "Can't setup logging to file {}. " + "Ensure it has write permissions for the current user.".format( + logfilepath + ) + ) + return False + else: + try: + hdlr = logging.StreamHandler(sys.stderr) + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + except: + sys.stderr.write("Can't setup logging to stderr.") + return False + + return True + +def log(lev, msg, exc_info=False): + logger = logging.getLogger(APP_NAME) + logger.log(lev, msg, exc_info=exc_info) + +def main(): + parser = argparse.ArgumentParser( + description="pmacct-to-elasticsearch" + ) + parser.add_argument( + "-p", "--print", + help="only print output to stdout " + "(does not send data to ElasticSearch)", + action="store_true", + dest="print_only") + + parser.add_argument( + "-t", "--test", + help="only tests configuration " + "(does not send data to ElasticSearch)", + action="store_true", + dest="test_only") + + args = parser.parse_args() + + Config.load(args.cfg_file) + +##Test conditions +##------------------- +# +##C = [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ] +##C = [ "OR", { "Name": "Bob" }, { "Name": "Tom" } ] +#C = [ "OR", [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ], { "Name": "Tom" }, [ { "Name": "Lisa" }, { "Age": 20, "__op__": ">=" } ] ] +##C = [ "Invalid" ] +# +#Data = [ +# { "Name": "Bob", "Age": 15 }, +# { "Name": "Bob", "Age": 16 }, +# { "Name": "Ken", "Age": 14 }, +# { "Name": "Tom", "Age": 14 }, +# { "Name": "Tom", "Age": 20 }, +# { "Name": "Lisa", "Age": 15 }, +# { "Name": "Lisa", "Age": 22 } +#] +# +#print(C) +#for Person in Data: +# try: +# if parse_conditions(C, Person): +# print( "YES - %s" % Person ) +# else: +# print( "--- - %s" % Person ) +# except Exception as e: +# print( "ParseConditions error: %s" % str(e) ) +# raise + +main() diff --git a/transformations.py b/transformations.py new file mode 100644 index 0000000..36e699c --- /dev/null +++ b/transformations.py @@ -0,0 +1,163 @@ +# See TRANSFORMATIONS.md file for details + +import json + + +def parse_conditions_list(c, d): + if not c: + raise Exception('Empty list') + + if isinstance(c[0], basestring): + if c[0] == 'AND': + if len(c) > 2: + for sub_c in c[1:]: + if not parse_conditions(sub_c, d): + return False + return True + else: + return False + + elif c[0] == 'OR': + if len(c) > 2: + for sub_c in c[1:]: + if parse_conditions(sub_c, d): + return True + return False + else: + return True + + else: + raise Exception( + 'Logical groups must begin with "AND" or "OR" ' + '("{}" found)'.format(c[0]) + ) + else: + # default to "AND" if not specified + + for sub_c in c: + if not parse_conditions(sub_c, d): + return False + return True + +def parse_conditions_dict(c, d, opfield): + op = '=' + n = None + v = None + + for k in c: + if k == opfield: + op = c[k] + + if not op in ('=', '>', '>=', '<', '<=', '!=', 'in', 'notin'): + raise Exception('Unexpected operator: "{}"'.format(op)) + else: + if n is None: + n = k + v = c[k] + else: + raise Exception('Only one name/value pair allowed') + + if op in ('in', 'notin') and not isinstance(v, list): + raise Exception('The "{}" operator requires a list'.format(op)) + + if n is None: + raise Exception('Name/value pair expected') + + if n not in d: + return False + + if op == '=': + return d[n] == v + elif op == '>': + return d[n] > v + elif op == '>=': + return d[n] >= v + elif op == '<': + return d[n] < v + elif op == '<=': + return d[n] <= v + elif op == '!=': + return d[n] != v + elif op == 'in': + return d[n] in v + elif op == 'notin': + return not d[n] in v + else: + raise Exception('Operator not implemented: "{}"'.format(op)) + +def parse_conditions(c, d, opfield='__op__'): + if isinstance(c, list): + return parse_conditions_list(c, d) + elif isinstance(c, dict): + return parse_conditions_dict(c, d, opfield) + else: + raise Exception('Unexpected object type {} from {}'.format( + type(c), str(c) + )) + +# returns True or False +def test_transformation(tr): + ret = True + + try: + tr_det = 'Transformations matrix ({})'.format(transformation) + except: + tr_det = 'Transformations matrix' + + if 'Conditions' not in tr: + raise Exception('{}, "Conditions" is missing'.format(tr_det)) + + if 'Actions' not in tr: + raise Exception('{}, "Actions" is missing'.format(tr_det)) + + try: + parse_conditions(tr['Conditions'], {}) + except Exception as e: + raise Exception('{}, invalid "Conditions": {}'.format(tr_det, str(e))) + + for action in tr['Actions']: + if 'Type' not in action: + raise Exception('{}, "Type" is missing'.format(tr_det)) + + tr_det += ', action type = {}'.format(action['Type']) + + if action['Type'] not in ('AddField', 'AddFieldLookup', 'DelField'): + raise Exception('{}, "Type" unknown'.format(tr_det)) + + if 'Name' not in action: + raise Exception('{}, "Name" is missing'.format(tr_det)) + + if action['Type'] == 'AddField': + if 'Value' not in action: + raise Exception( + '{}, "Value" is missing for new field "{}"'.format( + tr_det, action['Name'] + ) + ) + + if action['Type'] == 'AddFieldLookup': + if 'LookupFieldName' not in action: + raise Exception( + '{}, "LookupFieldName" is missing for ' + 'new field "{}"'.format(tr_det, action['Name']) + ) + if 'LookupTable' in action and 'LookupTableFile' in action: + raise Exception( + '{}, only one from "LookupTable" and ' + '"LookupTableFile" allowed'.format(tr_det) + ) + if 'LookupTable' not in action and 'LookupTableFile' not in action: + raise Exception( + '{}, "LookupTable" and "LookupTableFile" missing ' + 'for new field "{}"'.format(tr_det, action['Name']) + ) + if 'LookupTableFile' in action: + try: + with open(action['LookupTableFile'], "r") as f: + action['LookupTable'] = json.load(f.read()) + except: + raise Exception( + '{}, error loading lookup table from {}'.format( + tr_det, action['LookupTableFile'] + ) + )