-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwordhoard_helpers.py
109 lines (88 loc) · 3.73 KB
/
wordhoard_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import sys
import requests
from constants import ID_ORI_URL, wordhoard_topics_url, wordhoard_list_url
from oauth_helpers import post_with_client
def get_linkeddata(orid):
    """Fetch and return the parsed JSON-LD document for *orid*.

    The document is requested from ``{ID_ORI_URL}{orid}.jsonld``.

    Raises:
        IndexError: when the HTTP response is not OK, i.e. the item did
            not resolve.
    """
    jsonld_url = f'{ID_ORI_URL}{orid}.jsonld'
    reply = requests.get(url=jsonld_url)
    if reply.ok:
        return reply.json()
    raise IndexError(f'item orid:{orid} did not resolve')
def _linked_orid(doc, key):
    """Return the integer id referenced by ``doc[key]["@id"]``, or ``None``.

    The ``@id`` value is split on ``":"`` and the second part is parsed as an
    int. Any malformed or missing reference (absent key, value that is not an
    object, id without a colon, non-numeric id) yields ``None`` instead of
    raising, so callers can treat it as "no such link".
    """
    try:
        return int(doc[key]["@id"].split(":")[1])
    except (KeyError, IndexError, TypeError, ValueError, AttributeError):
        return None


def find_super_items(ori_doc_id):
    """Walk up from a document to its parent, grandparent and committee.

    The chain followed is:

    * the document is a ``schema:MediaObject``;
    * it has a ``dc:isReferencedBy`` link;
    * that link points to a ``meeting:AgendaItem`` or a ``:Meeting``;
    * an ``:AgendaItem`` has a ``schema:superEvent``, which is a ``:Meeting``;
    * a ``:Meeting`` has a ``meeting:committee`` (the organisation we want).

    Returns:
        A tuple ``(parent_orid, grandparent_orid, committee_orid)``. Each
        element is an int, or ``None`` when that level could not be resolved
        or its reference was missing or malformed.
    """
    try:
        doc_json = get_linkeddata(ori_doc_id)  # schema:MediaObject
    except IndexError:
        return None, None, None
    parent_orid = _linked_orid(doc_json, 'dc:isReferencedBy')
    if parent_orid is None:
        return None, None, None

    try:
        parent_data = get_linkeddata(parent_orid)  # meeting:AgendaItem or :Meeting
    except IndexError:
        return parent_orid, None, None

    # The parent may itself carry the committee (when it is already a Meeting).
    committee_orid = _linked_orid(parent_data, "meeting:committee")

    grandparent_orid = _linked_orid(parent_data, 'schema:superEvent')
    if grandparent_orid is None:
        return parent_orid, None, committee_orid

    try:
        meeting_item_json = get_linkeddata(grandparent_orid)  # meeting:Meeting
    except IndexError:
        # NOTE(review): the parent's own committee is not used as a fallback
        # here (nor when the grandparent lacks one); this keeps the existing
        # behaviour -- confirm whether a fallback is desired.
        return parent_orid, grandparent_orid, None

    return (
        parent_orid,
        grandparent_orid,
        _linked_orid(meeting_item_json, "meeting:committee"),
    )
def empty_wordhoard_payload(orid, rdf_type, wh_type):
    """Build a new wordhoard payload skeleton that contains no topics yet.

    Args:
        orid: identifier string used in the payload name and description.
        rdf_type: type label mentioned in the description.
        wh_type: kind of wordhoard, used in the name and description.

    Returns:
        A freshly created dict with the keys ``topics`` (empty dict),
        ``description``, ``name`` and ``detection_settings``.
    """
    detection_settings = {
        'abbreviation': {'boundary_check': r'\b{}\b'},
        'canonical_name': {'ignore_case': True},
        'names': {'ignore_case': True},
    }

    payload = {'topics': {}}
    payload['description'] = f"Mined {wh_type} for {orid} of type {rdf_type}"
    payload['name'] = f"{orid}_{wh_type}"
    payload['detection_settings'] = detection_settings
    return payload
def post_wordhoard_payload(item_id, rdf_type, topics, wordhoard_id=None, wh_type='definitions'):
    """Create a wordhoard, or add topics to an existing one, via POST.

    Args:
        item_id: id of the item; the payload is built for ``orid:{item_id}``.
        rdf_type: type label passed on to the payload description.
        topics: iterable of mappings; each must have an ``'id'`` key and may
            have a ``'local_name'`` key.
        wordhoard_id: when truthy, only the topics mapping is posted to that
            wordhoard's topics URL; otherwise the full payload is posted to
            the wordhoard list URL.
        wh_type: kind of wordhoard, passed on to the payload.

    Returns:
        ``(status_code, None)`` when the response is OK, otherwise
        ``(status_code, response_text)`` after reporting the error on stderr.
    """
    payload = empty_wordhoard_payload(f"orid:{item_id}", rdf_type, wh_type)

    topic_map = payload['topics']
    for entry in topics:
        topic_map[entry['id']] = entry.get('local_name')

    is_update = bool(wordhoard_id)
    if is_update:
        target, body = wordhoard_topics_url % wordhoard_id, topic_map
    else:
        target, body = wordhoard_list_url, payload
    response = post_with_client(target, data=body)

    if response.ok:
        return response.status_code, None

    # Failure: report details on stderr and hand the response text back.
    print(response.status_code, response.text, file=sys.stderr)
    if is_update:
        print(payload, file=sys.stderr)
        print("Wordhoard update POST error...", file=sys.stderr)
    else:
        print("Wordhoard create POST error...", file=sys.stderr)
    return response.status_code, response.text
"""
Copyright 2019 Hendrik Grondijs, Alex Olieman <[email protected]>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""