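# NOTE: the text "Spaces: Sleeping / Sleeping" that stood here appears to be
# page-scrape residue rather than program text; it is kept only as this comment.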
import asyncio
import json
import os
import re
import sys
import time
from itertools import combinations as itertools_combinations
from urllib.parse import unquote, urlencode

import aiohttp
import numpy as np
import pandas as pd
import streamlit as st
from googlesearch import search
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from SPARQLWrapper import SPARQLWrapper, JSON
# --- Module-level setup -----------------------------------------------------
# Working directories: `mains` writes candidate QIDs into the first one and
# `main` writes the gathered label/description records into the second one.
folder_path = '/home/user/app/qids_folder'
if not os.path.exists(folder_path):
    # makedirs also creates missing parents and tolerates a concurrent creator.
    os.makedirs(folder_path, exist_ok=True)
    print(f"folder created at {folder_path}")
else:
    print("folder already exists.")

folder_path_1 = '/home/user/app/info_extraction'
if not os.path.exists(folder_path_1):
    os.makedirs(folder_path_1, exist_ok=True)
    print(f"Folder created at {folder_path_1}")
else:
    print("folder already exists.")

# Sentence-embedding model used to score candidates against the mention.
# NOTE(review): Streamlit re-executes the script on every interaction, so this
# load is repeated each time; consider caching it if the installed Streamlit
# version offers a resource cache.
model = SentenceTransformer("Lajavaness/bilingual-embedding-large", trust_remote_code=True)

# Chat-completion settings. `client` stays None until the user supplies a
# token, so later code can test for it instead of hitting an unbound name.
endpoint = "https://models.inference.ai.azure.com"
model_name = "gpt-4o"
client = None

api_token = st.text_input("Enter your API key from [GitHub](https://github.com/marketplace/models/azure-openai/gpt-4o):", "", type="password")
if api_token:
    client = OpenAI(
        base_url=endpoint,
        api_key=api_token,
    )
    st.success("API Token is set for this session.")
else:
    st.warning("Please enter an API token to proceed.")
async def fetch_json(url, session):
    """Issue a GET request for ``url`` and return the decoded JSON body.

    Args:
        url: Address to request.
        session: Object exposing ``get(url)`` as an async context manager
            whose response has an awaitable ``json()`` (an
            ``aiohttp.ClientSession`` in this file).

    Returns:
        Whatever ``response.json()`` yields for the response.
    """
    async with session.get(url) as response:
        return await response.json()
async def combination_method(name, session):
    """Search Google for every two-word combination taken from ``name``.

    Each pair of whitespace-separated words (order preserved) is searched with
    a filter restricting hits to English Wikipedia article URLs, and the last
    path segment of every hit is collected.

    Args:
        name: Mention text; split on whitespace.
        session: Accepted for call compatibility; the search helper does not
            use it, so no HTTP session is opened here.

    Returns:
        A set of URL tail segments (Wikipedia page names as they appear in the
        result URLs).

    Note:
        ``search`` is a blocking call, so the event loop is held while it runs.
    """
    page_names = set()
    for first, second in itertools_combinations(name.split(), 2):
        query = f"{first} {second} site:en.wikipedia.org inurl:/wiki/ -inurl:? -inurl:Category: -inurl:Help: -inurl:Special: -inurl:File:"
        for hit in search(query, num_results=12, lang="en"):
            page_names.add(hit.split("/")[-1])
    return page_names
async def single_method(name, session):
    """Search Google for every individual word of ``name``.

    Hyphens and slashes are treated as word separators. Each word is searched
    with a filter restricting hits to English Wikipedia article URLs, and the
    last path segment of every hit is collected.

    Args:
        name: Mention text.
        session: Accepted for call compatibility; the search helper does not
            use it, so no HTTP session is opened here.

    Returns:
        A set of URL tail segments (Wikipedia page names as they appear in the
        result URLs).

    Note:
        ``search`` is a blocking call, so the event loop is held while it runs.
    """
    page_names = set()
    for word in name.replace("-", " ").replace("/", " ").split():
        query = f"{word} site:en.wikipedia.org inurl:/wiki/ -inurl:? -inurl:Category: -inurl:Help: -inurl:Special: -inurl:File:"
        for hit in search(query, num_results=12, lang="en"):
            page_names.add(hit.split("/")[-1])
    return page_names
_WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php"
_GOOGLE_WIKIPEDIA_FILTER = "site:en.wikipedia.org inurl:/wiki/ -inurl:? -inurl:Category: -inurl:Help: -inurl:Special: -inurl:File:"


def _wikipedia_api_url(params):
    """Build a Wikipedia API URL with every parameter value percent-encoded."""
    return f"{_WIKIPEDIA_API}?{urlencode(params)}"


async def mains(name, single, combi):
    """Collect candidate Wikidata QIDs for ``name`` and store them as JSON.

    Steps:
      1. Google search restricted to English Wikipedia article URLs.
      2. Wikipedia search suggestion (spelling rewrite) and its top results.
      3. For every title found, follow its links; when the page links to
         ``Help:Disambiguation`` its non-namespaced links replace it.
      4. Optionally add results for word pairs (``combi``) and single words
         (``single``).
      5. Resolve every title to its ``wikibase_item`` and write the QIDs to
         ``/home/user/app/qids_folder/{name}.json``.

    Args:
        name: Mention text to look up. It is also used verbatim as the output
            file name, so a name containing a path separator cannot be stored.
        single: ``"Yes"`` to also search each word on its own.
        combi: ``"Yes"`` to also search two-word combinations.

    Returns:
        None. The result is the JSON file written in step 5.
    """
    titles_found = set()
    candidate_titles = set()
    qids = set()

    async with aiohttp.ClientSession() as session:
        for hit in search(f"{name} {_GOOGLE_WIKIPEDIA_FILTER}", num_results=30, lang="en"):
            # URL tails are percent-encoded; decode once so they are encoded
            # exactly once when placed into API URLs below.
            titles_found.add(unquote(hit.split("/")[-1]))
            print(hit)

        search_params = {
            "action": "query",
            "list": "search",
            "srsearch": name,
            "srlimit": 1,
            "srprop": "",
            "srenablerewrites": "True",
            "srinfo": "suggestion",
            "format": "json",
        }
        json_data = await fetch_json(_wikipedia_api_url(search_params), session)
        suggestion = json_data.get('query', {}).get('searchinfo', {}).get('suggestion')
        if suggestion:
            suggestion_params = dict(search_params, srsearch=suggestion, srlimit=10)
            json_suggestion = await fetch_json(_wikipedia_api_url(suggestion_params), session)
            for item in json_suggestion.get('query', {}).get('search') or []:
                title = item.get('title')
                if title:
                    titles_found.add(title)

        # Handle disambiguation links. The collected values are page titles,
        # so they must be sent as `titles` (not `pageids`) for the link
        # expansion to return anything.
        for page_title in titles_found:
            link_params = {
                "action": "query",
                "generator": "links",
                "format": "json",
                "redirects": 1,
                "titles": page_title,
                "prop": "pageprops",
                "gpllimit": 50,
                "ppprop": "wikibase_item",
            }
            json_links = await fetch_json(_wikipedia_api_url(link_params), session)
            pages = (json_links.get('query') or {}).get('pages') or {}
            linked_titles = {page.get("title") for page in pages.values()}
            linked_titles.discard(None)
            if "Help:Disambiguation" in linked_titles:
                candidate_titles.update(t for t in linked_titles if ":" not in t)
            else:
                candidate_titles.add(page_title)

        # Makes combinations of the name
        if combi == "Yes" and len(name.replace("-", " ").split()) >= 3:
            combination_names = await combination_method(name, session)
            candidate_titles.update(unquote(t) for t in combination_names)

        # Checks every word alone
        if single == "Yes" and len(name.replace("-", " ").replace("/", " ").split()) >= 2:
            singles = await single_method(name, session)
            candidate_titles.update(unquote(t) for t in singles)

        for page_title in candidate_titles:
            qid_params = {
                "action": "query",
                "titles": page_title,
                "prop": "pageprops",
                "format": "json",
            }
            try:
                json_qid = await fetch_json(_wikipedia_api_url(qid_params), session)
                pages = json_qid.get('query', {}).get('pages', {})
                for page_data in pages.values():
                    wikibase_item = page_data.get('pageprops', {}).get('wikibase_item')
                    if wikibase_item:
                        qids.add(wikibase_item)
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError, AttributeError) as err:
                # Best effort: one failed lookup must not lose the other QIDs.
                print(f"QID lookup failed for {page_title!r}: {err}")

    with open(f"/home/user/app/qids_folder/{name}.json", "w") as f:
        # Sorted so the stored file is stable across runs.
        json.dump(sorted(qids), f)
async def get_results(query):
    """Run ``query`` against the Wikidata SPARQL endpoint and return JSON.

    Args:
        query: SPARQL query text.

    Returns:
        The converted JSON result of the query.

    Note:
        Declared ``async`` so callers can ``await`` it, but the underlying
        request is synchronous and holds the event loop until it completes.
    """
    user_agent = "WDQS-example Python/%s.%s" % (sys.version_info[0], sys.version_info[1])
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql", agent=user_agent)
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    return sparql.query().convert()
def get_resultss(query):
    """Run ``query`` against the Wikidata SPARQL endpoint and return JSON.

    Synchronous counterpart of ``get_results`` for callers that are not
    running inside a coroutine.

    Args:
        query: SPARQL query text.

    Returns:
        The converted JSON result of the query.
    """
    user_agent = "WDQS-example Python/%s.%s" % (sys.version_info[0], sys.version_info[1])
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql", agent=user_agent)
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    return sparql.query().convert()
def cleaner(text):
    """Normalise an extract into a single tidy line.

    Backslashes are dropped, newlines become spaces, every ``{...}`` group
    (shortest match) is removed, runs of spaces collapse to one, and the
    result is stripped.

    Args:
        text: Raw text.

    Returns:
        The cleaned string.
    """
    text = text.replace('\\', '').replace('\n', ' ')
    text = re.sub(r'\{.*?\}', '', text)
    return re.sub(' +', ' ', text).strip()
def _binding_values(results):
    """Return every bound value string of a SPARQL JSON result, in order."""
    return [
        value["value"]
        for row in results.get("results", {}).get("bindings", [])
        for value in row.values()
        if isinstance(value, dict) and "value" in value
    ]


async def retriever(qid):
    """Gather the English label, aliases and a description for one entity.

    The description is the first two sentences of the entity's English
    Wikipedia article when a sitelink exists; otherwise the Wikidata English
    description; otherwise the literal string ``"None"``.

    Args:
        qid: Wikidata identifier, e.g. ``"Q42"``.

    Returns:
        A list of ``{"qid", "label", "description"}`` dicts: one for the
        lower-cased label (omitted when the entity has no English label) and
        one per lower-cased alias, all sharing the same description.
    """
    label_query = f"""SELECT ?subjectLabel
        WHERE {{
            wd:{qid} rdfs:label ?subjectLabel .
            FILTER(LANG(?subjectLabel) = "en")
        }}
        """
    labels = _binding_values(await get_results(label_query))
    # The last binding wins when several are returned.
    label = labels[-1].lower() if labels else None

    alias_query = f"""SELECT ?alias
        WHERE {{
            wd:{qid} skos:altLabel ?alias
            FILTER(LANG(?alias) = "en")
        }}
        """
    alias_list = _binding_values(await get_results(alias_query))

    sitelink_query = f"""SELECT ?subjectLabel
        WHERE {{
            ?subjectLabel schema:about wd:{qid} ;
            schema:inLanguage "en" ;
            schema:isPartOf <https://en.wikipedia.org/> .
        }}
        """
    sitelinks = _binding_values(await get_results(sitelink_query))

    cleaned_first_para = "None"
    if sitelinks:
        async with aiohttp.ClientSession() as session:
            for article_url in sitelinks:
                _, found, title = article_url.partition("/wiki/")
                if not found:
                    continue  # not an article URL; nothing to extract from
                url = f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&titles={title}&exintro=&exsentences=2&explaintext=&redirects=&formatversion=2&format=json"
                json_data = await fetch_json(url, session)
                pages = json_data.get('query', {}).get('pages') or [{}]
                cleaned_first_para = cleaner(pages[0].get('extract') or 'None')
    else:
        description_query = f"""SELECT ?subjectLabel
            WHERE {{
                wd:{qid} schema:description ?subjectLabel .
                FILTER(LANG(?subjectLabel) = "en")
            }}
            """
        descriptions = _binding_values(await get_results(description_query))
        if descriptions:
            cleaned_first_para = descriptions[-1]

    list_with_sent = []
    if label is not None:
        # A record without a label cannot be embedded later, so it is skipped.
        list_with_sent.append({"qid": qid, "label": label, "description": cleaned_first_para})
    for alias in alias_list:
        list_with_sent.append({"qid": qid, "label": alias.lower(), "description": cleaned_first_para})
    return list_with_sent
async def main(name):
    """Expand the stored QIDs of ``name`` into label/description records.

    Reads ``/home/user/app/qids_folder/{name}.json``, calls ``retriever`` for
    each QID in turn, and writes the concatenated records to
    ``/home/user/app/info_extraction/{name}.json``.

    Args:
        name: Mention text used as the base name of both files.

    Returns:
        None. The result is the JSON file written at the end.
    """
    # Read everything first so the input file is not held open while the
    # (slow) per-QID lookups run.
    with open(f"/home/user/app/qids_folder/{name}.json", "r", encoding="utf-8") as f:
        qids = json.load(f)

    final_list = []
    for q in qids:
        returned_list = await retriever(q)
        if returned_list:
            final_list.extend(returned_list)

    with open(f"/home/user/app/info_extraction/{name}.json", "w", encoding="utf-8") as flast:
        json.dump(final_list, flast)
# Disabled helper, kept for reference: returns True when a sentence contains
# two consecutive capitals or a capital followed by a full stop.
#def check_sentence(sentence):
#    two_consecutive_uppercase = r"[A-Z]{2}"
#    uppercase_followed_by_fullstop = r"[A-Z]\."
#    if re.search(two_consecutive_uppercase, sentence):
#        return True
#    if re.search(uppercase_followed_by_fullstop, sentence):
#        return True
#    return False
_QIDS_DIR = "/home/user/app/qids_folder"
_INFO_DIR = "/home/user/app/info_extraction"

# System prompt for the acronym-expansion / label-splitting request.
_ACRONYM_SYSTEM_PROMPT = """
I will give you one or more labels within a sentence. Your task is as follows:
Identify each label in the sentence, and check if it is an acronym.
If the label is an acronym, respond with the full name of the acronym.
If the label is not an acronym, respond with the label exactly as it was given to you.
If a label contains multiple terms (e.g., 'phase and DIC microscopy'), treat each term within the label as a separate label.
This means you should identify and explain each part of the label individually.
Each part should be on its own line in the response.
Context-Specific Terms: If the sentence context suggests a relevant term that applies to each label (such as "study" in 'morphological, sedimentological, and stratigraphical study'), add that term to each label’s explanation.
Use context clues to determine the appropriate term to add (e.g., 'study' or 'microscopy').
Output Format: Your response should contain only the explanations, formatted as follows:
Each label or part of a label should be on a new line.
Do not include any additional text, and do not repeat the original sentence.
Example 1:
Input:
label: phase and DIC microscopy
context: Tardigrades have been extracted from samples using centrifugation with Ludox AM™ and mounted on individual microscope slides in Hoyer's medium for identification under phase and DIC microscopy.
Expected response:
phase: phase microscopy
DIC microscopy: Differential interference contrast microscopy
Example 2:
Input:
label: morphological, sedimentological, and stratigraphical study
context: This paper presents results of a morphological, sedimentological, and stratigraphical study of relict beach ridges formed on a prograded coastal barrier in Bream Bay, North Island New Zealand.
Expected response:
morphological: morphological study
sedimentological: sedimentological study
stratigraphical: stratigraphical study
IMPORTANT:
Each label, even if nested within another, should be treated as an individual item.
Each individual label or acronym should be output on a separate line.
"""

# System prompt for the short encyclopedic description request.
_DESCRIPTION_SYSTEM_PROMPT = "Given a label or labels within a sentence, provide a brief description (2-3 sentences) explaining what the label represents, similar to how a Wikipedia entry would. Format your response as follows: label: description. I want only the description of the label, not the role in the context. Include the label in the description as well. For example: Sentiment analysis: Sentiment analysis is the use of natural language processing, text analysis, computational linguistics, and biometrics to systematically identify, extract, quantify, and study affective states and subjective information.\nText analysis: Text mining, text data mining (TDM) or text analytics is the process of deriving high-quality information from text. It involves the discovery by computer of new, previously unknown information, by automatically extracting information from different written resources."


def _ask_llm(llm_client, llm_model, system_prompt, user_prompt):
    """Send one system+user exchange and return the reply's non-blank lines."""
    response = llm_client.chat.completions.create(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=1.0,
        top_p=1.0,
        max_tokens=1000,
        model=llm_model,
    )
    reply = response.choices[0].message.content or ""
    print(reply)
    return [line for line in reply.splitlines() if line.strip()]


def _split_label_line(line):
    """Split ``"label: text"`` at the first colon into ``(label, text)``.

    A line without a colon yields the stripped line for both parts.
    """
    head, found, tail = line.partition(":")
    head = head.strip()
    return (head, tail.strip()) if found else (head, head)


def _clear_directory(path):
    """Delete every regular file directly inside ``path`` (if it exists)."""
    if not os.path.isdir(path):
        return
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        if os.path.isfile(file_path):
            os.remove(file_path)


def main_cli():
    """Render the Streamlit page and run the five-stage linking pipeline.

    Stages, each reported in the UI: data normalisation (LLM), candidate
    retrieval (``mains``), information gathering (``main``), candidate
    selection (embedding similarity) and candidate matching (Wikipedia and
    DBpedia links for the best candidate).

    Reads the module-level ``client``, ``model_name`` and ``model`` objects.
    Temporary JSON files are removed when the run ends, whether or not it
    succeeded.
    """
    st.title("✨ Entity Linking Application ✨")
    st.caption("This web application is part of my master’s dissertation.")
    input_sentence_user = st.text_input("Enter a sentence:", "")
    input_mention_user = st.text_input("Enter a textural reference (mention) that is inside the sentence:", "")
    single = st.selectbox("Search each word individually? (Useful for difficult mentions)", ['Yes', 'No'], index=1)
    combi = st.selectbox("Make combinations of each word? (Useful for difficult mentions)", ['Yes', 'No'], index=1)
    disambi = st.selectbox("Run acronym disambiguation? (Enable it if the mention is nested)", ['Yes', 'No'], index=0)

    if not st.button("Run Entity Linking"):
        return
    if not (input_sentence_user and input_mention_user):
        st.warning("Please fill in both fields.")
        return
    # check if the mention is in the sentence
    if input_mention_user not in input_sentence_user:
        st.warning(f"The mention '{input_mention_user}' was NOT found in the sentence.")
        return

    # The chat client only exists once a token has been entered; look it up
    # defensively so a missing token produces a message instead of a crash.
    llm_client = globals().get("client")
    llm_model = globals().get("model_name")
    if llm_client is None or not llm_model:
        st.warning("Please enter an API token to proceed.")
        return

    start_time = time.time()
    try:
        with st.spinner("Applying Data Normalization module... (1/5)"):
            # Data Normalization
            list_with_full_names = []
            list_with_names_to_show = []
            if disambi == "Yes":
                acronym_lines = _ask_llm(
                    llm_client,
                    llm_model,
                    _ACRONYM_SYSTEM_PROMPT,
                    f"label:{input_mention_user}, context:{input_sentence_user}",
                )
                for line in acronym_lines:
                    shown_name, full_name = _split_label_line(line)
                    list_with_full_names.append(full_name)
                    list_with_names_to_show.append(shown_name)
                name = ",".join(list_with_full_names)
            else:
                name = input_mention_user
                list_with_full_names.append(name)
                list_with_names_to_show.append(name)

            # Changing the mention to the correct one
            input_sentence_user = input_sentence_user.replace(input_mention_user, name)
            description_lines = _ask_llm(
                llm_client,
                llm_model,
                _DESCRIPTION_SYSTEM_PROMPT,
                f"label:{name}, context:{input_sentence_user}",
            )
            # Split at the FIRST colon so descriptions containing a colon are
            # kept whole.
            list_with_contexts = [_split_label_line(line)[1] for line in description_lines]
            st.write("✅ Applied Data Normilzation module (1/5)")

        # Candidate Retrieval & Information Gathering
        async def big_main(mentions, single, combi):
            with st.spinner("Applying Candidate Retrieval module... (2/5)"):
                for mention in mentions:
                    await mains(mention, single, combi)
                st.write("✅ Applied Candidate Retrieval module (2/5)")
            with st.spinner("Applying Information Gathering module... (3/5)"):
                for mention in mentions:
                    await main(mention)
                st.write("✅ Applied Information Gathering module (3/5)")

        # Pass the list itself: re-splitting the comma-joined string would
        # break any expanded name that contains a comma.
        asyncio.run(big_main(list_with_full_names, single, combi))

        total = len(list_with_full_names)
        triples = zip(list_with_full_names, list_with_contexts, list_with_names_to_show)
        for number, (full_name, context, shown_name) in enumerate(triples, start=1):
            with st.spinner(f"Applying Candidate Selection module... (4/5) [{number}/{total}]"):
                with open(f"{_INFO_DIR}/{full_name}.json", "r", encoding="utf-8") as f:
                    candidates = json.load(f)

                embedded = []
                for element in candidates:
                    label = element.get("label")
                    if label is None:
                        continue  # nothing to embed for this record
                    link = f"https://www.wikidata.org/wiki/{element.get('qid')}"
                    label_emb = model.encode([label])
                    desc_emb = model.encode([element.get("description")])
                    embedded.append((link, label_emb, desc_emb))

                label_dataset_emb = model.encode([full_name])
                desc_dataset_emb = model.encode([context])
                scored = []
                for link, label_emb, desc_emb in embedded:
                    cossim_label = model.similarity(label_dataset_emb, label_emb[0])
                    desc_label = model.similarity(desc_dataset_emb, desc_emb[0])
                    scored.append((link, np.mean([cossim_label, desc_label])))
                    print(link)
                st.write(f"✅ Applined Candidate Selection module (4/5) [{number}/{total}]")

            with st.spinner(f"Applying Candidate Matching module... (5/5) [{number}/{total}]"):
                if not scored:
                    st.warning(f"The entity: {shown_name} is NIL.")
                    continue

                # First highest-scoring candidate wins.
                best_link, _ = max(scored, key=lambda pair: pair[1])
                qid = best_link.split("/")[-1]

                # Defaults so a query with no rows still produces output
                # instead of an unbound (or stale) name.
                wikipedia = "None"
                dbpedia = "None"

                wikidata2wikipedia = f"""
                    SELECT ?wikipedia
                    WHERE {{
                        ?wikipedia schema:about wd:{qid} .
                        ?wikipedia schema:isPartOf <https://en.wikipedia.org/> .
                    }}
                    """
                results = get_resultss(wikidata2wikipedia)
                for result in results["results"]["bindings"]:
                    for value in result.values():
                        wikipedia = value.get("value", "None")

                sparql = SPARQLWrapper("http://dbpedia.org/sparql")
                wikidata2dbpedia = f"""
                    SELECT ?dbpedia
                    WHERE {{
                        ?dbpedia owl:sameAs <http://www.wikidata.org/entity/{qid}>.
                    }}
                    """
                sparql.setQuery(wikidata2dbpedia)
                sparql.setReturnFormat(JSON)
                results = sparql.query().convert()
                for result in results["results"]["bindings"]:
                    dbpedia = result["dbpedia"]["value"]

                st.write(f"✅ Applied Candidate Matching module (5/5) [{number}/{total}]")
                st.text(f"The correct entity for '{shown_name}' is:")
                st.success(f"Wikipedia: {wikipedia}")
                st.success(f"Wikidata: {best_link}")
                st.success(f"DBpedia: {dbpedia}")

        end_time = time.time()
        execution_time = end_time - start_time
        ETA = time.strftime("%H:%M:%S", time.gmtime(execution_time))
        st.write(f"⌛ Execution time: {ETA}")
    finally:
        # Remove this run's temporary files from the same absolute
        # directories they were written to.
        _clear_directory(_QIDS_DIR)
        _clear_directory(_INFO_DIR)
# Script entry point: build the page when the file is executed directly.
if __name__ == "__main__":
    main_cli()