0.3.0a2 - CSV support
pierky committed Jan 24, 2017
1 parent 82b74cf commit 217c8cb
Showing 6 changed files with 147 additions and 57 deletions.
7 changes: 7 additions & 0 deletions CHANGES.rst
@@ -1,6 +1,13 @@
Changelog
=========

+0.3.0a2
+-------
+
+- CSV output support.
+
+  The ``InputFormat`` option in the plugin configuration file can be used to instruct pmacct-to-elasticsearch to parse CSV output from pmacct.
+
0.3.0a1
-------

6 changes: 5 additions & 1 deletion CONFIGURATION.md
@@ -2,7 +2,7 @@

## How it works

-pmacct-to-elasticsearch reads pmacct JSON output and sends it to ElasticSearch.
+pmacct-to-elasticsearch reads pmacct output and sends it to ElasticSearch.

It works properly with two kinds of pmacct plugins: "memory" and "print".
The former, "memory", needs data to be passed to pmacct-to-elasticsearch's
@@ -152,6 +152,10 @@ These files are in JSON format and contain the following keys:
should coincide with pmacct's print plugin output file).
If omitted pmacct-to-elasticsearch will read data from stdin.

+- **InputFormat** [optional]: the input data format. Can be "json" or "csv".
+
+  **Default**: "json"
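
  For example, a minimal sketch of a plugin configuration file that reads pmacct's CSV output (the path is only illustrative; any other options keep their defaults):

  ```json
  {
      "InputFile": "/var/lib/pmacct/acct.csv",
      "InputFormat": "csv"
  }
  ```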

- **Transformations** [optional]: the transformation matrix used to add new
fields to the output document sent to ElasticSearch for indexing.

9 changes: 4 additions & 5 deletions README.rst
@@ -1,15 +1,15 @@
pmacct-to-elasticsearch
=======================

-**pmacct-to-elasticsearch** is a python script designed to read JSON output from **pmacct** daemons, to process it and to store it into **ElasticSearch**. It works with both *memory* and *print* plugins and, optionally, it can perform **manipulations on data** (such as to add fields on the basis of other values).
+**pmacct-to-elasticsearch** is a Python script designed to read output from **pmacct** daemons, to process it and to store it into **ElasticSearch**. It works with both *memory* and *print* plugins and, optionally, it can perform **manipulations on data** (such as adding fields on the basis of other values).

.. image:: img/data_flow.png
:align: center

1. **pmacct daemons** collect IP accounting data and process them with their plugins;
-2. data are stored into **in-memory-tables** (*memory* plugins) or **JSON files** (*print* plugins);
+2. data are stored into **in-memory-tables** (*memory* plugins) or **JSON or CSV files** (*print* plugins);
3. **crontab jobs** (*memory* plugins) or **trigger scripts** (*print* plugins) are invoked to execute pmacct-to-elasticsearch;
-4. JSON records are finally processed by **pmacct-to-elasticsearch**, which reads them from stdin (*memory* plugins) or directly from JSON file.
+4. pmacct's output records are finally processed by **pmacct-to-elasticsearch**, which reads them from stdin (*memory* plugins) or directly from the output file.
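
As a sketch of steps 2 and 3 above for a *print* plugin, pmacct could be configured to write CSV files and to invoke a trigger script that runs pmacct-to-elasticsearch (directive names as documented by pmacct; file paths are only examples)::

    plugins: print
    print_output: csv
    print_output_file: /var/lib/pmacct/acct.csv
    print_refresh_time: 60
    print_trigger_exec: /etc/p2es/triggers/acct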

Optionally, some **data transformations** can be configured, to allow pmacct-to-elasticsearch to **add or remove fields** to/from the output documents that are sent to ElasticSearch for indexing. These additional fields may be useful to enhance graphs and reports legibility, or to add a further level of aggregation or filtering.

@@ -44,8 +44,7 @@ A simple tutorial on pmacct integration with ElasticSearch/Kibana using pmacct-to-elasticsearch
Future work
-----------

-- Add support of more pmacct output formats (CSV, Apache Avro, ...).
-- Read input from stdin pipes too.
+- Add support for more pmacct output formats (Apache Avro, ...).

Author
------
2 changes: 1 addition & 1 deletion pierky/p2es/version.py
@@ -1 +1 @@
__version__ = "0.3.0a1"
__version__ = "0.3.0a2"
84 changes: 84 additions & 0 deletions pierky/p2es/workers.py
@@ -9,6 +9,7 @@
from errors import P2ESError
from es import *
from transformations import *
+import sys

class P2ESThread(threading.Thread):

@@ -175,3 +176,86 @@ class JSONReaderThread(BaseReaderThread):

    def _parse(self, line):
        return json.loads(line)
+
+class CSVReaderThread(BaseReaderThread):
+
+    def set_fields(self, headers):
+        self.headers = headers
+
+    def _parse(self, line):
+        fields = line.split(",")
+        dic = {}
+        for i in range(len(self.headers)):
+            field_name = self.headers[i].lower()
+            field_val = fields[i]
+            dic[field_name] = field_val
+        return dic
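# Illustration only, not part of the committed code: the lookup performed by
# CSVReaderThread._parse, as a standalone snippet. The header and data line
# are invented; fields are assumed to be unquoted and comma-free, and every
# value stays a string.
#
#   headers = "LABEL,IP_SRC,BYTES".split(",")
#   fields = "foo,192.0.2.1,1234".split(",")
#   doc = dict((headers[i].lower(), fields[i]) for i in range(len(headers)))
#   # doc == {'label': 'foo', 'ip_src': '192.0.2.1', 'bytes': '1234'}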

+class BaseReader(object):
+
+    HEADER = False
+
+    def __init__(self, CONFIG, input_file, errors_queue, writer_queue):
+        self.CONFIG = CONFIG
+
+        self.input_file = input_file
+
+        self.readers = []
+
+        for thread_idx in range(CONFIG['ReaderThreads']):
+            self.readers.append(
+                self.READER_THREAD_CLASS(
+                    "reader{}".format(thread_idx),
+                    CONFIG,
+                    errors_queue,
+                    writer_queue
+                )
+            )
+
+    def process_first_line(self, line):
+        pass
+
+    def start_threads(self):
+        for thread in self.readers:
+            thread.daemon = True
+            thread.start()
+
+    def process_input(self):
+        self.start_threads()
+
+        with open(self.input_file, "r") if self.input_file else sys.stdin as f:
+            thread_idx = -1
+            first_line_processed = False
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+
+                if not first_line_processed and self.HEADER:
+                    self.process_first_line(line)
+                    first_line_processed = True
+                    continue
+
+                thread_idx = (thread_idx + 1) % len(self.readers)
+                self.readers[thread_idx].queue.put(line)
+
+    def finalize(self):
+        for thread in self.readers:
+            thread.queue.put(None)
+
+        for thread in self.readers:
+            thread.join()
+
+class JSONReader(BaseReader):
+
+    READER_THREAD_CLASS = JSONReaderThread
+
+class CSVReader(BaseReader):
+
+    HEADER = True
+    READER_THREAD_CLASS = CSVReaderThread
+
+    def process_first_line(self, line):
+        fields = line.split(",")
+        for thread in self.readers:
+            thread.set_fields(fields)
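
# A few notes on the classes above (illustrative, not part of the committed
# file):
# - CSVReader sets HEADER = True, so BaseReader.process_input() hands the
#   first non-empty line to process_first_line(), which distributes the
#   column names to every reader thread before any data line is dispatched.
# - Data lines are spread across the reader threads round-robin; each thread
#   parses its share and queues the resulting documents for the writer.
# - finalize() uses the poison-pill pattern: one None per reader thread tells
#   it to stop, then all threads are joined.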
96 changes: 46 additions & 50 deletions scripts/pmacct-to-elasticsearch
@@ -38,6 +38,7 @@ DEF_CONFIG = {
    'ES_FlushSize': 5000,

    'ReaderThreads': 2,
+    'InputFormat': 'json',

    'InputFile': None,
@@ -147,6 +148,19 @@ def check_config():
        log(logging.ERROR, 'Reader threads number must be a positive integer')
        return False

+    if not 'InputFormat' in CONFIG:
+        log(logging.ERROR, 'Input format not provided')
+        return False
+
+    valid_input_formats = ('json', 'csv')
+    if CONFIG['InputFormat'] not in valid_input_formats:
+        log(logging.ERROR,
+            'Unknown input format "{}": must be one of {}'.format(
+                CONFIG['InputFormat'], ", ".join(valid_input_formats)
+            )
+        )
+        return False
+
    if 'Transformations' in CONFIG:
        for tr in CONFIG['Transformations']:
            try:
@@ -308,12 +322,7 @@ def main():

    ts = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

-    if CONFIG['InputFile']:
-        input_file = expand_macros(CONFIG['InputFile'])
-    else:
-        input_file = None
-
-    # Read pmacct's JSON output and perform transformations
+    # Read pmacct's output and perform transformations

    writer_queue = Queue()
    errors_queue = Queue()
@@ -330,61 +339,48 @@
            ts,
            writer_queue,
            CONFIG['ES_FlushSize'])
+        writer.daemon = True
+        writer.start()
    except P2ESError as e:
-        log(logging.ERROR, "{}".format(str(e)))
+        log(logging.ERROR, str(e))
        return False

-    readers = []
-
-    for thread_idx in range(CONFIG['ReaderThreads']):
-        try:
-            readers.append(
-                JSONReaderThread(
-                    "reader{}".format(thread_idx),
-                    CONFIG,
-                    errors_queue,
-                    writer_queue
-                )
-            )
-        except P2ESError as e:
-            log(logging.ERROR, str(e))
-            return False
-
-    for thread in readers + [writer]:
-        thread.daemon = True
-        thread.start()
+    try:
+        if CONFIG['InputFormat'] == 'json':
+            reader_class = JSONReader
+        elif CONFIG['InputFormat'] == 'csv':
+            reader_class = CSVReader
+
+        reader = reader_class(
+            CONFIG,
+            expand_macros(CONFIG['InputFile']) if CONFIG['InputFile'] else None,
+            errors_queue,
+            writer_queue
+        )
+    except P2ESError as e:
+        log(logging.ERROR, str(e))
+        return False
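
    # Note: check_config() has already rejected any InputFormat other than
    # 'json' or 'csv', so reader_class is always bound by the time it is
    # used above.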

    ret_code = True
    try:
-        with open(input_file, "r") if input_file else sys.stdin as f:
-            thread_idx = -1
-            for line in f:
-                line = line.strip()
-                if not line:
-                    continue
-                thread_idx = (thread_idx + 1) % len(readers)
-                readers[thread_idx].queue.put(line)
-    except P2ESError as e:
+        reader.process_input()
+    except Exception as e:
        log(logging.ERROR,
-            "Error while processing input file: {}".format(str(e)))
+            "Error while processing input: {}".format(str(e)))
        ret_code = False
    finally:
-        for thread in readers:
-            thread.queue.put(None)
-
-        for thread in readers:
-            thread.join()
+        reader.finalize()

-        writer.queue.put(None)
-        writer.join()
+    writer.queue.put(None)
+    writer.join()

-        while True:
-            try:
-                err = errors_queue.get(block=False)
-                ret_code = False
-                log(logging.ERROR, err)
-            except Empty:
-                break
+    while True:
+        try:
+            err = errors_queue.get(block=False)
+            ret_code = False
+            log(logging.ERROR, err)
+        except Empty:
+            break

    return ret_code

