import json
import random
import uuid
import numpy as np
import time
import requests
import traceback
import pdb
import math
import ast
import pandas as pd
import pickle
from qwikidata.linked_data_interface import get_entity_dict_from_api
from qwikidata.sparql import return_sparql_query_results
from urllib3.exceptions import MaxRetryError, ConnectionError
from qwikidata.linked_data_interface import LdiResponseNotOk
import hashlib
class CachedWikidataAPI():

    def __init__(self, cache_path='entity_cache.p', save_every_x_queries=1):
        self.save_every_x_queries = save_every_x_queries
        self.x_queries_passed = 0
        self.languages = ['en', 'fr', 'es', 'pt', 'pt-br', 'it', 'de']
        self.cache_path = cache_path
        try:
            with open(self.cache_path, 'rb') as f:
                self.entity_cache = pickle.load(f)
        except FileNotFoundError:
            self.entity_cache = {}

    def get_unique_id_from_str(self, my_str):
        return hashlib.md5(str.encode(my_str)).hexdigest()

    def save_entity_cache(self, force=False):
        if force:
            self.x_queries_passed = self.save_every_x_queries
        self.x_queries_passed = self.x_queries_passed + 1
        if self.x_queries_passed >= self.save_every_x_queries:
            with open(self.cache_path, 'wb') as f:
                pickle.dump(self.entity_cache, f)
            self.x_queries_passed = 0
    def get_entity(self, item_id):
        if item_id in self.entity_cache:
            return self.entity_cache[item_id]
        while True:
            try:
                entity = get_entity_dict_from_api(item_id)
                self.entity_cache[item_id] = entity
                self.save_entity_cache()
                return entity
            except (ConnectionError, MaxRetryError) as e:
                #traceback.print_exc()
                time.sleep(1)
                continue
            except LdiResponseNotOk:
                #traceback.print_exc()
                self.entity_cache[item_id] = 'deleted'
                self.save_entity_cache()
                return 'deleted'
    def get_label(self, item, non_language_set=False):
        if type(item) == str:
            entity = self.get_entity(item)
            if entity == 'deleted':
                return (entity, 'none')
            labels = entity['labels' if 'labels' in entity else 'lemmas']
        elif type(item) == dict:
            if 'labels' in item:
                labels = item['labels']
            elif 'lemmas' in item:
                labels = item['lemmas']
        for l in self.languages:
            if l in labels:
                return (labels[l]['value'], l)
        if non_language_set:
            all_labels = list(labels.keys())
            if len(all_labels) > 0:
                return (labels[all_labels[0]]['value'], all_labels[0])
        return ('no-label', 'none')
    def get_desc(self, item, non_language_set=False):
        if type(item) == str:
            entity = self.get_entity(item)
            if entity == 'deleted':
                return (entity, 'none')
            descriptions = entity['descriptions']
        elif type(item) == dict:
            if 'descriptions' in item:
                descriptions = item['descriptions']
        for l in self.languages:
            if l in descriptions:
                return (descriptions[l]['value'], l)
        if non_language_set:
            all_descriptions = list(descriptions.keys())
            if len(all_descriptions) > 0:
                return (descriptions[all_descriptions[0]]['value'], all_descriptions[0])
        return ('no-desc', 'none')
    def get_alias(self, item, non_language_set=False):
        if type(item) == str:
            entity = self.get_entity(item)
            if entity == 'deleted':
                return ([entity], 'none')
            aliases = entity['aliases']
        elif type(item) == dict:
            if 'aliases' in item:
                aliases = item['aliases']
        for l in self.languages:
            if l in aliases:
                return ([alias['value'] for alias in aliases[l]], l)
        if non_language_set:
            all_aliases = list(aliases.keys())
            if len(all_aliases) > 0:
                # Fall back to the first available language; aliases[lang] is a
                # list of alias dicts, so collect every 'value'.
                return ([alias['value'] for alias in aliases[all_aliases[0]]], all_aliases[0])
        return ('no-alias', 'none')
    def get_datatype(self, item):
        try:
            if type(item) == str:
                entity = self.get_entity(item)
                if entity == 'deleted':
                    return entity
                datatype = entity['datatype']
            elif type(item) == dict:
                datatype = item['datatype']
            return datatype
        except KeyError:
            return 'none'
    def get_claim_values_of(self, item, property_id):
        if type(item) == str:
            entity = self.get_entity(item)
            if entity == 'deleted':
                return entity
            claims = entity['claims']
        elif type(item) == dict:
            claims = item['claims']
        if property_id in claims:
            instance_of_claims = claims[property_id]
            return [i['mainsnak']['datavalue']['value']['id'] for i in instance_of_claims]
        else:
            return []
    def query_sparql_endpoint(self, sparql_query):
        sparql_query_id = self.get_unique_id_from_str(sparql_query)
        if sparql_query_id in self.entity_cache:
            return self.entity_cache[sparql_query_id]
        else:
            wikidata_sparql_url = 'https://query.wikidata.org/sparql'
            try:
                while True:
                    res = requests.get(wikidata_sparql_url, params={"query": sparql_query, "format": "json"})
                    if res.status_code in (429, 504):
                        # Rate-limited or gateway timeout: wait and retry.
                        time.sleep(1)
                        continue
                    elif res.status_code == 200:
                        res = res.json()
                        self.entity_cache[sparql_query_id] = res
                        self.save_entity_cache()
                        return res
                    else:
                        print(res.status_code)
                        raise Exception(f'SPARQL query failed with HTTP status {res.status_code}')
            except json.JSONDecodeError as e:
                #pdb.set_trace()
                print(res, res.__dict__)
                raise e
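

# Illustrative usage sketch (not part of the original module): shows how the
# cached client might be called. The entity/property IDs below (Q42, P31) and
# the example SPARQL query are assumptions chosen for demonstration only.
if __name__ == '__main__':
    api = CachedWikidataAPI(cache_path='entity_cache.p', save_every_x_queries=10)

    # Fetch an entity and read its label, description and aliases,
    # falling back to any available language if needed.
    label, label_lang = api.get_label('Q42', non_language_set=True)
    desc, desc_lang = api.get_desc('Q42', non_language_set=True)
    aliases, alias_lang = api.get_alias('Q42', non_language_set=True)
    print(label, label_lang, desc, desc_lang, aliases, alias_lang)

    # Item IDs of the entity's 'instance of' (P31) claims.
    print(api.get_claim_values_of('Q42', 'P31'))

    # Cached SPARQL query against the Wikidata endpoint.
    results = api.query_sparql_endpoint(
        'SELECT ?item WHERE { ?item wdt:P31 wd:Q146 . } LIMIT 5'
    )
    print(results['results']['bindings'])

    # Persist the cache explicitly before exiting.
    api.save_entity_cache(force=True)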