forked from ikreymer/webarchive-indexing
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdosample.py
52 lines (37 loc) · 1.35 KB
/
dosample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import logging
import os
import sys
import tempfile
from mrjob.util import log_to_stream
from samplecdxjob import SampleCDXJob
from seqfileutils import make_text_null_seq
# Local working files:
#   SEQ_FILE - sequence file produced from the text dump by make_text_null_seq
#   SPL_FILE - plain dump of the job's streamed output
SEQ_FILE, SPL_FILE = 'splits.seq', 'splits.txt'

# Named logger for the job (not referenced elsewhere in the visible code)
# and the record format handed to mrjob's log_to_stream.
LOG = logging.getLogger('SampleCDXJob')
LOG_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'
def run_sample_job():
    """Run SampleCDXJob and convert its output into a sequence file.

    The job is configured from the command-line arguments (``sys.argv[1:]``).
    After the job runs:

    1. Any existing SEQ_FILE / SPL_FILE are removed.
    2. The runner's streamed output is dumped to SPL_FILE.
    3. SPL_FILE is converted into SEQ_FILE via ``make_text_null_seq``.
    4. If the job has a ``splitfile`` option set and the runner's filesystem
       provides ``make_s3_key``, SEQ_FILE is uploaded to that location.

    Returns:
        The value returned by ``make_text_null_seq`` (previously stored as
        ``count`` and discarded; presumably a record count - TODO confirm).
    """
    job = SampleCDXJob(args=sys.argv[1:])

    verbose = '--verbose' in sys.argv[1:]
    for logger_name in ('SampleCDXJob', 'mrjob', '__main__'):
        log_to_stream(format=LOG_FORMAT, name=logger_name, debug=verbose)

    with job.make_runner() as runner:
        runner.run()

        # Start clean so stale files from an earlier run are never reused.
        for path in (SEQ_FILE, SPL_FILE):
            if os.path.isfile(path):
                os.remove(path)

        # Dump streaming output to file. Done while the runner context is
        # still open; NOTE(review): mrjob runners may clean up their output
        # on exit, so this should not move outside the `with`.
        with open(SPL_FILE, 'wb') as fh:
            for chunk in runner.cat_output():
                fh.write(chunk)

        # Convert streaming output to a sequence file; the input handle is
        # closed deterministically instead of being leaked.
        with open(SPL_FILE) as splits:
            count = make_text_null_seq(SEQ_FILE, splits)

        if job.options.splitfile and hasattr(runner.fs, 'make_s3_key'):
            # Use the same filesystem object whose capability was checked
            # above (the job object was used here before, inconsistently).
            key = runner.fs.make_s3_key(job.options.splitfile)
            key.set_contents_from_filename(SEQ_FILE)

    return count
def main():
    """Script entry point: delegate all work to run_sample_job()."""
    run_sample_job()


# Only run the job when executed as a script, not when imported.
if __name__ == "__main__":
    main()