run_etl.py
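"""
Entry point for the Tube ETL: imports the source SQL tables into HDFS, then
runs the Spark transformation that indexes the results into Elasticsearch.
"""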
import argparse
import traceback

from elasticsearch import Elasticsearch
from py4j.protocol import Py4JJavaError

import tube.enums as enums
import tube.etl.indexers.interpreter as interpreter
import tube.settings as config
from tube.etl.outputs.es.timestamp import check_to_run_etl
from tube.formatters import BaseFormatter
from tube.importers.sql_to_hdfs import SqlToHDFS
from tube.utils.spark import make_spark_context

def run_import():
    """
    Import all tables from the source SQL database into HDFS.
    """
    try:
        sql_to_hdfs = SqlToHDFS(config, BaseFormatter())
        stream = sql_to_hdfs.generate_import_all_tables()
        if stream is None:
            return
        # Echo the importer's output as it streams in
        for line in stream:
            print(line.rstrip())
    except Exception:
        print("ERROR when running import to hadoop")
        print(traceback.format_exc())
        raise

def run_transform():
    """
    Run the Spark transformation over the imported data.
    """
    sc = None
    try:
        sc = make_spark_context(config)
        translators = interpreter.create_translators(sc, config)
        interpreter.run_transform(translators)
    except Py4JJavaError as py4j_ex:
        print("ERROR on the Spark side. Please restart Spark")
        print(py4j_ex)
        print(traceback.format_exc())
    except Exception as ex:
        print("ERROR when running transformation")
        print(ex)
        print(traceback.format_exc())
        raise
    finally:
        # Guard against make_spark_context() itself failing, in which
        # case sc was never assigned
        if sc is not None:
            sc.stop()

def config_by_args():
    """
    Parse command-line arguments and apply them to the config.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-c",
        "--config",
        help="The configuration set to run with",
        type=str,
        choices=enums.RUNNING_MODE_CHOICES,
        default="Dev",
    )
    parser.add_argument(
        "-v", "--verbose", help="Increase output verbosity", action="store_true"
    )
    parser.add_argument(
        "-s",
        "--step",
        help="The step to run",
        type=str,
        choices=enums.RUNNING_STEP_CHOICES,
        default="all",
    )
    parser.add_argument(
        "-f",
        "--force",
        help="Force an ETL run even when there is no new data",
        action="store_true",
    )
    args = parser.parse_args()
    config.RUNNING_MODE = args.config
    return args

def main():
    args = config_by_args()
    es_hosts = config.ES["es.nodes"]
    es_port = int(config.ES["es.port"])
    # Note: the host-dict key for the protocol is "scheme", not "schema"
    es = Elasticsearch([{"host": es_hosts, "port": es_port, "scheme": "http"}])
    index_names = interpreter.get_index_names(config)
    # Skip the run unless the index timestamps show new data or --force was given
    if args.force or check_to_run_etl(es, index_names):
        if args.step in (enums.RUNNING_STEP_IMPORT, enums.RUNNING_STEP_ALL):
            run_import()
        if args.step in (enums.RUNNING_STEP_TRANSFORM, enums.RUNNING_STEP_ALL):
            run_transform()
    else:
        print("Nothing's new")


if __name__ == "__main__":
    main()
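
# Example invocations (assuming the step choices in enums.RUNNING_STEP_CHOICES
# are "import", "transform", and "all", as the default above suggests):
#
#   python run_etl.py                  # run both steps with the Dev config
#   python run_etl.py -s import        # only import SQL tables into HDFS
#   python run_etl.py -s transform     # only run the Spark transformation
#   python run_etl.py -f               # force a run even with no new data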