Spaces:
Sleeping
Sleeping
from fastapi import FastAPI | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.middleware.cors import CORSMiddleware | |
# Application instance; docs_url="/" serves the interactive API docs at the root path.
app = FastAPI(docs_url="/")

# Keyword arguments handed to CORSMiddleware: every origin, method and header
# is accepted, and credentialed requests are allowed.
# NOTE(review): the FastAPI CORS documentation advises against combining
# wildcard origins with allow_credentials=True — confirm this is intended.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
# Cover image used whenever a source does not supply a thumbnail.
_PLACEHOLDER_IMAGE = "https://bookstoreromanceday.org/wp-content/uploads/2020/08/book-cover-placeholder.png"
# Sentinel stored for any metadata field a source does not provide.
_MISSING_FIELD = "Null"
# Field labels expected, in this order, on each line of the ChatGPT reply.
_CHATGPT_KEYS = ("Title", "Author", "Publisher", "Summary")
# The Google Books API rejects maxResults values above 40.
_GBOOKS_MAX_RESULTS = 40
# Upper bound, in seconds, on how long the Google Books request may block.
_HTTP_TIMEOUT_SECONDS = 30


def _lookup(record, *path, default=_MISSING_FIELD):
    """Follow ``path`` through nested mappings/sequences of ``record``.

    Returns ``default`` when any step is missing (``KeyError``), out of range
    (``IndexError``), not subscriptable (``TypeError``), or when the final
    value is ``None``.

    Plain subscription is used instead of ``dict.get`` so that mapping
    subclasses which customise item access are honoured
    (NOTE(review): pyalex appears to build ``"abstract"`` this way — confirm).
    """
    value = record
    try:
        for key in path:
            value = value[key]
    except (KeyError, IndexError, TypeError):
        return default
    return default if value is None else value


def _gbooks_search(query, n_results=30):
    """Query the Google Books API.

    Args:
        query: Search terms; converted to ``str`` before sending.
        n_results: Requested number of volumes, capped at the API limit.

    Returns:
        A ``(titles, authors, publishers, descriptions, images)`` tuple of
        equally long lists. All lists are empty when the response contains
        no ``"items"`` entry.

    Raises:
        requests.RequestException: On timeout, connection failure or a
            non-success HTTP status.
    """
    import requests

    response = requests.get(
        "https://www.googleapis.com/books/v1/volumes",
        params={
            "q": str(query),
            "printType": "books",
            "maxResults": min(n_results, _GBOOKS_MAX_RESULTS),
        },
        timeout=_HTTP_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    data = response.json()

    titles, authors, publishers, descriptions, images = [], [], [], [], []
    # A response without "items" simply contributes no results.
    for item in data.get("items", []):
        volume_info = _lookup(item, "volumeInfo", default={})

        title = _lookup(volume_info, "title")
        subtitle = _lookup(volume_info, "subtitle", default=None)
        titles.append(f"{title}: {subtitle}" if subtitle else title)

        descriptions.append(_lookup(volume_info, "description"))
        publishers.append(_lookup(volume_info, "publisher"))
        # Only the first listed author is kept.
        authors.append(_lookup(volume_info, "authors", 0))
        images.append(
            _lookup(
                volume_info, "imageLinks", "thumbnail", default=_PLACEHOLDER_IMAGE
            )
        )
    return titles, authors, publishers, descriptions, images


def _openalex_search(query, n_results=10):
    """Search OpenAlex works through ``pyalex``.

    Args:
        query: Search terms; converted to ``str`` before sending.
        n_results: Page size and overall maximum passed to the paginator.

    Returns:
        A ``(titles, authors, publishers, descriptions, images)`` tuple of
        equally long lists. Every image is the placeholder cover.
    """
    import pyalex
    from pyalex import Works

    # NOTE(review): this address looks like a redacted placeholder — confirm
    # the real contact e-mail before deploying.
    pyalex.config.email = "[email protected]"

    pager = Works().search(str(query)).paginate(per_page=n_results, n_max=n_results)
    # Only the first page is used; fall back to no results if none is produced.
    first_page = next(iter(pager), [])

    titles, authors, publishers, descriptions, images = [], [], [], [], []
    for work in first_page:
        titles.append(_lookup(work, "title"))
        descriptions.append(_lookup(work, "abstract"))
        publishers.append(_lookup(work, "host_venue", "publisher"))
        authors.append(_lookup(work, "authorships", 0, "author", "display_name"))
        images.append(_PLACEHOLDER_IMAGE)
    return titles, authors, publishers, descriptions, images


def _parse_chatgpt_line(line, ordered_keys=_CHATGPT_KEYS):
    """Split one ``'Title: ..., Author: ..., ...'`` line into a dict.

    Every field except the last is cut at the first comma following its
    label; the last field keeps the remainder of the line.

    Raises:
        IndexError: If a label from ``ordered_keys`` is absent from ``line``.
    """
    parsed = {}
    last_key = ordered_keys[-1]
    for key in ordered_keys:
        remainder = line.split(f"{key}: ")[1]
        parsed[key] = remainder if key == last_key else remainder.split(",")[0]
    return parsed


def _openai_search(query, n_results=10):
    """Ask the OpenAI chat completion API for book recommendations.

    The API key is read from the ``OPENAI_API_KEY`` environment variable;
    credentials must never be committed to the source file.

    Args:
        query: Topic the recommendations should cover.
        n_results: Number of books requested in the prompt.

    Returns:
        A ``(titles, authors, publishers, descriptions, images)`` tuple of
        equally long lists. Every image is the placeholder cover.

    Raises:
        RuntimeError: If ``OPENAI_API_KEY`` is not set.
    """
    import os

    import openai

    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError(
            "The OPENAI_API_KEY environment variable must be set to include "
            "ChatGPT results."
        )
    openai.api_key = api_key

    chatgpt_response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are a librarian. You are helping a patron find a book.",
            },
            {
                "role": "user",
                "content": f"Recommend me {n_results} books about {query}. Your response should be like: 'title: <title>, author: <author>, publisher: <publisher>, summary: <summary>'",
            },
        ],
    )

    reply = chatgpt_response["choices"][0]["message"]["content"]
    # Skip the first two lines and then take every other line.
    # NOTE(review): presumably the reply opens with an intro line and
    # separates entries with blank lines — confirm against real responses.
    candidate_lines = reply.split("\n")[2::2]

    titles, authors, publishers, descriptions, images = [], [], [], [], []
    for line in candidate_lines:
        try:
            parsed = _parse_chatgpt_line(line)
        except IndexError:
            # A line lacking an expected label (e.g. a reply cut short by the
            # API limit) ends parsing; earlier entries are kept.
            break
        titles.append(parsed["Title"])
        authors.append(parsed["Author"])
        publishers.append(parsed["Publisher"])
        descriptions.append(parsed["Summary"])
        images.append(_PLACEHOLDER_IMAGE)
    return titles, authors, publishers, descriptions, images


def _find_similar(combined_data, top_k=10):
    """Rank, for every book, the other books by embedding similarity.

    Args:
        combined_data: One descriptive sentence per book.
        top_k: Number of hits requested per book, clamped to the number of
            books. The first hit is dropped from each ranking.

    Returns:
        One ``{"sorted_by_similarity": hits}`` dict per book, in input order.
    """
    if not combined_data:
        # Nothing to compare; avoid loading the model at all.
        return []

    from sentence_transformers import SentenceTransformer
    from sentence_transformers import util

    sentence_transformer = SentenceTransformer("all-MiniLM-L6-v2")
    book_embeddings = sentence_transformer.encode(
        combined_data, convert_to_tensor=True
    )

    top_k = min(top_k, len(combined_data))
    # A single batched call yields one ranked hit list per book.
    hits_per_book = util.semantic_search(
        book_embeddings, book_embeddings, top_k=top_k
    )
    # The leading hit is discarded (expected to be the book itself).
    return [{"sorted_by_similarity": hits[1:]} for hits in hits_per_book]


def _summarize(descriptions, runtime="normal"):
    """Summarize each description with a Hugging Face pipeline.

    Args:
        descriptions: Description strings, one per book.
        runtime: ``"normal"`` (BART via BetterTransformer) or
            ``"onnxruntime"`` (T5-small via ONNX Runtime).

    Returns:
        One ``[{"summary_text": ...}]`` list per description; empty
        descriptions get a fixed "not available" message.

    Raises:
        ValueError: If ``runtime`` is not one of the supported values.
    """
    from transformers import (
        AutoTokenizer,
        AutoModelForSeq2SeqLM,
        pipeline,
    )

    if runtime == "normal":
        from optimum.bettertransformer import BetterTransformer

        tokenizer = AutoTokenizer.from_pretrained("lidiya/bart-base-samsum")
        model = AutoModelForSeq2SeqLM.from_pretrained("lidiya/bart-base-samsum")
        model = BetterTransformer.transform(model)
    elif runtime == "onnxruntime":
        from optimum.onnxruntime import ORTModelForSeq2SeqLM

        tokenizer = AutoTokenizer.from_pretrained("optimum/t5-small")
        model = ORTModelForSeq2SeqLM.from_pretrained("optimum/t5-small")
    else:
        raise ValueError(f"Unsupported runtime: {runtime!r}")

    summarizer_pipe = pipeline(
        "summarization",
        model=model,
        tokenizer=tokenizer,
        min_length=10,
        max_length=128,
    )

    return [
        summarizer_pipe(description)
        if description
        else [{"summary_text": "No summary text is available."}]
        for description in descriptions
    ]


def _classify(combined_data, runtime="normal"):
    """Zero-shot classify each book by audience and level.

    Args:
        combined_data: One descriptive sentence per book.
        runtime: ``"normal"`` (DeBERTa NLI model) or ``"onnxruntime"``
            (DistilBERT MNLI via ONNX Runtime).

    Returns:
        One ``{"audience": result, "level": result}`` dict per book, where
        each result is the raw pipeline output for that label set.

    Raises:
        ValueError: If ``runtime`` is not one of the supported values.
    """
    from transformers import (
        AutoTokenizer,
        AutoModelForSequenceClassification,
        pipeline,
    )

    if runtime == "normal":
        tokenizer = AutoTokenizer.from_pretrained(
            "sileod/deberta-v3-base-tasksource-nli"
        )
        model = AutoModelForSequenceClassification.from_pretrained(
            "sileod/deberta-v3-base-tasksource-nli"
        )
    elif runtime == "onnxruntime":
        from optimum.onnxruntime import ORTModelForSequenceClassification

        tokenizer = AutoTokenizer.from_pretrained(
            "optimum/distilbert-base-uncased-mnli"
        )
        model = ORTModelForSequenceClassification.from_pretrained(
            "optimum/distilbert-base-uncased-mnli"
        )
    else:
        raise ValueError(f"Unsupported runtime: {runtime!r}")

    classifier_pipe = pipeline(
        "zero-shot-classification",
        model=model,
        tokenizer=tokenizer,
        hypothesis_template="This book is {}.",
        batch_size=1,
        device=-1,
        multi_label=False,
    )

    # Candidate labels for the two independent classifications.
    level = ["Introductory", "Advanced"]
    audience = ["Academic", "Not Academic", "Manual"]

    return [
        {
            "audience": classifier_pipe(doc, audience),
            "level": classifier_pipe(doc, level),
        }
        for doc in combined_data
    ]


def search(
    query: str,
    classification: bool = True,
    summarization: bool = True,
    similarity: bool = False,
    add_chatgpt_results: bool = False,
    n_results: int = 10,
):
    """Search several book sources and enrich the merged results.

    Google Books and OpenAlex are always queried; ChatGPT recommendations
    are appended when ``add_chatgpt_results`` is true. Each enrichment step
    can be switched off, in which case placeholder values are returned for
    the corresponding fields.

    Args:
        query: Search terms.
        classification: Run zero-shot audience/level classification.
        summarization: Summarize descriptions instead of returning them as is.
        similarity: Compute the per-book similarity rankings.
        add_chatgpt_results: Also ask ChatGPT for recommendations
            (requires the ``OPENAI_API_KEY`` environment variable).
        n_results: Number of results requested from each source.

    Returns:
        A list with one dict per book containing ``id``, ``title``,
        ``author``, ``publisher``, ``image_link``, ``audience``,
        ``audience_confidence``, ``level``, ``level_confidence``,
        ``summary``, ``similar_books`` and a ``runtime`` dict (total as a
        formatted string, per-step times as whole seconds).
    """
    import time

    start_time = time.time()

    titles, authors, publishers, descriptions, images = [], [], [], [], []
    columns = (titles, authors, publishers, descriptions, images)

    def collect(source_results):
        # Append one source's five parallel lists onto the merged lists.
        for column, values in zip(columns, source_results):
            column.extend(values)

    collect(_gbooks_search(query, n_results=n_results))
    collect(_openalex_search(query, n_results=n_results))
    if add_chatgpt_results:
        collect(_openai_search(query, n_results=n_results))

    # Retrieval is finished; the enrichment timings are measured from here.
    retrieval_done = time.time()

    # One descriptive sentence per book, fed to the classifier and the
    # similarity model.
    combined_data = [
        f"The book's title is {title}. It is published by {publisher}. This book is about {description}"
        for title, description, publisher in zip(titles, descriptions, publishers)
    ]

    if classification:
        classes = _classify(combined_data, runtime="normal")
    else:
        # Same shape as the classifier output so the result assembly below
        # can read "audience" and "level" uniformly.
        no_labels = {"labels": ["No labels available."], "scores": [0]}
        classes = [{"audience": no_labels, "level": no_labels} for _ in combined_data]
    classification_done = time.time()
    classification_time = int(classification_done - retrieval_done)

    if summarization:
        summaries = _summarize(descriptions, runtime="normal")
    else:
        summaries = [
            [{"summary_text": description}]
            if description
            else [{"summary_text": "No summary text is available."}]
            for description in descriptions
        ]
    summarization_done = time.time()
    summarization_time = int(summarization_done - classification_done)

    if similarity:
        similar_books = _find_similar(combined_data)
    else:
        similar_books = [
            {"sorted_by_similarity": ["No similar books available."]}
            for _ in combined_data
        ]
    similarity_done = time.time()
    similarity_time = int(similarity_done - summarization_done)

    end_time = time.time()
    runtime = f"{end_time - start_time:.2f} seconds"

    results = []
    rows = zip(titles, authors, publishers, images, classes, summaries, similar_books)
    for index, (title, author, publisher, image, labels, summary, similar) in enumerate(rows):
        results.append(
            {
                "id": index,
                "title": title,
                "author": author,
                "publisher": publisher,
                "image_link": image,
                "audience": labels["audience"]["labels"][0],
                "audience_confidence": labels["audience"]["scores"][0],
                "level": labels["level"]["labels"][0],
                "level_confidence": labels["level"]["scores"][0],
                "summary": summary[0]["summary_text"],
                "similar_books": similar["sorted_by_similarity"],
                "runtime": {
                    "total": runtime,
                    "classification": classification_time,
                    "summarization": summarization_time,
                    "similarity": similarity_time,
                },
            }
        )
    return results