-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.py
80 lines (60 loc) · 2.11 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""Utility funtions."""
import numpy
import progressbar
from chainer import cuda
from chainer.dataset import convert
# Special symbol IDs shared by the helpers in this module.
PAD = -1  # fill value written into padded positions of batched arrays
UNK = 0  # ID of '<UNK>'; substituted for words missing from the vocabulary
EOS = 1  # ID of '<EOS>'; appended after each sequence when batching
def get_subsequence_before_eos(seq, eos=EOS):
    """Truncate a sequence just after its first end-of-sequence symbol.

    Args:
        seq: Sequence object that exposes its values through a ``.data``
            attribute and supports slicing (presumably a chainer
            Variable holding a 1-D array -- confirm against callers).
        eos: ID of the end-of-sequence symbol to search for.

    Returns:
        ``seq`` sliced up to and including the first occurrence of
        ``eos``, or ``seq`` itself when ``eos`` does not occur.
    """
    # Compare against the ``eos`` argument rather than the module-level
    # constant, so a caller-supplied symbol is actually honoured.
    index = numpy.argwhere(seq.data == eos)
    # ``index`` has one row per match; row 0 is the earliest position.
    return seq[:index[0, 0] + 1] if len(index) > 0 else seq
def seq2seq_pad_concat_convert(xy_batch, device):
    """Batch source/target sequences into padded blocks terminated by EOS.

    Args:
        xy_batch: List of tuple of source and target sentences
        device: Device ID to which an array is sent.

    Returns:
        Tuple ``(x_block, y_out_block)``. Each block is the concatenated
        batch widened by one column; every row holds its sequence
        followed by ``EOS``, with the remaining positions set to ``PAD``.
    """
    x_seqs, y_seqs = zip(*xy_batch)

    # Use PAD for both padding stages so they cannot drift apart if the
    # constant is ever changed.
    x_block = convert.concat_examples(x_seqs, device, padding=PAD)
    y_block = convert.concat_examples(y_seqs, device, padding=PAD)
    xp = cuda.get_array_module(x_block)

    def _append_eos(block, seqs):
        # Add one trailing column so even the longest row has room for EOS.
        widened = xp.pad(block, ((0, 0), (0, 1)),
                         'constant', constant_values=PAD)
        for i_batch, seq in enumerate(seqs):
            # len(seq) is the first column after the row's real tokens.
            widened[i_batch, len(seq)] = EOS
        return widened

    return (_append_eos(x_block, x_seqs), _append_eos(y_block, y_seqs))
def count_lines(path):
    """Return the number of lines in the text file at ``path``.

    Args:
        path: Path of the file to read.

    Returns:
        Line count as an int; 0 for an empty file. A final line without
        a trailing newline is still counted.
    """
    with open(path) as f:
        # A generator keeps memory constant instead of materialising a
        # list with one element per line.
        return sum(1 for _ in f)
def load_vocabulary(path):
    """Build a word-to-ID mapping from a vocabulary file.

    Args:
        path: Path of a text file with one word per line.

    Returns:
        Dict mapping each stripped line to an integer ID, numbered from 2
        in file order, plus the entries ``'<UNK>'`` and ``'<EOS>'``.
    """
    vocabulary = {}
    with open(path) as vocab_file:
        # Numbering starts at 2 to leave room for UNK and EOS.
        for word_id, line in enumerate(vocab_file, 2):
            vocabulary[line.strip()] = word_id
    # Special symbols are set last, so they win over identical file entries.
    vocabulary['<UNK>'] = UNK
    vocabulary['<EOS>'] = EOS
    return vocabulary
def load_data(vocabulary, path, debug=False):
    """Read a whitespace-tokenised text file as arrays of word IDs.

    Args:
        vocabulary: Dict mapping words to integer IDs; words missing from
            it are mapped to ``UNK``.
        path: Path of the text file, one sentence per line.
        debug: When true, read at most the first 10000 lines.

    Returns:
        List with one numpy array (dtype code ``'i'``) per line read.
    """
    n_lines = count_lines(path)
    if debug:
        n_lines = min(10000, n_lines)

    bar = progressbar.ProgressBar()
    data = []
    print('loading...: %s' % path)
    with open(path) as f:
        for line in bar(f, max_value=n_lines):
            ids = [vocabulary.get(word, UNK) for word in line.strip().split()]
            data.append(numpy.array(ids, 'i'))
            # Stop once the (possibly debug-capped) line budget is used up.
            if len(data) == n_lines:
                break
    return data
def calculate_unknown_ratio(data):
    """Return the fraction of tokens in ``data`` that equal ``UNK``.

    Args:
        data: Re-iterable collection (it is traversed twice) of arrays
            of word IDs, such as the list returned by ``load_data``.

    Returns:
        Number of ``UNK`` entries divided by the total number of
        entries, or ``0.0`` when ``data`` contains no tokens at all.
    """
    unknown = sum((s == UNK).sum() for s in data)
    total = sum(s.size for s in data)
    if total == 0:
        # No tokens to measure: report 0.0 instead of dividing by zero
        # (which raised for empty data and yielded nan for empty arrays).
        return 0.0
    return unknown / total