Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pierky committed Jan 24, 2017
1 parent 70fa124 commit c017af5
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
5 changes: 3 additions & 2 deletions es.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import urllib2

from errors import P2ESError
Expand Down Expand Up @@ -76,14 +77,14 @@ def does_index_exist(index_name, CONFIG):

# Creates index 'index_name' using template given in config.
# Raises exceptions: yes.
def create_index(index_name, CONF_DIR, CONFIG):
def create_index(index_name, CONFIG):

# index already exists?
if does_index_exist(index_name, CONFIG):
return

# index does not exist, creating it
tpl_path = '{}/{}'.format(CONF_DIR, CONFIG['ES_IndexTemplateFileName'])
tpl_path = '{}/{}'.format(CONFIG['CONF_DIR'], CONFIG['ES_IndexTemplateFileName'])

try:
with open(tpl_path, "r") as f:
Expand Down
2 changes: 1 addition & 1 deletion pmacct-to-elasticsearch
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def main():
ts = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

if CONFIG['InputFile']:
input_file = ExpandMacro(CONFIG['InputFile'])
input_file = expand_macros(CONFIG['InputFile'])
else:
input_file = None

Expand Down
8 changes: 4 additions & 4 deletions transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ def test_transformation(tr):
if 'LookupTableFile' in action:
try:
with open(action['LookupTableFile'], "r") as f:
action['LookupTable'] = json.load(f.read())
except:
action['LookupTable'] = json.load(f)
except Exception as e:
raise P2ESError(
'{}, error loading lookup table from {}'.format(
tr_det, action['LookupTableFile']
'{}, error loading lookup table from {}: {}'.format(
tr_det, action['LookupTableFile'], str(e)
)
)

Expand Down
18 changes: 11 additions & 7 deletions workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ def _format_output(self):
return out

def flush(self):
self._flush(self._format_output())
self.es_docs = []
if self.es_docs:
try:
output = self._format_output()
self._flush(output)
finally:
self.es_docs = []

def run(self):
while True:
Expand All @@ -49,8 +53,9 @@ def run(self):
dic = self.queue.get(block=True, timeout=1)

if dic is None:
self.flush()
return
#self.flush()
break
#return

dic['@timestamp'] = self.ts
self.es_docs.append(dic)
Expand All @@ -60,8 +65,7 @@ def run(self):
pass
except Exception as e:
self.errors_queue.put(str(e))
if dic is None:
return
self.flush()

class ESWriterThread(BaseWriterThread):

Expand All @@ -76,7 +80,7 @@ def __init__(self, *args, **kwargs):
self.CONFIG['ES_IndexName']
)
try:
create_index(self.index_name, self.CONFIG["CONF_DIR"], self.CONFIG)
create_index(self.index_name, self.CONFIG)
except P2ESError as e:
raise P2ESError(
"Error while creating index {}: {}".format(
Expand Down

0 comments on commit c017af5

Please sign in to comment.