diff --git a/CHANGES.rst b/CHANGES.rst
index 648a105..5fa47b8 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,6 +1,13 @@
 Changelog
 =========
 
+0.3.0a2
+-------
+
+- CSV output support.
+
+  The ``InputFormat`` option in the plugin configuration file can be used to instruct pmacct-to-elasticsearch to parse CSV output from pmacct.
+
 0.3.0a1
 -------
 
diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 721462e..6dc2f3b 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -2,7 +2,7 @@
 
 ## How it works
 
-pmacct-to-elasticsearch reads pmacct JSON output and sends it to ElasticSearch.
+pmacct-to-elasticsearch reads pmacct output and sends it to ElasticSearch.
 It works properly with two kinds of pmacct plugins: "memory" and "print".
 The former, "memory", needs data to be passed to pmacct-to-elasticsearch's
@@ -152,6 +152,10 @@ These files are in JSON format and contain the following keys:
   should coincide with pmacct's print plugin output file). If omitted
   pmacct-to-elasticsearch will read data from stdin.
 
+- **InputFormat** [optional]: the input data format. Can be 'json' or 'csv'.
+
+  **Default**: "json"
+
 - **Transformations** [optional]: the transformation matrix used to add
   new fields to the output document sent to ElasticSearch for indexing.
 
diff --git a/README.rst b/README.rst
index 6a2605e..9604410 100644
--- a/README.rst
+++ b/README.rst
@@ -1,15 +1,15 @@
 pmacct-to-elasticsearch
 =======================
 
-**pmacct-to-elasticsearch** is a python script designed to read JSON output from **pmacct** daemons, to process it and to store it into **ElasticSearch**. It works with both *memory* and *print* plugins and, optionally, it can perform **manipulations on data** (such as to add fields on the basis of other values).
+**pmacct-to-elasticsearch** is a python script designed to read output from **pmacct** daemons, to process it and to store it into **ElasticSearch**. It works with both *memory* and *print* plugins and, optionally, it can perform **manipulations on data** (such as to add fields on the basis of other values).
 
 .. image:: img/data_flow.png
    :align: center
 
 1. **pmacct daemons** collect IP accounting data and process them with their plugins;
-2. data are stored into **in-memory-tables** (*memory* plugins) or **JSON files** (*print* plugins);
+2. data are stored into **in-memory-tables** (*memory* plugins), **JSON or CSV files** (*print* plugins);
 3. **crontab jobs** (*memory* plugins) or **trigger scripts** (*print* plugins) are invoked to execute pmacct-to-elasticsearch;
-4. JSON records are finally processed by **pmacct-to-elasticsearch**, which reads them from stdin (*memory* plugins) or directly from JSON file.
+4. pmacct's output records are finally processed by **pmacct-to-elasticsearch**, which reads them from stdin (*memory* plugins) or directly from file.
 
 Optionally, some **data transformations** can be configured, to allow pmacct-to-elasticsearch to **add or remove fields** to/from the output documents that are sent to ElasticSearch for indexing. These additional fields may be useful to enhance graphs and reports legibility, or to add a further level of aggregation or filtering.
@@ -44,8 +44,7 @@ A simple tutorial on pmacct integration with ElasticSearch/Kibana using pmacct-t
 Future work
 -----------
 
-- Add support of more pmacct output formats (CSV, Apache Avro, ...).
-- Read input from stdin pipes too.
+- Add support of more pmacct output formats (Apache Avro, ...).
 
 Author
 ------
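The new `InputFormat` key slots into the existing per-plugin JSON configuration files described in CONFIGURATION.md. A partial sketch showing only the keys relevant here (a real file needs the other mandatory options too, and the input file path is illustrative):

```json
{
    "ReaderThreads": 2,
    "InputFormat": "csv",
    "InputFile": "/var/lib/pmacct/acct.csv"
}
```

When `InputFormat` is omitted, the default of `"json"` preserves the previous behaviour.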
diff --git a/pierky/p2es/version.py b/pierky/p2es/version.py
index f9f77c1..2fc0e78 100644
--- a/pierky/p2es/version.py
+++ b/pierky/p2es/version.py
@@ -1 +1 @@
-__version__ = "0.3.0a1"
+__version__ = "0.3.0a2"
diff --git a/pierky/p2es/workers.py b/pierky/p2es/workers.py
index 2e3d6b1..3ca1dd7 100644
--- a/pierky/p2es/workers.py
+++ b/pierky/p2es/workers.py
@@ -9,6 +9,7 @@
 from errors import P2ESError
 from es import *
 from transformations import *
+import sys
 
 class P2ESThread(threading.Thread):
 
@@ -175,3 +176,86 @@ class JSONReaderThread(BaseReaderThread):
 
     def _parse(self, line):
         return json.loads(line)
+
+class CSVReaderThread(BaseReaderThread):
+
+    def set_fields(self, headers):
+        self.headers = headers
+
+    def _parse(self, line):
+        fields = line.split(",")
+        dic = {}
+        for i in range(len(self.headers)):
+            field_name = self.headers[i].lower()
+            field_val = fields[i]
+            dic[field_name] = field_val
+        return dic
+
+class BaseReader(object):
+
+    HEADER = False
+
+    def __init__(self, CONFIG, input_file, errors_queue, writer_queue):
+        self.CONFIG = CONFIG
+
+        self.input_file = input_file
+
+        self.readers = []
+
+        for thread_idx in range(CONFIG['ReaderThreads']):
+            self.readers.append(
+                self.READER_THREAD_CLASS(
+                    "reader{}".format(thread_idx),
+                    CONFIG,
+                    errors_queue,
+                    writer_queue
+                )
+            )
+
+    def process_first_line(self, line):
+        pass
+
+    def start_threads(self):
+        for thread in self.readers:
+            thread.daemon = True
+            thread.start()
+
+    def process_input(self):
+        self.start_threads()
+
+        with open(self.input_file, "r") if self.input_file else sys.stdin as f:
+            thread_idx = -1
+            first_line_processed = False
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+
+                if not first_line_processed and self.HEADER:
+                    self.process_first_line(line)
+                    first_line_processed = True
+                    continue
+
+                thread_idx = (thread_idx + 1) % len(self.readers)
+                self.readers[thread_idx].queue.put(line)
+
+    def finalize(self):
+        for thread in self.readers:
+            thread.queue.put(None)
+
+        for thread in self.readers:
+            thread.join()
+
+class JSONReader(BaseReader):
+
+    READER_THREAD_CLASS = JSONReaderThread
+
+class CSVReader(BaseReader):
+
+    HEADER = True
+    READER_THREAD_CLASS = CSVReaderThread
+
+    def process_first_line(self, line):
+        fields = line.split(",")
+        for thread in self.readers:
+            thread.set_fields(fields)
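`CSVReaderThread._parse()` builds each output document by pairing the header names captured from the file's first line with the comma-separated values of the current record. A self-contained sketch of the same pairing logic, written here with the stdlib `csv` module (which, unlike the plain `line.split(",")` above, would also cope with quoted fields); the sample field names are illustrative, not taken from this patch:

```python
import csv

# Hypothetical pmacct CSV print output: a header line,
# then one record per line.
sample = [
    "SRC_AS,DST_AS,PACKETS,BYTES",
    "65001,65002,10,1200",
]

# Field names come from the first line, lower-cased as in
# CSVReaderThread, so documents match the JSON code path.
headers = [h.lower() for h in next(csv.reader([sample[0]]))]

for row in csv.reader(sample[1:]):
    doc = dict(zip(headers, row))
    print(doc)  # {'src_as': '65001', 'dst_as': '65002', ...}
```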
diff --git a/scripts/pmacct-to-elasticsearch b/scripts/pmacct-to-elasticsearch
index 48a4be9..493be87 100755
--- a/scripts/pmacct-to-elasticsearch
+++ b/scripts/pmacct-to-elasticsearch
@@ -38,6 +38,7 @@ DEF_CONFIG = {
     'ES_FlushSize': 5000,
 
     'ReaderThreads': 2,
+    'InputFormat': 'json',
 
     'InputFile': None,
 
@@ -147,6 +148,19 @@ def check_config():
         log(logging.ERROR, 'Reader threads number must be a positive integer')
         return False
 
+    if not 'InputFormat' in CONFIG:
+        log(logging.ERROR, 'Input format not provided')
+        return False
+
+    valid_input_formats = ('json', 'csv')
+    if CONFIG['InputFormat'] not in valid_input_formats:
+        log(logging.ERROR,
+            'Unknown input format "{}": must be one of {}'.format(
+                CONFIG['InputFormat'], ", ".join(valid_input_formats)
+            )
+        )
+        return False
+
     if 'Transformations' in CONFIG:
         for tr in CONFIG['Transformations']:
             try:
@@ -308,12 +322,7 @@ def main():
 
     ts = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
 
-    if CONFIG['InputFile']:
-        input_file = expand_macros(CONFIG['InputFile'])
-    else:
-        input_file = None
-
-    # Read pmacct's JSON output and perform transformations
+    # Read pmacct's output and perform transformations
 
     writer_queue = Queue()
     errors_queue = Queue()
@@ -330,61 +339,48 @@
             ts,
             writer_queue,
             CONFIG['ES_FlushSize'])
+        writer.daemon = True
+        writer.start()
     except P2ESError as e:
-        log(logging.ERROR, "{}".format(str(e)))
+        log(logging.ERROR, str(e))
         return False
 
-    readers = []
-
-    for thread_idx in range(CONFIG['ReaderThreads']):
-        try:
-            readers.append(
-                JSONReaderThread(
-                    "reader{}".format(thread_idx),
-                    CONFIG,
-                    errors_queue,
-                    writer_queue
-                )
-            )
-        except P2ESError as e:
-            log(logging.ERROR, str(e))
-            return False
-
-    for thread in readers + [writer]:
-        thread.daemon = True
-        thread.start()
+    try:
+        if CONFIG['InputFormat'] == 'json':
+            reader_class = JSONReader
+        elif CONFIG['InputFormat'] == 'csv':
+            reader_class = CSVReader
+
+        reader = reader_class(
+            CONFIG,
+            expand_macros(CONFIG['InputFile']) if CONFIG['InputFile'] else None,
+            errors_queue,
+            writer_queue
+        )
+    except P2ESError as e:
+        log(logging.ERROR, str(e))
+        return False
 
     ret_code = True
 
     try:
-        with open(input_file, "r") if input_file else sys.stdin as f:
-            thread_idx = -1
-            for line in f:
-                line = line.strip()
-                if not line:
-                    continue
-                thread_idx = (thread_idx + 1) % len(readers)
-                readers[thread_idx].queue.put(line)
-    except P2ESError as e:
+        reader.process_input()
+    except Exception as e:
         log(logging.ERROR,
-            "Error while processing input file: {}".format(str(e)))
+            "Error while processing input: {}".format(str(e)))
         ret_code = False
     finally:
-        for thread in readers:
-            thread.queue.put(None)
-
-        for thread in readers:
-            thread.join()
+        reader.finalize()
 
-    writer.queue.put(None)
-    writer.join()
+        writer.queue.put(None)
+        writer.join()
 
-    while True:
-        try:
-            err = errors_queue.get(block=False)
-            ret_code = False
-            log(logging.ERROR, err)
-        except Empty:
-            break
+        while True:
+            try:
+                err = errors_queue.get(block=False)
+                ret_code = False
+                log(logging.ERROR, err)
+            except Empty:
+                break
 
     return ret_code
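On the pmacct side, the input consumed by the new `CSVReader` would come from a print plugin configured for CSV output. A sketch of such a stanza, assuming standard pmacct print plugin directives; the plugin name, aggregation keys and paths are illustrative:

```
plugins: print[csv_example]
aggregate[csv_example]: src_host, dst_host, proto
print_output[csv_example]: csv
print_output_file[csv_example]: /var/lib/pmacct/acct.csv
print_refresh_time[csv_example]: 60
print_trigger_exec[csv_example]: /etc/p2es/triggers/csv_example
```

The trigger script then invokes pmacct-to-elasticsearch, which reads the file referenced by `InputFile`; with memory plugins, data keeps arriving on stdin via the crontab job instead.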