forked from jodaiber/Annotated-WikiExtractor
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path extract_parallel.py
41 lines (33 loc) · 1.23 KB
/
extract_parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import os
import sys

import rapidjson as json
from joblib import Parallel, delayed
from tqdm import tqdm

from rubyslippers import extract_pages_from_dump
from rubyslippers import WikiExtractor
def parallelize_preprocess(func, iterator, processes, progress_bar=False):
    """Apply *func* to every item of *iterator*, optionally across worker processes.

    Args:
        func: Callable applied to each item.
        iterator: Iterable of items to process.
        processes: Number of joblib workers; values <= 1 run serially in-process.
        progress_bar: When True, wrap *iterator* in a tqdm progress bar.

    Returns:
        A list of ``func(item)`` results, in input order, for both the serial
        and the parallel path.
    """
    items = tqdm(iterator) if progress_bar else iterator
    if processes <= 1:
        # Materialize the results so the serial path returns a list, matching
        # what joblib's Parallel returns (previously a lazy `map` object).
        return [func(item) for item in items]
    return Parallel(n_jobs=processes)(delayed(func)(item) for item in items)
# --- Script configuration -------------------------------------------------
processes = 4  # joblib worker count; also the number of pages per batch
quiet = True  # False adds a per-batch tqdm bar on top of the overall one

if len(sys.argv) < 2:
    sys.exit('usage: {} OUTPUT_DIR < DUMP'.format(sys.argv[0]))
output_dir = sys.argv[1]

we = WikiExtractor('en')


def _write_batch(batch, extractor, out_dir, n_jobs, show_progress):
    """Run ``extractor.extract`` over *batch* and write each non-empty result.

    Every truthy result is serialized with ``json.dump`` to
    ``<out_dir>/<result['id']>.json``. Falsy results are skipped
    (presumably pages the extractor chose to drop -- TODO confirm).
    """
    results = parallelize_preprocess(
        extractor.extract, batch, n_jobs, progress_bar=show_progress
    )
    for wiki_page in results:
        if not wiki_page:
            continue
        # os.path.join inserts the separator, so OUTPUT_DIR works with or
        # without a trailing slash (plain string concatenation did not).
        out_path = os.path.join(out_dir, str(wiki_page['id']) + '.json')
        with open(out_path, 'w', encoding='utf-8') as fout:
            json.dump(wiki_page, fout)


# Read pages from stdin and flush them in batches of `processes`, so the
# `pages` list never holds more than one batch.
# NOTE(review): batches this small pay joblib's dispatch overhead every few
# pages; a larger batch may be faster -- measure before changing.
pages = []
for page in tqdm(extract_pages_from_dump(sys.stdin)):
    pages.append(page)
    if len(pages) >= processes:
        _write_batch(pages, we, output_dir, processes, not quiet)
        pages = []

# Flush the final partial batch, if any.
if pages:
    _write_batch(pages, we, output_dir, processes, not quiet)