diff --git a/CONFIGURATION.md b/CONFIGURATION.md new file mode 100644 index 0000000..3bf558d --- /dev/null +++ b/CONFIGURATION.md @@ -0,0 +1,212 @@ +# Configuration of pmacct-to-elasticsearch + +## How it works + +pmacct-to-elasticsearch reads pmacct JSON output and sends it to ElasticSearch. + +It works with two kinds of pmacct plugins: "memory" and "print". +The former, "memory", needs data to be passed to pmacct-to-elasticsearch's +stdin, while the latter, "print", needs a file to be written by pmacct +daemons, from which pmacct-to-elasticsearch is instructed to read data. + +For "memory" plugins, a crontab job is needed to run the pmacct client and to +redirect its output to pmacct-to-elasticsearch; for "print" plugins the pmacct +daemon can directly execute pmacct-to-elasticsearch. More details follow +in the rest of this document. + +![Configuration files](https://raw.github.com/pierky/pmacct-to-elasticsearch/master/img/config_files.png) + +Print plugins are preferable because, in case of pmacct daemon graceful +restart or shutdown, data are written to the output file and the trigger +is regularly executed. + +## 1-to-1 mapping with pmacct plugins + +For each pmacct plugin you want to be processed by pmacct-to-elasticsearch, +a configuration file must be present in the *CONF_DIR* directory to tell the +program how to process its output. + +The configuration file's name must be in the format *PluginName*.conf, where +*PluginName* is the name of the pmacct plugin to which the file refers. + +Example: + + /etc/pmacct/nfacctd.conf: + + ! nfacctd configuration example + plugins: memory[my_mem], print[my_print] + + /etc/p2es/my_mem.conf + /etc/p2es/my_print.conf + +Basically these files tell pmacct-to-elasticsearch: + +1. where to read pmacct's output from; + +2. how to send output to ElasticSearch; + +3. (optionally) which transformations must be applied.
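The 1-to-1 mapping above boils down to a simple path lookup. A minimal Python sketch of the idea follows; `plugin_config_path` is an illustrative helper, not part of the program's actual API:

```python
import os.path

CONF_DIR = "/etc/p2es"  # default configuration directory

def plugin_config_path(plugin_name, conf_dir=CONF_DIR):
    # Each pmacct plugin maps to <conf_dir>/<PluginName>.conf
    return os.path.join(conf_dir, "%s.conf" % plugin_name)

# e.g. plugin_config_path("my_print") -> "/etc/p2es/my_print.conf"
```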
+ +To run pmacct-to-elasticsearch, the first argument must be the *PluginName*, +so that it can figure out what to do: + + pmacct-to-elasticsearch my_print + +## Configuration file syntax + +These files are in JSON format and contain the following keys: + +- **LogFile** [required]: path to the log file used by pmacct-to-elasticsearch + to write any error encountered while processing the output. + + It can contain some macros, which are replaced during execution: + *$PluginName*, *$IndexName*, *$Type* + + The log file is automatically rotated every 1 MB, keeping 3 backup copies. + + **Default**: "/var/log/pmacct-to-elasticsearch-$PluginName.log" + +- **ES_URL** [required]: URL of ElasticSearch HTTP API. + + **Default**: "http://localhost:9200" + +- **ES_IndexName** [required]: name of the ElasticSearch index used to store + pmacct-to-elasticsearch output. + + It may contain Python strftime codes (http://strftime.org/) in order + to have periodic indices. + + Example: + "netflow-%Y-%m-%d" to have daily indices (netflow-YYYY-MM-DD) + + **Default**: no default provided + +- **ES_Type** [required]: ElasticSearch document type (_type field) used to store + pmacct-to-elasticsearch output. Similar to tables in a relational DB. + + From the official reference guide + http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/_basic_concepts.html#_type: + + > Within an index, you can define one or more types. A type is a logical + > category/partition of your index whose semantics is completely up to + > you. In general, a type is defined for documents that have a set of + > common fields. For example, let's assume you run a blogging platform + > and store all your data in a single index. In this index, you may + > define a type for user data, another type for blog data, and yet + > another type for comments data. + + **Default**: no default provided + +- **ES_IndexTemplateFileName** [required]: name of the file containing the + template to be used when creating a new index.
The file must be in the + *CONF_DIR* directory. + + **Default**: new-index-template.json (included in pmacct-to-elasticsearch) + + The default template provided with pmacct-to-elasticsearch has the + _source field enabled; if you want to save some storage, disable it + by editing the new-index-template.json file: + + "_source" : { "enabled" : false } + +- **ES_FlushSize** [required]: how many records to buffer before flushing them + to the ElasticSearch bulk API. + + Set it to 0 to only send data once the whole input has been processed. + + **Default**: 5000 lines + +- **InputFile** [optional]: used mainly when configuring pmacct print plugins. + File used by pmacct-to-elasticsearch to read input data from (it + should coincide with pmacct's print plugin output file). + If omitted, pmacct-to-elasticsearch will read data from stdin. + +- **Transformations** [optional]: the transformation matrix used to add new + fields to the output document sent to ElasticSearch for indexing. + + More details in the [TRANSFORMATIONS.md](TRANSFORMATIONS.md) file. + +This is an example of a basic configuration file: + + { + "ES_IndexName": "netflow-%Y-%m-%d", + "ES_Type": "ingress_traffic", + "InputFile": "/var/lib/pmacct/ingress_traffic.json" + } + +## Plugins configuration + +### Memory plugins + +For "memory" plugins, a crontab job is needed in order to periodically read +(and clear) the in-memory-table that pmacct uses to store data. + +Example of a command scheduled in crontab: + + pmacct -l -p /var/spool/pmacct/my_mem.pipe -s -O json -e | pmacct-to-elasticsearch my_mem + +In the example above, the pmacct client reads the in-memory-table +referenced by the **/var/spool/pmacct/my_mem.pipe** file and writes the JSON +output to stdout, which in turn is redirected to the stdin of +pmacct-to-elasticsearch; the latter is executed with the **my_mem** argument +so that it loads the right configuration from **/etc/p2es/my_mem.conf**.
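Whatever the input source (stdin or a file), records are read line by line and buffered according to ES_FlushSize before each bulk flush. A rough Python sketch of that batching logic, as a simplification rather than the program's actual code:

```python
import json

def batches(lines, flush_size):
    """Group pmacct JSON output lines into batches of at most flush_size
    records; flush_size == 0 means a single batch holding the whole
    input (as ES_FlushSize = 0 does)."""
    batch = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        batch.append(json.loads(line))
        if flush_size > 0 and len(batch) >= flush_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush whatever is left at end of input
```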
+ +### Print plugins + +For "print" plugins, the crontab job is not required; a feature of pmacct +may be used instead: the **print_trigger_exec** config key. +The print_trigger_exec key allows pmacct to directly run +pmacct-to-elasticsearch once the output has been fully written to the output +file. Since pmacct does not allow passing arguments to programs executed via +the print_trigger_exec key, a trick is needed to let +pmacct-to-elasticsearch understand which configuration to use: a trigger +file must be created for each "print" plugin and it has to execute the +program with the proper argument. + +Example: + + /etc/pmacct/nfacctd.conf: + + ! nfacctd configuration example + plugins: print[my_print] + print_output_file[my_print]: /var/lib/pmacct/my_print.json + print_output[my_print]: json + print_trigger_exec[my_print]: /etc/p2es/triggers/my_print + + /etc/p2es/triggers/my_print: + + #!/bin/sh + /usr/local/bin/pmacct-to-elasticsearch my_print & + + # chmod u+x /etc/p2es/triggers/my_print + + /etc/p2es/my_print.conf: + + { + ... + "InputFile": "/var/lib/pmacct/my_print.json" + ... + } + +In the example, the nfacctd daemon has a plugin named **my_print** that writes +its JSON output to **/var/lib/pmacct/my_print.json** and, when done, executes +the **/etc/p2es/triggers/my_print** program. The trigger program, in turn, runs +pmacct-to-elasticsearch with the **my_print** argument and detaches it. +The **my_print.conf** file contains the "InputFile" configuration key that points +to the aforementioned JSON output file (**/var/lib/pmacct/my_print.json**), from +which the program will read data.
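Both the memory and the print paths end with the same step: an HTTP POST to ElasticSearch's bulk endpoint. The newline-delimited payload can be sketched as follows; this assumes the pre-5.x bulk format with document types (which is what the index/_type configuration above targets), and `bulk_body` is an illustrative helper, not the program's own function:

```python
import json

def bulk_body(records, index_name, doc_type):
    # The bulk API expects, for each document, an action/metadata line
    # followed by the document source, one JSON object per line.
    lines = []
    for rec in records:
        lines.append(json.dumps({"index": {"_index": index_name,
                                           "_type": doc_type}}))
        lines.append(json.dumps(rec))
    return "\n".join(lines) + "\n"
```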
+ +The trigger program may also be a symbolic link to the **default_trigger** script +provided, which runs pmacct-to-elasticsearch with its own file name as first +argument: + + # cd /etc/p2es/triggers/ + # ln -s default_trigger my_print + + /etc/p2es/triggers/default_trigger: + + #!/bin/sh + PLUGIN_NAME=`basename $0` + /usr/local/bin/pmacct-to-elasticsearch $PLUGIN_NAME & + +Otherwise, remember to use the full path of pmacct-to-elasticsearch in order +to avoid problems with a stripped version of the *PATH* environment variable. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..28e0147 --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2014 Pier Carlo Chiodi + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
+ diff --git a/README.md b/README.md new file mode 100644 index 0000000..c44f9fb --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +pmacct-to-elasticsearch +======================= + +**pmacct-to-elasticsearch** is a Python script designed to read JSON output from **pmacct** daemons, process it and store it into **ElasticSearch**. It works with both *memory* and *print* plugins and, optionally, it can perform **manipulations on data** (such as adding fields based on other values). + +![Data flow](https://raw.github.com/pierky/pmacct-to-elasticsearch/master/img/data_flow.png) + +1. **pmacct daemons** collect IP accounting data and process them with their plugins; +2. data are stored into **in-memory-tables** (*memory* plugins) or **JSON files** (*print* plugins); +3. **crontab jobs** (*memory* plugins) or **trigger scripts** (*print* plugins) are invoked to execute pmacct-to-elasticsearch; +4. JSON records are finally processed by **pmacct-to-elasticsearch**, which reads them from stdin (*memory* plugins) or directly from the JSON file (*print* plugins). + +Optionally, some **data transformations** can be configured, to allow pmacct-to-elasticsearch to **add or remove fields** to/from the output documents that are sent to ElasticSearch for indexing. These additional fields may be useful to enhance the legibility of graphs and reports, or to add a further level of aggregation or filtering. + +## Installation + +Clone the repository and run the ./install script: + + # cd /usr/local/src/ + # git clone https://github.com/pierky/pmacct-to-elasticsearch.git + # cd pmacct-to-elasticsearch/ + # ./install + +## Configuration + +Please refer to the [CONFIGURATION.md](CONFIGURATION.md) file. The [TRANSFORMATIONS.md](TRANSFORMATIONS.md) file contains details about data transformations configuration. + +A simple tutorial on pmacct integration with ElasticSearch/Kibana using pmacct-to-elasticsearch can be found at http://blog.pierky.com/integration-of-pmacct-with-elasticsearch-and-kibana.
+ +## Author + +Pier Carlo Chiodi - http://pierky.com/aboutme + +Blog: http://blog.pierky.com Twitter: [@pierky](http://twitter.com/pierky) diff --git a/TRANSFORMATIONS.md b/TRANSFORMATIONS.md new file mode 100644 index 0000000..5823e6c --- /dev/null +++ b/TRANSFORMATIONS.md @@ -0,0 +1,219 @@ +BE WARNED: Transformations and Conditions are highly experimental; test them +before using them in a production system. + +Transformations allow pmacct-to-elasticsearch to analyze input data and add +additional information to the output that it sends to ElasticSearch. These +additional fields may be useful to enhance the legibility of graphs and reports, +or to add a further level of aggregation or filtering. + +For example: given an input set of data reporting the ingress router and +interface, a transformation matrix allows adding a new field with the peer's +friendly name: + + pmacct output = pmacct-to-elasticsearch input: + { "peer_ip_src": "10.0.0.1", "iface_in": 10, "packets": 2, "bytes": 100 } + + pmacct-to-elasticsearch output = ElasticSearch indexed document: + { "peer_ip_src": "10.0.0.1", "iface_in": 10, "packets": 2, "bytes": 100, "peer_name": "MyUpstreamProvider" } + +Transformations are based on **Conditions** and **Actions**: if a condition is +satisfied then all its actions are performed. + +For each record received from pmacct, pmacct-to-elasticsearch checks whether its +fields match one or more conditions and, if so, performs the related actions. + +## Syntax + +Syntax is JSON based and refers to the "Transformations" key referenced in the +[CONFIGURATION.md](CONFIGURATION.md) file: + + { + ... + "Transformations": [ + { + "Conditions": ..., + "Actions": ... + }, + { + "Conditions": ..., + "Actions": ... + }, + { + ... + } + ] + ... + } + +### Conditions + +- **Conditions** = `[ ( "AND"|"OR" ), criterion, criterion, ... ]` + + If omitted, "AND" is used. + +- **Criteria**: each *criterion* is either a nested logical group or a + name/value pair in the form `{ "field_name": value, "__op__": "[ = | < | <= | > | >= | != | in | notin ]" }` + + If omitted, operator is "=" (equal).
For "in" and "notin" operators, a list is expected as *value*: + + { "field": [ "a", "b" ], "__op__": "in" } + +Examples: + + Bob, older than 15: + [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ] + + Bob or Tom: + [ "OR", { "Name": "Bob" }, { "Name": "Tom" } ] + + Bob, only if he's older than 15; otherwise Tom, or Lisa, only if she's older than 20: + + [ "OR", [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ], { "Name": "Tom" }, [ { "Name": "Lisa" }, { "Age": 20, "__op__": ">=" } ] ] + +### Actions + +- **Actions** = ```[ { "Type": "ActionType", ...action's details... }, + { "Type": "ActionType", ...action's details... }, + { ... } ]``` + +- **ActionType** == "**AddField**", action's details = + + "Name": "destination_field_name", + "Value": "new_value" + + Sets the "*destination_field_name*" field to "*new_value*"; if the + "*destination_field_name*" field does not exist, creates it. + + Macros can be used in "*new_value*". + +- **ActionType** == "**AddFieldLookup**", action's details = + + "Name": "destination_field_name", + "LookupFieldName": "key_field", + "LookupTable": { + "key1": "new_value1", + "key2": "new_value2", + "*": "default_new_value" + }, + "LookupTableFile": "lookup_table_file_name" + + If the "*key_field*" field is present in the input, searches the lookup + table for the "*key_field*" value and, if found, sets the + "*destination_field_name*" field to the "*new_value*" found. + If no matching entry is found but a "*" key is present in the lookup + table, then its value is used to set the "*destination_field_name*" field. + + The lookup table can be written directly in the configuration file + (using the "LookupTable" key) or referenced as an external file + (the "LookupTableFile" key). + + Macros can be used in "*new_value*". + +- **ActionType** == "**DelField**", action's details = + + "Name": "field_name" + + If "*field_name*" is present in the output dataset, it is removed. + +## Macros + +Macros can be used to refer to fields already present in the output dataset; +their syntax is $fieldname. + +## Examples + +1. Add peer's friendly name to ingress traffic: + + { ...
"Transformations": [ + { + "Conditions": [ { "peer_ip_src": "10.0.0.1" }, { "iface_in": 1 } ], + "Actions": [ { "Type": "AddField", "Name": "peer_name", "Value": "MyUpstream1" } ] + }, + { + "Conditions": [ { "peer_ip_src": "192.168.0.1" }, { "iface_in": 10 } ], + "Actions": [ { "Type": "AddField", "Name": "peer_name", "Value": "MyUpstream2" } ] + } + ] + ... } + + If "peer_ip_src" = "10.0.0.1" and "iface_in" = 1, set "peer_name" to + "MyUpstream1". Similarly for the second condition. + + input: + { "peer_ip_src": "10.0.0.1", "iface_in": 1, "packets": 2, "bytes": 100 } + { "peer_ip_src": "192.168.0.1", "iface_in": 10, "packets": 4, "bytes": 400 } + output: + { "peer_ip_src": "10.0.0.1", "iface_in": 1, "packets": 2, "bytes": 100, "peer_name": "MyUpstream1" } + { "peer_ip_src": "192.168.0.1", "iface_in": 10, "packets": 4, "bytes": 400, "peer_name": "MyUpstream2" } + +2. Add Autonomous System name to source AS: + + { ... + "Transformations": [ + { + "Conditions": [ { "as_src": "", "__op__": "!=" } ], + "Actions": [ + { "Type": "AddFieldLookup", "Name": "as_src_name", + "LookupFieldName": "as_src", + "LookupTableFile": "/etc/p2es/AS_map.json" } + ] + } + ] + ... } + + + /etc/p2es/AS_map.json: + + { + "36040": "$as_src - YouTube", + "15169": "$as_src - Google", + "20940": "$as_src - Akamai", + "*": "$as_src" + } + + If "as_src" is not empty, use its value to look up the table in + **/etc/p2es/AS_map.json**; if a corresponding value is found, use it to fill the + new "as_src_name" field with "ASN - Name" values, otherwise fill it with + only the ASN.
+ + input: + { "as_src": 36040, "packets": 1, "flows": 1, "bytes": 100 } + { "as_src": 20940, "packets": 5, "flows": 5, "bytes": 500 } + { "as_src": 32934, "packets": 8, "flows": 4, "bytes": 300 } + output: + { "as_src": 36040, "packets": 1, "flows": 1, "bytes": 100, "as_src_name": "36040 - YouTube" } + { "as_src": 20940, "packets": 5, "flows": 5, "bytes": 500, "as_src_name": "20940 - Akamai" } + { "as_src": 32934, "packets": 8, "flows": 4, "bytes": 300, "as_src_name": "32934" } + +3. Another version of example 1: add peer's friendly name to ingress traffic: + + + { ... + "Transformations": [ + { + "Conditions": [ "AND", { "peer_ip_src": "", "__op__": "!=" }, { "iface_in": "", "__op__": "!=" } ], + "Actions": [ { "Type": "AddField", "Name": "temporary1", "Value": "$peer_ip_src-$iface_in" } ] + }, + { + "Conditions": [ { "temporary1": "", "__op__": "!=" } ], + "Actions": [ + { + "Type": "AddFieldLookup", + "Name": "peer_name", + "LookupFieldName": "temporary1", + "LookupTable": { + "10.0.0.1-1": "MyUpstream1", + "192.168.0.1-10": "MyUpstream2" + } + }, + { "Type": "DelField", "Name": "temporary1" } + ] + } + ] + ... } + + If "peer_ip_src" and "iface_in" are not empty, add a new temporary field named + "temporary1" with "*peer_ip_src*-*iface_in*". Next, if the "temporary1" field has + been filled and it's not empty, use it to look up the table in order to find the + corresponding peer's friendly name. Finally, remove the temporary field from + the output dataset.
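To make the semantics of the examples above concrete, here is a compact re-implementation of the condition-matching rules this document describes. `match` is an illustrative function under those assumptions, not the program's own code (for instance, it tolerates empty lists instead of rejecting them):

```python
import operator

OPS = {
    "=": operator.eq, "!=": operator.ne,
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "in": lambda a, b: a in b,
    "notin": lambda a, b: a not in b,
}

def match(conditions, record):
    """Sketch of the Conditions semantics: lists are logical groups
    (implicit "AND" unless they start with "AND"/"OR"), dicts are single
    criteria with an optional "__op__" operator (default "=")."""
    if isinstance(conditions, list):
        items, combine = conditions, all
        if items and items[0] in ("AND", "OR"):
            combine = all if items[0] == "AND" else any
            items = items[1:]
        return combine(match(c, record) for c in items)
    op = conditions.get("__op__", "=")
    # the single name/value pair besides the optional "__op__" key
    field, value = next((k, v) for k, v in conditions.items() if k != "__op__")
    return field in record and OPS[op](record[field], value)
```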
diff --git a/distrib/cron b/distrib/cron new file mode 100644 index 0000000..8778016 --- /dev/null +++ b/distrib/cron @@ -0,0 +1,14 @@ +# /etc/cron.d/pmacct-to-elasticsearch: crontab fragment for pmacct-to-elasticsearch + +SHELL=/bin/sh +PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin: + +# m h dom mon dow user command + +# Example: dump data from pmacct in-memory-table and process them with pmacct-to-elasticsearch +# every 5 minutes +# +#*/5 * * * * root pmacct -l -p /var/spool/pmacct/my_plugin.pipe -s -O json -e | pmacct-to-elasticsearch my_plugin +# + +#EOF diff --git a/distrib/default_trigger b/distrib/default_trigger new file mode 100755 index 0000000..45f59be --- /dev/null +++ b/distrib/default_trigger @@ -0,0 +1,21 @@ +#!/bin/sh + +# Default trigger to run pmacct-to-elasticsearch from +# pmacct "print_trigger_exec" configuration key. +# +# You can copy and edit this file or create a link to +# it in the format: +# +# -> ./default_trigger +# +# Example: +# +# # cd /etc/p2es/triggers/ +# # ln -s default_trigger plugin2 +# # ls -l +# total 4 +# -rwxr--r-- 1 root root 90 Dec 15 20:20 default_trigger +# lrwxrwxrwx 1 root root 15 Dec 15 20:21 plugin2 -> default_trigger + +PLUGIN_NAME=`basename $0` +/pmacct-to-elasticsearch $PLUGIN_NAME & diff --git a/distrib/new-index-template.json b/distrib/new-index-template.json new file mode 100644 index 0000000..28eef28 --- /dev/null +++ b/distrib/new-index-template.json @@ -0,0 +1,26 @@ +{ + "mappings": { + "_default_": { + "_all" : { "enabled" : false }, + + "_source" : { "enabled" : true }, + + "dynamic_templates": [ + { + "string_template" : { + "match" : "*", + "match_mapping_type" : "string", + "mapping": { "type": "string", "index": "not_analyzed" } + } + } + ], + + "properties": { + "@timestamp": { "type": "date" }, + "bytes": { "type": "long" }, + "packets": { "type": "long" }, + "flows": { "type": "long" } + } + } + } +} diff --git a/img/config_files.png b/img/config_files.png new file mode 100644 
index 0000000..2a6313f Binary files /dev/null and b/img/config_files.png differ diff --git a/img/data_flow.png b/img/data_flow.png new file mode 100644 index 0000000..eb7a45b Binary files /dev/null and b/img/data_flow.png differ diff --git a/install b/install new file mode 100755 index 0000000..bee3a65 --- /dev/null +++ b/install @@ -0,0 +1,136 @@ +#!/bin/bash + +# if you change the following CONF_DIR value you must +# change it in pmacct-to-elasticsearch too. +CONF_DIR=/etc/p2es + +BIN_DIR=/usr/local/bin + +echo "" +echo "pmacct-to-elasticsearch installation script" +echo "" +echo "The following directories will be used for the installation:" +echo " - $CONF_DIR: configuration files" +echo " - $BIN_DIR: the main pmacct-to-elasticsearch program" +echo "" +read -p "Proceed [yes|NO]: " PROCEED + +if [ "$PROCEED" != "yes" ]; then + echo "Installation aborted" + exit 0 +fi + +# ----------------------------------------------------------- + +echo -n "Creating configuration directory ($CONF_DIR)... " + +if [ ! -d $CONF_DIR ]; then + + mkdir -p $CONF_DIR &>/dev/null + + if [ $? -ne 0 ]; then + echo "ERROR - exit code $?" + exit 1 + else + mkdir $CONF_DIR/triggers &>/dev/null + + if [ $? -ne 0 ]; then + echo "ERROR creating triggers subdirectory - exit code $?" + exit 1 + else + echo "OK" + fi + fi +else + echo "OK (already exists)" +fi + +# ----------------------------------------------------------- + +echo -n "Copying default configuration files in $CONF_DIR... " + +cp -i distrib/new-index-template.json $CONF_DIR + +if [ $? -ne 0 ]; then + echo "ERROR - exit code $?" + exit 1 +else + echo "OK" +fi + +# ----------------------------------------------------------- + +echo -n "Copying default trigger in $CONF_DIR/triggers... " + +if [ -f $CONF_DIR/triggers/default_trigger ]; then + echo "" + read -p "- Default trigger already exists at $CONF_DIR/triggers/default_trigger: overwrite it? 
[yes|NO] " PROCEED +else + PROCEED="yes" +fi + +if [ "$PROCEED" == "yes" ]; then + BIN_DIR_SED="${BIN_DIR//\//\\/}" + sed -e "s//$BIN_DIR_SED/" distrib/default_trigger > $CONF_DIR/triggers/default_trigger + + if [ $? -ne 0 ]; then + echo "ERROR - exit code $?" + exit 1 + else + echo "OK" + chmod u+x $CONF_DIR/triggers/default_trigger + fi +else + echo "Skipped" +fi + +# ----------------------------------------------------------- + +echo -n "Copying pmacct-to-elasticsearch program to $BIN_DIR... " + +cp pmacct-to-elasticsearch $BIN_DIR/ + +if [ $? -ne 0 ]; then + echo "ERROR - exit code $?" + exit 1 +else + echo "OK" + chmod u+x $BIN_DIR/pmacct-to-elasticsearch +fi + +# ----------------------------------------------------------- + +echo -n "Copying crontab job fragment to /etc/cron.d... " + +if [ -d /etc/cron.d ]; then + if [ ! -f /etc/cron.d/pmacct-to-elasticsearch ]; then + BIN_DIR_SED="${BIN_DIR//\//\\/}" + sed -e "s//$BIN_DIR_SED/" distrib/cron > /etc/cron.d/pmacct-to-elasticsearch + + if [ $? -ne 0 ]; then + echo "ERROR - exit code $?" + exit 1 + else + echo "OK" + fi + else + echo "skipped (/etc/cron.d/pmacct-to-elasticsearch already exists)" + fi +else + echo "ERROR - /etc/cron.d does not exist" + exit 1 +fi + +# ----------------------------------------------------------- + +echo "=====================" +echo "Installation complete" +echo "=====================" +echo "" + +if [ "$CONF_DIR" != "/etc/p2es" ]; then + echo "WARNING: the default configuration directory (CONF_DIR) has been changed from /etc/p2es to $CONF_DIR: please change the CONF_DIR variable in pmacct-to-elasticsearch too." +fi + +echo "Some configurations are needed now. Read the CONFIGURATION.md file for more details."
+echo "" diff --git a/pmacct-to-elasticsearch b/pmacct-to-elasticsearch new file mode 100755 index 0000000..021d1e2 --- /dev/null +++ b/pmacct-to-elasticsearch @@ -0,0 +1,665 @@ +#!/usr/bin/env python + +# Copyright (c) 2014 Pier Carlo Chiodi - http://www.pierky.com +# Licensed under The MIT License (MIT) - http://opensource.org/licenses/MIT + +import getopt +import logging +from logging.handlers import RotatingFileHandler +from copy import deepcopy +import fileinput +import select +import sys +import os.path +import json +import datetime +import urllib2 + +APP_NAME = 'pmacct-to-elasticsearch' +CURRENT_RELEASE = 'v0.1.0' + +CONF_DIR = '/etc/p2es' + +EXITCODE_OK = 0 +EXITCODE_OneOrMoreErrors= 1 + +EXITCODE_Program = 2 +EXITCODE_ElasticSearch = 3 + +DEF_CONFIG = { + 'LogFile': '/var/log/%s-$PluginName.log' % APP_NAME, + + 'ES_URL': 'http://localhost:9200', + 'ES_IndexName': '', + 'ES_Type': '', + + 'ES_FlushSize': 5000, + + 'InputFile': None, + + 'Transformations': [] +} + +CONFIG = DEF_CONFIG.copy() + +def ExpandMacro( In ): + if In is None: + return None + + Out = deepcopy( In ) + + Out = Out.replace( '$PluginName', CONFIG.get( 'PluginName' ) or 'default' ) + Out = Out.replace( '$IndexName', datetime.datetime.now().strftime( CONFIG.get( 'ES_IndexName' ) or 'default' ) ) + Out = Out.replace( '$Type', CONFIG.get( 'ES_Type' ) or 'default' ) + return Out + +# returns True or False +def SetupLogging( BaseLogFile=None ): + if BaseLogFile: + LogFilePath = ExpandMacro( BaseLogFile ) + else: + LogFilePath = None + + logger = logging.getLogger(APP_NAME) + + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + + logger.setLevel(logging.INFO) + + logger.handlers = [] + + if LogFilePath: + # Log to stdout also + if sys.stdout.isatty(): + try: + hdlr = logging.StreamHandler( sys.stdout ) + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + except: + pass + + try: + hdlr = logging.handlers.RotatingFileHandler( LogFilePath, maxBytes=1000000, backupCount=3 
) + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + except: + Log( logging.ERROR, 'Can\'t setup logging to file %s. Ensure it has write permissions for the current user.' % LogFilePath ) + return False + + else: + try: + hdlr = logging.StreamHandler( sys.stderr ) + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + except: + sys.stderr.write( 'Can\'t setup logging to stderr.\n' ) + return False + + return True + +def Log( lev, msg, exc_info=False ): + logger = logging.getLogger(APP_NAME) + logger.log( lev, msg, exc_info=exc_info ) + +def Usage(): + print('') + print('%s %s' % ( APP_NAME, CURRENT_RELEASE ) ) + print('by Pier Carlo Chiodi (aka Pierky)') + print('http://www.pierky.com/%s' % APP_NAME ) + print('') + print('Usage: %s [ options ] ' % APP_NAME) + print('') + print('Options:') + print(' -p, --print: only print output to stdout (does not send data to ElasticSearch)') + print(' -t, --test: only tests configuration (does not send data to ElasticSearch)') + print(' -h, --help: show this help') + print('') + print('Copyright (c) 2014 Pier Carlo Chiodi') + print('') + +def ExpandDataMacros( S, Dict ): + if S.find('$') >= 0: + Res = S + + for K in Dict.keys(): + Res = Res.replace( '$%s' % K, str( Dict[K] ) ) + return Res + else: + return S + +# See TRANSFORMATIONS.md file for details +def ParseConditions( C, D, OpField='__op__' ): + if isinstance( C, list ): + if len(C) > 0: + if isinstance( C[0], basestring ): + if C[0] == 'AND': + if len(C) > 2: + for subC in C[1:]: + if not ParseConditions( subC, D ): + return False + return True + else: + return False + + elif C[0] == 'OR': + if len(C) > 2: + for subC in C[1:]: + if ParseConditions( subC, D ): + return True + return False + else: + return True + + else: + raise Exception( 'Logical groups must begin with "AND" or "OR" ("%s" found)' % C[0] ) + else: + # default to "AND" if not specified + + for subC in C: + if not ParseConditions( subC, D ): + return False + return True + + else: + raise 
Exception( 'Empty list' ) + + elif isinstance( C, dict ): + Op = '=' + N = None + V = None + + for K in C.keys(): + if K == OpField: + Op = C[K] + + if not Op in [ '=', '>', '>=', '<', '<=', '!=', 'in', 'notin' ]: + raise Exception( 'Unexpected operator: "%s"' % Op ) + else: + if N is None: + N = K + V = C[K] + else: + raise Exception( 'Only one name/value pair allowed' ) + + if Op in [ 'in', 'notin' ] and not isinstance( V, list ): + raise Exception( 'The "%s" operator requires a list' % Op ) + + if not N is None: + if not N in D: + return False + + if Op == '=': + return D[N] == V + elif Op == '>': + return D[N] > V + elif Op == '>=': + return D[N] >= V + elif Op == '<': + return D[N] < V + elif Op == '<=': + return D[N] <= V + elif Op == '!=': + return D[N] != V + elif Op == 'in': + return D[N] in V + elif Op == 'notin': + return not D[N] in V + else: + raise Exception( 'Operator not implemented: "%s"' % Op ) + + else: + raise Exception( 'Name/value pair expected' ) + else: + raise Exception( 'Unexpected object type %s from %s' % ( type(C), str(C) ) ) + +# returns True or False +def TestTransformation( Transformation ): + Ret = True + + try: + TransformationDetails = 'Transformations matrix (%s)' % str( Transformation ) + except: + TransformationDetails = 'Transformations matrix' + + if not 'Conditions' in Transformation: + Log( logging.ERROR, '%s, "Conditions" is missing' % TransformationDetails ) + Ret = False + else: + try: + ParseConditions( Transformation['Conditions'], {} ) + except Exception as e: + Log( logging.ERROR, '%s, invalid "Conditions": %s' % ( TransformationDetails, str(e) ) ) + Ret = False + + if not 'Actions' in Transformation: + Log( logging.ERROR, '%s, "Actions is missing' % TransformationDetails ) + Ret = False + else: + for Action in Transformation['Actions']: + if not 'Type' in Action: + Log( logging.ERROR, '%s, "Type" is missing' % TransformationDetails ) + Ret = False + else: + TransformationDetails = TransformationDetails + ', action 
type = %s' % Action['Type'] + + if not Action['Type'] in [ 'AddField', 'AddFieldLookup', 'DelField' ]: + Log( logging.ERROR, '%s, "Type" unknown' % TransformationDetails ) + Ret = False + else: + if Action['Type'] in [ 'AddField', 'AddFieldLookup', 'DelField' ]: + if not 'Name' in Action: + Log( logging.ERROR, '%s, "Name" is missing' % TransformationDetails ) + Ret = False + + if Action['Type'] in [ 'AddField' ]: + if not 'Value' in Action: + Log( logging.ERROR, '%s, "Value" is missing for new field "%s"' % ( TransformationDetails, Action['Name'] ) ) + Ret = False + + if Action['Type'] in [ 'AddFieldLookup' ]: + if not 'LookupFieldName' in Action: + Log( logging.ERROR, '%s, "LookupFieldName" is missing for new field "%s"' % ( TransformationDetails, Action['Name'] ) ) + Ret = False + if 'LookupTable' in Action and 'LookupTableFile' in Action: + Log( logging.ERROR, '%s, only one from "LookupTable" and "LookupTableFile" allowed' % TransformationDetails ) + Ret = False + if not 'LookupTable' in Action and not 'LookupTableFile' in Action: + Log( logging.ERROR, '%s, "LookupTable" and "LookupTableFile" missing for new field "%s"' % ( TransformationDetails, Action['Name'] ) ) + Ret = False + else: + if 'LookupTableFile' in Action: + try: + LookupTable_File = open( Action['LookupTableFile'] ) + Action['LookupTable'] = json.load( LookupTable_File ) + LookupTable_File.close() + except: + Log( logging.ERROR, '%s, error loading lookup table from %s' % ( TransformationDetails, Action['LookupTableFile'] ) ) + Ret = False + + return Ret + +# returns exit code (EXITCODE_ElasticSearch, EXITCODE_OK or EXITCODE_OneOrMoreErrors) +def SendToElasticSearch( IndexName, Output ): + # HTTP bulk insert toward ES + + URL = '%s/%s/%s/_bulk' % ( CONFIG['ES_URL'], IndexName, CONFIG['ES_Type'] ) + + try: + HTTPResponse = urllib2.urlopen( URL, Output ) + except: + Log( logging.ERROR, 'Error while executing HTTP bulk insert on %s' % IndexName, exc_info=True ) + return EXITCODE_ElasticSearch + + # 
Interpreting HTTP bulk insert response + + HTTPPlainTextResponse = HTTPResponse.read() + + if( HTTPResponse.getcode() != 200 ): + Log( logging.ERROR, 'Bulk insert on %s failed - HTTP status code = %s - Response %s' % ( IndexName, HTTPResponse.getcode(), HTTPPlainTextResponse ) ) + return EXITCODE_ElasticSearch + + try: + JSONResponse = json.loads( HTTPPlainTextResponse ) + except: + Log( logging.ERROR, 'Error while decoding JSON HTTP response - first 100 characters: %s' % HTTPPlainTextResponse[:100], exc_info=True ) + return EXITCODE_ElasticSearch + + if JSONResponse['errors']: + Log( logging.WARNING, 'Bulk insert on %s failed to process one or more documents' % IndexName ) + return EXITCODE_OneOrMoreErrors + else: + return EXITCODE_OK + +# returns True or False +def CheckConfiguration(): + if not CONFIG['ES_IndexName']: + Log( logging.ERROR, 'ElasticSearch index name not provided') + return False + + if not CONFIG['ES_Type']: + Log( logging.ERROR, 'ElasticSearch type not provided') + return False + + if not CONFIG['ES_URL']: + Log( logging.ERROR, 'ElasticSearch URL not provided') + return False + + if not 'ES_IndexTemplateFileName' in CONFIG: + CONFIG['ES_IndexTemplateFileName'] = 'new-index-template.json' + else: + IndexTemplatePath = '%s/%s' % ( CONF_DIR, CONFIG['ES_IndexTemplateFileName'] ) + + if not os.path.isfile( IndexTemplatePath ): + Log( logging.ERROR, 'Can\'t find index template file %s' % IndexTemplatePath ) + return False + else: + try: + IndexTemplate_File = open( IndexTemplatePath ) + IndexTemplate = json.load( IndexTemplate_File ) + IndexTemplate_File.close() + except: + Log( logging.ERROR, 'Index template from %s is not in valid JSON format' % ( IndexTemplatePath ), exc_info=True ) + return False + + if CONFIG['ES_URL'].endswith('/'): + CONFIG['ES_URL'] = CONFIG['ES_URL'][:-1] + + if not 'ES_FlushSize' in CONFIG: + Log( logging.ERROR, 'Flush size not provided' ) + return False + else: + try: + CONFIG['ES_FlushSize'] = 
int(CONFIG['ES_FlushSize']) + except: + Log( logging.ERROR, 'Flush size must be a positive integer' ) + return False + + if CONFIG['ES_FlushSize'] < 0: + Log( logging.ERROR, 'Flush size must be a positive integer' ) + return False + + TransformationsOK = True + + if 'Transformations' in CONFIG: + for Transformation in CONFIG['Transformations']: + TransformationsOK = TransformationsOK and TestTransformation( Transformation ) + + if not TransformationsOK: + return False + + return True + +# return exit code (EXITCODE_OK, EXITCODE_ElasticSearch ) and True|False +def DoesIndexExist( IndexName ): + URL = '%s/%s' % ( CONFIG['ES_URL'], IndexName ) + + try: + HEADRequest = urllib2.Request(URL) + HEADRequest.get_method = lambda : 'HEAD' + HTTPResponse = urllib2.urlopen( HEADRequest ) + except urllib2.HTTPError, err: + if err.code == 404: + return EXITCODE_OK, False + else: + Log( logging.ERROR, 'Error while checking if %s index exists' % IndexName, exc_info=True ) + return EXITCODE_ElasticSearch, None + + return EXITCODE_OK, ( HTTPResponse.getcode() == 200 ) + +# return exit code +def CreateIndex( IndexName ): + + # index already exists? + + ExitCode, IndexExists = DoesIndexExist( IndexName ) + + if ExitCode != EXITCODE_OK: + return ExitCode + + if IndexExists: + return EXITCODE_OK + + # index does not exist, creating it + + TemplatePath = '%s/%s' % ( CONF_DIR, CONFIG['ES_IndexTemplateFileName'] ) + + try: + TemplateFile = open( TemplatePath, 'r' ) + Template = TemplateFile.read() + TemplateFile.close() + except: + Log( logging.ERROR, 'Error while reading index template from file %s' % TemplatePath, exc_info=True ) + return EXITCODE_Program + + URL = '%s/%s' % ( CONFIG['ES_URL'], IndexName ) + + try: + HTTPResponse = urllib2.urlopen( URL, Template ) + except: + # something went wrong but index now exists anyway? 
+ + ExitCode, IndexExists = DoesIndexExist( IndexName ) + + if IndexExists: + return EXITCODE_OK + else: + Log( logging.ERROR, 'Error while creating index %s from template %s' % ( IndexName, TemplatePath ), exc_info=True ) + return EXITCODE_ElasticSearch + + return EXITCODE_OK + +# return exit code +def Main(): + if not SetupLogging(): + return EXITCODE_Program + + # Parsing command line arguments + + try: + opts, args = getopt.getopt( sys.argv[1:], 'pth', [ 'print', 'test', 'help', 'test-condition=', 'test-condition-data=' ] ) + + except getopt.GetoptError as err: + Log( logging.ERROR, str(err) ) + Usage() + return EXITCODE_Program + + TestOnly = False + PrintOnly = False + TestCondition = None + TestConditionData = {} + + for o, a in opts: + if o in ( '-h', '--help' ): + Usage() + return EXITCODE_OK + + elif o in ( '-t', '--test' ): + TestOnly = True + + elif o in ( '-p', '--print' ): + PrintOnly = True + + elif o in ( '--test-condition' ): + try: + TestCondition = json.loads( a ) + TestOnly = True + except: + Log( logging.ERROR, 'Invalid JSON object for %s option' % o ) + return EXITCODE_Program + + elif o in ( '--test-condition-data' ): + try: + TestConditionData = json.loads( a ) + TestOnly = True + except: + Log( logging.ERROR, 'Invalid JSON object for %s option' % o ) + return EXITCODE_Program + + if TestCondition: + print( "Tested condition evaluated to %s" % ParseConditions( TestCondition, TestConditionData ) ) + return EXITCODE_OK + + if args == []: + Log( logging.ERROR, 'Missing required argument: ' ) + Usage() + return EXITCODE_Program + + if len(args) > 1: + Log( logging.ERROR, 'Unexpected arguments: %s' % ' '.join( args[1:] ) ) + Usage() + return EXITCODE_Program + + CONFIG['PluginName'] = args[0] + + # Loading configuration + + NewConfigFileName = '%s.conf' % CONFIG['PluginName'] + + try: + NewConfig_File = open( '%s/%s' % ( CONF_DIR, NewConfigFileName ) ) + NewConfig = json.load( NewConfig_File ) + NewConfig_File.close() + except: + Log( 
logging.ERROR, 'Error loading configuration from %s/%s.conf' % ( CONF_DIR, NewConfigFileName ), exc_info=True ) + return EXITCODE_Program + + CONFIG.update( NewConfig ) + + if 'LogFile' in CONFIG: + if not SetupLogging( CONFIG['LogFile'] ): + return EXITCODE_Program + else: + Log( logging.ERROR, 'Missing LogFile' ) + return EXITCODE_Program + + # Checking configuration + + if not CheckConfiguration(): + return EXITCODE_Program + + if TestOnly: + print('Configuration tested successfully') + return EXITCODE_OK + + if not CONFIG['InputFile']: + r, w, x = select.select([sys.stdin], [], [], 0) + if not r: + Log( logging.ERROR, 'Error while reading input data from stdin' ) + return EXITCODE_Program + + # Creating index + + IndexName = datetime.datetime.now().strftime( CONFIG['ES_IndexName'] ) + + ExitCode = CreateIndex( IndexName ) + + if ExitCode != EXITCODE_OK: + return ExitCode + + # Timestamp for ES indexing (UTC) + + TS = datetime.datetime.utcnow().strftime( '%Y-%m-%dT%H:%M:%SZ' ) + + # Read pmacct's JSON output and perform transformations + + Output = '' + Count = 0 + ExitCode = EXITCODE_OK + + if CONFIG['InputFile']: + InputFile = ExpandMacro( CONFIG['InputFile'] ) + else: + InputFile = '-' + + try: + for line in fileinput.input( InputFile, mode='rU' ): + try: + JSONData = json.loads( line ) + except: + Log( logging.ERROR, 'Error while decoding pmacct\'s JSON output: %s' % line ) + break + + JSONData['@timestamp'] = TS + + try: + if 'Transformations' in CONFIG: + for Transformation in CONFIG['Transformations']: + if ParseConditions( Transformation['Conditions'], JSONData ): + for Action in Transformation['Actions']: + Action_Type = Action['Type'] + + if Action_Type == 'AddField': + NewVal = ExpandDataMacros( Action['Value'], JSONData ) + JSONData[ Action['Name'] ] = NewVal + + elif Action_Type == 'AddFieldLookup': + if Action['LookupFieldName'] in JSONData: + NewVal = None + + if str(JSONData[ Action['LookupFieldName'] ]) in Action['LookupTable']: + NewVal = 
Action['LookupTable'][ str(JSONData[ Action['LookupFieldName'] ]) ] + else: + if "*" in Action['LookupTable']: + NewVal = Action['LookupTable']['*'] + + if NewVal: + JSONData[ Action['Name'] ] = ExpandDataMacros( NewVal, JSONData ) + + elif Action_Type == 'DelField': + if Action['Name'] in JSONData: + del JSONData[ Action['Name'] ] + except: + Log( logging.ERROR, 'Error while applying transformations to pmacct\'s JSON output: %s' % line, exc_info=True ) + break + + Output = Output + '{"index":{}}' + '\n' + Output = Output + json.dumps( JSONData ) + '\n' + + if PrintOnly: + print(Output) + Output = '' + else: + Count = Count + 1 + if CONFIG['ES_FlushSize'] > 0 and Count >= CONFIG['ES_FlushSize']: + Output = Output + '\n' + ES_ExitCode = SendToElasticSearch( IndexName, Output ) + Output = '' + Count = 0 + + if ES_ExitCode == EXITCODE_ElasticSearch: + return ES_ExitCode + if ES_ExitCode == EXITCODE_OneOrMoreErrors: + ExitCode = EXITCODE_OneOrMoreErrors + + if not PrintOnly and Output != '': + Output = Output + '\n' + ES_ExitCode = SendToElasticSearch( IndexName, Output ) + Output = '' + Count = 0 + + if ES_ExitCode == EXITCODE_ElasticSearch: + return ES_ExitCode + if ES_ExitCode == EXITCODE_OneOrMoreErrors: + ExitCode = EXITCODE_OneOrMoreErrors + + except: + Log( logging.ERROR, 'Error while reading and processing input data from %s' % InputFile, exc_info=True ) + return EXITCODE_Program + + return ExitCode + +if __name__ == '__main__': + try: + RetVal = Main() + except: + Log( logging.ERROR, 'Unhandled exception', exc_info=True ) + RetVal = EXITCODE_Program + sys.exit( RetVal ) + + # Test conditions + # ------------------- + # + #C = [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ] + #C = [ "OR", { "Name": "Bob" }, { "Name": "Tom" } ] + #C = [ "OR", [ { "Name": "Bob" }, { "Age": 16, "__op__": ">=" } ], { "Name": "Tom" }, [ { "Name": "Lisa" }, { "Age": 20, "__op__": ">=" } ] ] + #C = [ "Invalid" ] + # + #Data = [ + # { "Name": "Bob", "Age": 15 }, + # { "Name": 
"Bob", "Age": 16 }, + # { "Name": "Ken", "Age": 14 }, + # { "Name": "Tom", "Age": 14 }, + # { "Name": "Tom", "Age": 20 }, + # { "Name": "Lisa", "Age": 15 }, + # { "Name": "Lisa", "Age": 22 } + #] + # + #for Person in Data: + # try: + # if ParseConditions( C, Person ): + # print( "YES - %s" % Person ) + # else: + # print( "--- - %s" % Person ) + # except Exception as e: + # print( "ParseConditions error: %s" % str(e) ) +