-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_parse.py
executable file
·251 lines (210 loc) · 9.19 KB
/
test_parse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#!/usr/bin/env python3
import argparse
import codecs
import datetime
import encodings
import json
import os
import pkgutil
import sys
import time
import wizard_domaininfo
"""Tests for the `cli` module."""
# FIXME: The testing script is currently incapable of testing referenced NIC handles that are
# retrieved separately, such as is the case with the JPRS registry for .co.jp. This
# really needs to be fixed, to ensure that contact parsing for this doesn't break.
def get_codecs():
"""Dynamically get list of codecs in python."""
false_positives = set(["aliases"])
found = set(name for imp, name, ispkg in pkgutil.iter_modules(encodings.__path__) if not ispkg)
found.difference_update(false_positives)
return found
def read_encoded_file(file_path):
"""Try reading file using all codecs. Return the first successful one."""
for encoding in get_codecs():
try:
with codecs.open(file_path, "r", encoding) as f:
return f.read()
except Exception:
pass
parser = argparse.ArgumentParser(description="Runs or modifies the test suite for python-whois.")
parser.add_argument("mode", nargs=1, choices=["run", "update"], default="run",
help="Whether to run or update the tests. Only update if you know what you're doing!")
parser.add_argument("target", nargs="+",
help="The targets to run/modify tests for. Use 'all' to run the full test suite.")
args = parser.parse_args()
OK = '\033[92m'
FAIL = '\033[91m'
ENDC = '\033[0m'
def encoded_json_dumps(obj):
try:
return json.dumps(obj, default=json_fallback)
except UnicodeDecodeError as e:
return json.dumps(recursive_encode(obj, "latin-1"), default=json_fallback)
def json_fallback(obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
else:
return obj
def recursive_encode(obj, encoding):
for key in list(obj.keys()):
if isinstance(obj[key], dict):
obj[key] = recursive_encode(obj[key], encoding)
elif isinstance(obj[key], list):
obj[key] = [x.decode(encoding) for x in obj[key]]
else:
try:
obj[key] = obj[key].decode(encoding)
except:
pass
return obj
def recursive_compare(obj1, obj2, chain=[]):
errors = []
chain_name = " -> ".join(chain)
s1 = set(obj1.keys())
s2 = set(obj2.keys())
for item in s1.difference(s2):
errors.append("(%s) Key present in previous data, but missing in current data: %s" % (chain_name, item))
for item in s2.difference(s1):
errors.append("(%s) New key present in current data, but missing in previous data: %s" % (chain_name, item))
for key in s1.intersection(s2):
if isinstance(obj1[key], dict) and isinstance(obj2[key], dict):
errors += recursive_compare(obj1[key], obj2[key], chain + [key])
elif isinstance(obj1[key], list) and isinstance(obj2[key], list):
lst1 = [json_fallback(x) for x in obj1[key]]
lst2 = [json_fallback(x) for x in obj2[key]]
if set(lst1) != set(lst2):
errors.append(
"(%s) List mismatch in key %s.\n [old] %s\n [new] %s" % (chain_name, key, set(lst1), set(lst2)))
else:
if json_fallback(obj1[key]) != json_fallback(obj2[key]):
errors.append("(%s) Data mismatch in key %s.\n [old] %s\n [new] %s" % (
chain_name, key, json_fallback(obj1[key]), json_fallback(obj2[key])))
return errors
if "all" in args.target:
targets = os.listdir("tests/data")
else:
targets = args.target
targets.sort()
if args.mode[0] == "run":
times_default = []
times_normalized = []
errors = False
suites = []
for target in targets:
try:
with codecs.open(os.path.join("tests/data", target), "r") as f:
data = f.read().split("\n--\n")
except IOError as e:
sys.stderr.write(
"Invalid domain %(domain)s specified. No test case or base data exists.\n" % {"domain": target})
errors = True
continue
except UnicodeDecodeError:
try:
# Try cp1252 (ufpa.br uses that)
with codecs.open(os.path.join("tests/data", target), "r", 'cp1252') as f:
data = f.read().split("\n--\n")
except UnicodeDecodeError as e:
# Fall back to trying all registered codecs
data = read_encoded_file(os.path.join("tests/data", target)).split("\n--\n")
try:
with codecs.open(os.path.join("tests/target_default", target), "r") as f:
default = f.read()
with codecs.open(os.path.join("tests/target_normalized", target), "r") as f:
normalized = f.read()
except IOError as e:
sys.stderr.write(
"Missing target data for domain %(domain)s. Run `./test_parse.py update %(domain)s` to correct this, "
"after verifying that wizard_domaininfo can correctly parse this particular domain.\n" % {
"domain": target})
errors = True
continue
suites.append((target, data, default, normalized))
if errors:
exit(1)
total_errors = 0
total_failed = 0
total_passed = 0
done = 1
total = len(suites) * 2
for target, data, target_default, target_normalized in suites:
for normalization in (True, []):
start_time = time.time()
parsed = wizard_domaininfo.parse.parse_raw_whois(data, normalized=normalization)
time_taken = (time.time() - start_time) * 1000 # in ms
parsed = json.loads(encoded_json_dumps(parsed)) # Stupid Unicode hack
if normalization == True:
target_data = json.loads(target_normalized)
else:
target_data = json.loads(target_default)
errors = recursive_compare(target_data, parsed, chain=["root"])
if normalization == True:
mode = "normalized"
else:
mode = "default"
progress_prefix = "[%s/%s] " % (str(done).rjust(len(str(total))), str(total).rjust(len(str(total))))
if len(errors) == 0:
sys.stdout.write(OK)
sys.stdout.write(progress_prefix + "%s passed in %s mode.\n" % (target, mode))
sys.stderr.write(ENDC)
if normalization == True:
times_normalized.append(time_taken)
else:
times_default.append(time_taken)
total_passed += 1
else:
sys.stderr.write(FAIL)
sys.stderr.write(progress_prefix + "%s TEST CASE FAILED, ERRORS BELOW\n" % target)
sys.stderr.write("Mode: %s\n" % mode)
sys.stderr.write("=======================================\n")
for error in errors:
sys.stderr.write(error + "\n")
sys.stderr.write("=======================================\n")
sys.stderr.write(ENDC)
total_errors += len(errors)
total_failed += 1
done += 1
if len(times_default) > 0:
average_default = int(sum(times_default) / float(len(times_default)))
min_default = min(times_default)
max_default = max(times_default)
sys.stdout.write(
"Timing in default mode: %dms avg, %dms min, %dms max\n" % (average_default, min_default, max_default))
if len(times_normalized) > 0:
average_normalized = int(sum(times_normalized) / float(len(times_normalized)))
min_normalized = min(times_normalized)
max_normalized = max(times_normalized)
sys.stdout.write("Timing in normalized mode: %dms avg, %dms min, %dms max\n" % (
average_normalized, min_normalized, max_normalized))
if total_failed == 0:
sys.stdout.write(OK)
sys.stdout.write("All tests passed!\n")
sys.stderr.write(ENDC)
else:
sys.stdout.write(FAIL)
sys.stdout.write("%d tests failed, %d errors in total.\n" % (total_failed, total_errors))
sys.stderr.write(ENDC)
exit(1)
elif args.mode[0] == "update":
errors = False
updates = []
for target in targets:
try:
with codecs.open(os.path.join("tests/data", target), "r") as f:
data = f.read().split("\n--\n")
updates.append((target, data))
except IOError as e:
sys.stderr.write("Invalid domain %(domain)s specified. No base data exists.\n" % {"domain": target})
errors = True
continue
if errors:
exit(1)
for target, data in updates:
default = wizard_domaininfo.parse.parse_raw_whois(data)
normalized = wizard_domaininfo.parse.parse_raw_whois(data, normalized=True)
with codecs.open(os.path.join("tests/target_default", target), "w") as f:
f.write(encoded_json_dumps(default))
with codecs.open(os.path.join("tests/target_normalized", target), "w") as f:
f.write(encoded_json_dumps(normalized))
print("Generated target data for %s." % target)