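"""
Racoon Search: a Streamlit app that queries the Google Custom Search API,
filters the fetched page content by the query keywords with spaCy, and
summarizes it with a Hugging Face summarization pipeline. Search results,
extracted page content, and summaries are cached on disk in the
`search-results/`, `content/`, and `summaries/` directories.

Expects `google_search_api_key` and `google_search_engine_id` in the
Streamlit secrets (e.g. `.streamlit/secrets.toml`).
"""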
from os import makedirs, remove
from os.path import exists, dirname
import json
import uuid

import streamlit as st
from googleapiclient.discovery import build
from slugify import slugify
from transformers import pipeline
import spacy
from spacy.matcher import PhraseMatcher

from beautiful_soup.beautiful_soup import get_url_content
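# Note: the imports above are assumed to correspond roughly to these PyPI
# packages (names not verified against a lockfile): streamlit,
# google-api-python-client, python-slugify, transformers, spacy, plus the
# spaCy model `en_core_web_sm`. `beautiful_soup.beautiful_soup` is a
# project-local module providing `get_url_content(url)`, which is expected
# to return the page's text strings.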


def google_search_api_request(query):
    """
    Request Google Search API with query and return results.
    """
    service = build(
        "customsearch",
        "v1",
        developerKey=st.secrets["google_search_api_key"],
        cache_discovery=False
    )
    # Exclude PDFs from search results.
    query = query + ' -filetype:pdf'
    return service.cse().list(
        q=query,
        cx=st.secrets["google_search_engine_id"],
        num=5,
        lr='lang_en',  # lang_de
        fields='items(title,link),searchInformation(totalResults)'
    ).execute()
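# Illustrative shape of the trimmed response consumed by search_results()
# below; only the fields requested via `fields=` are present, and the values
# shown here are invented:
#
#   {
#       "searchInformation": {"totalResults": "5"},
#       "items": [{"title": "Example", "link": "https://example.com/"}]
#   }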


def search_results(query):
    """
    Request Google Search API with query and return results. Results are cached in files.
    """
    file_path = 'search-results/' + slugify(query) + '.json'
    # Create the cache directory if it doesn't exist.
    makedirs(dirname(file_path), exist_ok=True)
    results = []
    # Check if a cache file exists.
    if exists(file_path):
        with open(file_path, 'r') as results_file:
            results = json.load(results_file)
    else:
        search_result = google_search_api_request(query)
        # Check if the search returned any results.
        if int(search_result['searchInformation']['totalResults']) > 0:
            results = search_result['items']
            # Save the results to the cache file.
            with open(file_path, 'w') as results_file:
                json.dump(results, results_file)
    if len(results) == 0:
        raise Exception('No results found.')
    return results


def get_summary(url, keywords):
    """
    Return a summary for the page at `url`, filtered by `keywords`.
    Extracted content and summaries are cached in files keyed by a URL-based UUID.
    """
    url_id = uuid.uuid5(uuid.NAMESPACE_URL, url).hex
    file_path = 'summaries/' + url_id + '.json'
    # Create the cache directory if it doesn't exist.
    makedirs(dirname(file_path), exist_ok=True)
    # Check if a cache file exists.
    if exists(file_path):
        with open(file_path, 'r') as file:
            summary = json.load(file)
    else:
        content_cache = 'content/' + url_id + '.txt'
        # Create the cache directory if it doesn't exist.
        makedirs(dirname(content_cache), exist_ok=True)
        # Check if a content cache file exists.
        if exists(content_cache):
            with open(content_cache, 'r') as file:
                content = file.read().rstrip()
        else:
            # Fetch the page and keep only sentences that contain the keywords.
            strings = get_url_content(url)
            content = prep_chunks_summary(strings, keywords)
            # Save the content to the cache file.
            with open(content_cache, 'w') as file:
                print(content, file=file)
        max_length = 200
        # Rudimentary method to count the number of tokens in the content.
        word_count = len(content.split(' '))
        # If the content is longer than 200 words, summarize it.
        if word_count > max_length:
            # Generate a summary from the compiled content.
            summary = generate_summary(content, max_length)
        else:
            summary = [{"summary_text": content}]
        # Save the summary to the cache file.
        with open(file_path, 'w') as file:
            json.dump(summary, file)
    return summary


def generate_summary(content, max_length):
    """
    Generate a summary for the given content.
    """
    summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
    # https://huggingface.co/docs/transformers/v4.18.0/en/main_classes/pipelines#transformers.SummarizationPipeline
    summary = summarizer(content, max_length=max_length, min_length=30, do_sample=False, truncation=True)
    return summary


def exception_notice(exception):
    """
    Helper function for exception notices.
    """
    query_params = st.experimental_get_query_params()
    # If debug mode is enabled, show the exception; otherwise show a warning.
    if 'debug' in query_params.keys() and query_params['debug'][0] == 'true':
        st.exception(exception)
    else:
        st.warning(str(exception))


# Unused function.
def is_keyword_in_string(keywords, string):
    """
    Check if the string contains any of the keywords.
    """
    for keyword in keywords:
        if keyword in string:
            return True
    return False


def filter_sentences_by_keywords(strings, keywords):
    """
    Filter sentences by keywords using spaCy.
    """
    nlp = spacy.load("en_core_web_sm")
    matcher = PhraseMatcher(nlp.vocab)
    # Add keywords to the matcher.
    # Note: matching is case-sensitive by default (PhraseMatcher attr="ORTH").
    patterns = [nlp(keyword) for keyword in keywords]
    matcher.add("QueryList", patterns)
    sentences = []
    for string in strings:
        # Skip strings shorter than 5 words.
        string_length = len(string.split(' '))
        if string_length < 5:
            continue
        # Loop through the sentences and check if any of the keywords occur in them.
        doc = nlp(string)
        for sentence in doc.sents:
            matches = matcher(nlp(sentence.text))
            # If a keyword is in the sentence, add the sentence to the list once.
            if matches:
                sentences.append(sentence.text)
    if len(sentences) == 0:
        raise Exception('No sentences with keywords found.')
    return sentences


def split_content_into_chunks(sentences):
    """
    Split the content into chunks of roughly 512 words.
    """
    chunk = ''
    word_count = 0
    chunks = []
    # Loop through the sentences and split them into chunks.
    for sentence in sentences:
        # Count the words in the sentence.
        sentence_word_count = len(sentence.split(' '))
        # If the running word count plus this sentence would exceed 512 words, start a new chunk.
        if word_count + sentence_word_count > 512:
            chunks.append(chunk)
            chunk = ''  # Reset the chunk.
            word_count = 0  # Reset the word count.
        # Add the sentence to the chunk.
        word_count += sentence_word_count
        chunk += sentence + ' '
    # Add the final (possibly partial) chunk.
    if chunk:
        chunks.append(chunk)
    return chunks


def prep_chunks_summary(strings, keywords):
    """
    Filter the content by keywords, split it into chunks, and combine per-chunk summaries.
    """
    sentences = filter_sentences_by_keywords(strings, keywords)
    chunks = split_content_into_chunks(sentences)
    number_of_chunks = len(chunks)
    # Loop through the chunks if there is more than one.
    if number_of_chunks > 1:
        # Calculate the max summary length based on the number of chunks so that
        # the final combined text is not longer than 512 tokens.
        max_length = int(512 / number_of_chunks)
        content = ''
        # Loop through the chunks and generate a summary for each.
        for chunk in chunks:
            # Rudimentary method to count the number of tokens in a chunk.
            chunk_length = len(chunk.split(' '))
            # If the chunk is shorter than the max length, halve the target length
            # for this chunk only.
            chunk_max_length = max_length
            if chunk_length < max_length:
                chunk_max_length = int(chunk_length / 2)
            # Generate a summary for the chunk.
            chunk_summary = generate_summary(chunk, chunk_max_length)
            for summary in chunk_summary:
                content += summary['summary_text'] + ' '
    else:
        content = chunks[0]
    return content


def main():
    st.title('Racoon Search')
    query = st.text_input('Search query')
    query_params = st.experimental_get_query_params()

    if query:
        with st.spinner('Loading search results...'):
            try:
                results = search_results(query)
            except Exception as exception:
                exception_notice(exception)
                return

        # Count results.
        number_of_results = len(results)
        st.success('Found {} results for "{}".'.format(number_of_results, query))

        # If debug mode is enabled, show the search results as JSON.
        if 'debug' in query_params.keys() and query_params['debug'][0] == 'true':
            with st.expander("Search results JSON"):
                if st.button('Delete search result cache', key=query + 'cache'):
                    remove('search-results/' + slugify(query) + '.json')
                st.json(results)

        progress_bar = st.progress(0)
        st.header('Search results')
        st.markdown('---')

        for index, result in enumerate(results):
            with st.container():
                st.markdown('### ' + result['title'])
                # Create a unique id for the result.
                url_id = uuid.uuid5(uuid.NAMESPACE_URL, result['link']).hex
                # List of query keywords.
                keywords = query.split(' ')
                try:
                    # Create a summary of the summarized content.
                    summary = get_summary(result['link'], keywords)
                    st.markdown(summary[0]['summary_text'])
                except Exception as exception:
                    exception_notice(exception)
                progress_bar.progress((index + 1) / number_of_results)

                # Show links and cache-management buttons.
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.markdown('[Website Link]({})'.format(result['link']))
                with col2:
                    if st.button('Delete content from cache', key=url_id + 'content'):
                        # Use the same path the content is written to in get_summary().
                        remove('content/' + url_id + '.txt')
                with col3:
                    if st.button('Delete summary from cache', key=url_id + 'summary'):
                        remove('summaries/' + url_id + '.json')

                st.markdown('---')


if __name__ == '__main__':
    main()