Skip to content

Commit

Permalink
refactoring 1
Browse files Browse the repository at this point in the history
  • Loading branch information
pierky committed Jan 20, 2017
1 parent ea281f4 commit bc2795c
Show file tree
Hide file tree
Showing 2 changed files with 323 additions and 0 deletions.
160 changes: 160 additions & 0 deletions new
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#!/usr/bin/env python

import argparse
import logging
import sys

from transformations import *


APP_NAME = 'pmacct-to-elasticsearch'
CURRENT_RELEASE = 'v0.3.0'

CONF_DIR = '/etc/p2es'

EXITCODE_OK = 0
EXITCODE_OneOrMoreErrors = 1

EXITCODE_Program = 2
EXITCODE_ElasticSearch = 3

DEF_CONFIG = {
'LogFile': '/var/log/{}-$PluginName.log'.format(APP_NAME),

'ES_URL': 'http://localhost:9200',
'ES_IndexName': '',
'ES_Type': '',
'ES_AuthType': 'none',

'ES_FlushSize': 5000,

'InputFile': None,

'Transformations': []
}

CONFIG = DEF_CONFIG.copy()


def expand_macro(s):
if s is None:
return None

out = s
out = out.replace('$PluginName', CONFIG.get('PluginName') or 'default')
out = out.replace('$IndexName', datetime.datetime.now().strftime(
CONFIG.get('ES_IndexName') or 'default'
))
out = out.replace('$Type', CONFIG.get('ES_Type') or 'default')
return out

def expand_data_macros(s, dic):
if "$" in s:
res = s
for k in dic:
res = res.replace("${}".format(k), str(dic[k]))
return res
return s

def setup_logginig(baselogfile=None):
if baselogfile:
logfilepath = expand_macros(baselogfile)
else:
logfilepath = None

logger = logging.getLogger(APP_NAME)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
logger.setLevel(logging.INFO)

logger.handlers = []

if logfilepath:
# log to stdout too
if sys.stdout.isatty():
try:
hdlr = logging.StreamHandler(sys.stdout)
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
except:
pass
try:
hdlr = logging.handlers.RotatingFileHandler(logfilepath,
maxBytes=1000000,
backupCount=3)
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
except:
log(logging.ERROR,
"Can't setup logging to file {}. "
"Ensure it has write permissions for the current user.".format(
logfilepath
)
)
return False
else:
try:
hdlr = logging.StreamHandler(sys.stderr)
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
except:
sys.stderr.write("Can't setup logging to stderr.")
return False

return True

def log(lev, msg, exc_info=False):
logger = logging.getLogger(APP_NAME)
logger.log(lev, msg, exc_info=exc_info)

def main():
parser = argparse.ArgumentParser(
description="pmacct-to-elasticsearch"
)
parser.add_argument(
"-p", "--print",
help="only print output to stdout "
"(does not send data to ElasticSearch)",
action="store_true",
dest="print_only")

parser.add_argument(
"-t", "--test",
help="only tests configuration "
"(does not send data to ElasticSearch)",
action="store_true",
dest="test_only")

args = parser.parse_args()

Config.load(args.cfg_file)

##Test conditions
##-------------------
#
##C = [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ]
##C = [ "OR", { "Name": "Bob" }, { "Name": "Tom" } ]
#C = [ "OR", [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ], { "Name": "Tom" }, [ { "Name": "Lisa" }, { "Age": 20, "__op__": ">=" } ] ]
##C = [ "Invalid" ]
#
#Data = [
# { "Name": "Bob", "Age": 15 },
# { "Name": "Bob", "Age": 16 },
# { "Name": "Ken", "Age": 14 },
# { "Name": "Tom", "Age": 14 },
# { "Name": "Tom", "Age": 20 },
# { "Name": "Lisa", "Age": 15 },
# { "Name": "Lisa", "Age": 22 }
#]
#
#print(C)
#for Person in Data:
# try:
# if parse_conditions(C, Person):
# print( "YES - %s" % Person )
# else:
# print( "--- - %s" % Person )
# except Exception as e:
# print( "ParseConditions error: %s" % str(e) )
# raise

main()
163 changes: 163 additions & 0 deletions transformations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# See TRANSFORMATIONS.md file for details

import json


def parse_conditions_list(c, d):
if not c:
raise Exception('Empty list')

if isinstance(c[0], basestring):
if c[0] == 'AND':
if len(c) > 2:
for sub_c in c[1:]:
if not parse_conditions(sub_c, d):
return False
return True
else:
return False

elif c[0] == 'OR':
if len(c) > 2:
for sub_c in c[1:]:
if parse_conditions(sub_c, d):
return True
return False
else:
return True

else:
raise Exception(
'Logical groups must begin with "AND" or "OR" '
'("{}" found)'.format(c[0])
)
else:
# default to "AND" if not specified

for sub_c in c:
if not parse_conditions(sub_c, d):
return False
return True

def parse_conditions_dict(c, d, opfield):
op = '='
n = None
v = None

for k in c:
if k == opfield:
op = c[k]

if not op in ('=', '>', '>=', '<', '<=', '!=', 'in', 'notin'):
raise Exception('Unexpected operator: "{}"'.format(op))
else:
if n is None:
n = k
v = c[k]
else:
raise Exception('Only one name/value pair allowed')

if op in ('in', 'notin') and not isinstance(v, list):
raise Exception('The "{}" operator requires a list'.format(op))

if n is None:
raise Exception('Name/value pair expected')

if n not in d:
return False

if op == '=':
return d[n] == v
elif op == '>':
return d[n] > v
elif op == '>=':
return d[n] >= v
elif op == '<':
return d[n] < v
elif op == '<=':
return d[n] <= v
elif op == '!=':
return d[n] != v
elif op == 'in':
return d[n] in v
elif op == 'notin':
return not d[n] in v
else:
raise Exception('Operator not implemented: "{}"'.format(op))

def parse_conditions(c, d, opfield='__op__'):
if isinstance(c, list):
return parse_conditions_list(c, d)
elif isinstance(c, dict):
return parse_conditions_dict(c, d, opfield)
else:
raise Exception('Unexpected object type {} from {}'.format(
type(c), str(c)
))

# returns True or False
def test_transformation(tr):
ret = True

try:
tr_det = 'Transformations matrix ({})'.format(transformation)
except:
tr_det = 'Transformations matrix'

if 'Conditions' not in tr:
raise Exception('{}, "Conditions" is missing'.format(tr_det))

if 'Actions' not in tr:
raise Exception('{}, "Actions" is missing'.format(tr_det))

try:
parse_conditions(tr['Conditions'], {})
except Exception as e:
raise Exception('{}, invalid "Conditions": {}'.format(tr_det, str(e)))

for action in tr['Actions']:
if 'Type' not in action:
raise Exception('{}, "Type" is missing'.format(tr_det))

tr_det += ', action type = {}'.format(action['Type'])

if action['Type'] not in ('AddField', 'AddFieldLookup', 'DelField'):
raise Exception('{}, "Type" unknown'.format(tr_det))

if 'Name' not in action:
raise Exception('{}, "Name" is missing'.format(tr_det))

if action['Type'] == 'AddField':
if 'Value' not in action:
raise Exception(
'{}, "Value" is missing for new field "{}"'.format(
tr_det, action['Name']
)
)

if action['Type'] == 'AddFieldLookup':
if 'LookupFieldName' not in action:
raise Exception(
'{}, "LookupFieldName" is missing for '
'new field "{}"'.format(tr_det, action['Name'])
)
if 'LookupTable' in action and 'LookupTableFile' in action:
raise Exception(
'{}, only one from "LookupTable" and '
'"LookupTableFile" allowed'.format(tr_det)
)
if 'LookupTable' not in action and 'LookupTableFile' not in action:
raise Exception(
'{}, "LookupTable" and "LookupTableFile" missing '
'for new field "{}"'.format(tr_det, action['Name'])
)
if 'LookupTableFile' in action:
try:
with open(action['LookupTableFile'], "r") as f:
action['LookupTable'] = json.load(f.read())
except:
raise Exception(
'{}, error loading lookup table from {}'.format(
tr_det, action['LookupTableFile']
)
)

0 comments on commit bc2795c

Please sign in to comment.