# Spaces: Runtime error
import pandas as pd | |
import requests | |
import isort | |
import black | |
import flair | |
import time | |
from bs4 import BeautifulSoup | |
import re | |
import numpy as np | |
import os | |
from flair.data import Sentence | |
from flair.models import SequenceTagger | |
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline | |
import string | |
import textwrap | |
import tweepy | |
import gradio as gr | |
# RSS feed that lists the latest formula1.com articles.
URL = "https://www.formula1.com/content/fom-website/en/latest/all.xml"

# Twitter credentials come from the process environment. They are looked up
# in the order listed; a missing variable raises KeyError at import time.
api_key, secret_api_key, access_token, secret_access_token, bearer_token = (
    os.environ[variable]
    for variable in (
        'api_key',
        'secret_api_key',
        'access_token',
        'secret_access_token',
        'bearer_token',
    )
)
def get_xml(url):
    """Read the feed at ``url`` into a DataFrame, one row per ``channel/item``.

    The XPath used here matches the formula1.com feed layout only.
    """
    # TODO: use urllib.parse to tell formula1.com apart from other news sites
    # before choosing the XPath.
    return pd.read_xml(url, xpath='channel/item')
# Columns of the feed snapshot kept between polls.
cols_list = [
    'title',
    'description',
    'link',
    'creator',
    'guid',
]

# Snapshot of the previously seen feed; starts out empty.
previous_xml = pd.DataFrame(data=None, columns=cols_list)
# Matches text between an opening and a closing double quote, either the
# typographic pair (U+201C / U+201D) or the plain ASCII '"'. The curly quotes
# are written as escapes so the pattern survives re-encoding of this file.
_QUOTE_PATTERN = re.compile(r'[\u201c"](.*?)[\u201d"]')

# A quoted span must contain more than this many words to count as a quote;
# this filters out single quoted words and short quoted phrases.
_MIN_QUOTE_WORDS = 3


def extract_quote(string):
    """Return the quoted passages of ``string`` that are longer than three words.

    Args:
        string: Text to scan. (The parameter name shadows the ``string``
            module inside this function only.)

    Returns:
        A list with the text found between double quotes, in order of
        appearance, keeping only spans of more than ``_MIN_QUOTE_WORDS``
        whitespace-separated words. Quotes spanning a newline are not matched
        because ``.`` does not match newlines.
    """
    return [
        quoted
        for quoted in _QUOTE_PATTERN.findall(string)
        if len(quoted.split()) > _MIN_QUOTE_WORDS
    ]
# Shared flair NER tagger, created on first use and reused afterwards.
_NER_TAGGER = None


def _get_ner_tagger():
    """Return the shared 'ner' SequenceTagger, loading it on the first call."""
    global _NER_TAGGER
    if _NER_TAGGER is None:
        _NER_TAGGER = SequenceTagger.load('ner')
    return _NER_TAGGER


def get_names(text):
    """Return the distinct person names that the NER tagger finds in ``text``.

    Args:
        text: Text to run named-entity recognition on.

    Returns:
        A list of the texts labelled "PER", without duplicates, in order of
        first appearance.
    """
    sentence = Sentence(text)
    # Loading the tagger is expensive, so it is done once rather than per call.
    _get_ner_tagger().predict(sentence)
    names = [
        label.data_point.text
        for label in sentence.get_labels('ner')
        if label.value == "PER"
    ]
    # dict.fromkeys drops repetitions while keeping a deterministic order.
    return list(dict.fromkeys(names))
# Seconds to wait for an article download before giving up on it.
_ARTICLE_REQUEST_TIMEOUT = 30


def get_text(new_articles_df):
    """Download each article and collect its text, quotes and candidate speakers.

    Args:
        new_articles_df: DataFrame of feed items. The ``guid`` column is used
            as the article URL.

    Returns:
        A dict keyed by article link. Each value is a dict with:
        ``'context'`` (the article paragraphs joined into one string),
        ``'quotes'`` (list of quotes from ``extract_quote``) and
        ``'speakers'`` (names from ``get_names``, or an empty tuple when the
        article has no quotes). Articles that cannot be downloaded, or that do
        not contain the expected content container, are skipped.
    """
    dfs_dict = {}
    for link in new_articles_df["guid"]:
        try:
            response = requests.get(link, timeout=_ARTICLE_REQUEST_TIMEOUT)
        except requests.RequestException as error:
            # One unreachable article must not abort the whole batch.
            print(f"Skipping {link}: download failed ({error})")
            continue

        soup = BeautifulSoup(response.content, "html.parser")
        # class_ below will be different for different websites
        body = soup.find("div", class_="col-lg-8 col-xl-7 offset-xl-1 f1-article--content")
        if body is None:
            print(f"Skipping {link}: article content container not found")
            continue

        paragraphs = [paragraph.text for paragraph in body.find_all("p")]
        # Paragraphs whose text equals a <strong> element's text are dropped.
        # The first <strong> element is not used as a filter.
        strong_texts = {strong.text for strong in body.find_all("strong")[1:]}

        # Combine the remaining paragraphs into one context string; every
        # paragraph is preceded by a single space.
        context = "".join(
            f" {text}" for text in paragraphs if text not in strong_texts
        )

        quotes = extract_quote(context)
        # To save some time, NER is only computed when there are quotes.
        speakers = get_names(context) if quotes else ()
        dfs_dict[link] = {'context': context, 'quotes': quotes, 'speakers': speakers}
    return dfs_dict
def load_speaker_model():
    """Build the question-answering pipeline used to attribute quotes.

    Returns:
        A transformers "question-answering" pipeline backed by the
        deepset/xlm-roberta-large-squad2 checkpoint.
    """
    checkpoint = "deepset/xlm-roberta-large-squad2"
    return pipeline(
        "question-answering",
        tokenizer=AutoTokenizer.from_pretrained(checkpoint),
        model=AutoModelForQuestionAnswering.from_pretrained(checkpoint),
    )


# Module-level pipeline instance, created once at import time.
question_answerer = load_speaker_model()
def remove_punctuations(text):
    """Strip ASCII punctuation from ``text`` and trim spaces at both ends.

    Only the space character is trimmed; other leading or trailing whitespace
    (tabs, newlines) is kept.
    """
    punctuation_free = text.translate(str.maketrans("", "", string.punctuation))
    return punctuation_free.strip(" ")
# Longest quote prefix, in characters, that is embedded in the question sent
# to the model. The original note pointed at max_seq_len == 384 on
# https://huggingface.co/deepset/roberta-base-squad2
_MAX_QUESTION_QUOTE_CHARS = 384


def get_speaker_quotes(dfs_dict, question_answerer):
    """Attribute every extracted quote to a speaker using a QA model.

    Args:
        dfs_dict: Mapping of article link to a dict with the keys
            ``'context'``, ``'quotes'`` and ``'speakers'``.
        question_answerer: Callable invoked as
            ``question_answerer(question=..., context=...)`` that returns a
            mapping with an ``'answer'`` entry.

    Returns:
        A list of ``{'speaker', 'quote', 'source'}`` dicts, one per quote whose
        answer (with punctuation removed) is one of the article's candidate
        speakers. ``'quote'`` holds the complete quote; only the question sent
        to the model uses a truncated copy.
    """
    speaker_quote = []
    for link, article in dfs_dict.items():
        context = article['context']
        potential_speakers = article['speakers']
        for full_quote in article['quotes']:
            # Only the question is shortened; the full quote is what is kept.
            question_quote = full_quote[:_MAX_QUESTION_QUOTE_CHARS]
            answer = question_answerer(
                question=f"Who said '{question_quote}'?", context=context
            )
            speaker = remove_punctuations(answer['answer'])
            # Discard empty answers and answers that are not a detected name.
            if speaker and speaker in potential_speakers:
                speaker_quote.append(
                    {'speaker': speaker, 'quote': full_quote, 'source': link}
                )
    return speaker_quote
def post_to_twitter():
    """Create a tweepy Client from the module-level Twitter credentials.

    Returns:
        The ``tweepy.Client`` instance, created with
        ``wait_on_rate_limit=True``. Callers post through its
        ``create_tweet`` method.
    """
    credentials = {
        "bearer_token": bearer_token,
        "consumer_key": api_key,
        "consumer_secret": secret_api_key,
        "access_token": access_token,
        "access_token_secret": secret_access_token,
    }
    return tweepy.Client(**credentials, wait_on_rate_limit=True)
def split_near_space(string, max_length):
    """Wrap ``string`` into lines of at most ``max_length`` characters.

    Wrapping is delegated to ``textwrap`` with its default settings.

    Returns:
        The list of wrapped lines (empty for an empty string).
    """
    wrapper = textwrap.TextWrapper(width=max_length)
    return wrapper.wrap(string)
# "Speaking head" emoji (U+1F5E3 U+FE0F). It is written with escapes because
# the literal in this file had been mis-decoded into unrelated characters.
_SPEAKER_EMOJI = "\U0001F5E3\uFE0F"

_MAX_TWEET_LENGTH = 280

# Characters reserved for decoration when deciding whether speaker and quote
# fit into a single tweet.
_SINGLE_TWEET_OVERHEAD = 10

# Marker appended to every chunk of a thread that is continued in a reply.
_CONTINUES = "...'"


def _compose_tweets(speaker, quote):
    """Return the tweet texts (a single tweet or a thread) for one quote."""
    opening = f"{_SPEAKER_EMOJI} | {speaker}: '"
    if len(speaker) + len(quote) + _SINGLE_TWEET_OVERHEAD < _MAX_TWEET_LENGTH:
        return [f"{opening}{quote}'"]

    # Reserve room for the opening decoration and the continuation marker so
    # that no chunk of the thread exceeds the tweet limit.
    chunk_width = _MAX_TWEET_LENGTH - len(opening) - len(_CONTINUES)
    chunks = split_near_space(quote, chunk_width)
    if len(chunks) < 2:
        # Nothing to continue: keep the speaker prefix on the only tweet.
        return [f"{opening}{' '.join(chunks)}'"]

    texts = [f"{opening}{chunks[0]}{_CONTINUES}"]
    texts.extend(f"'...{chunk}{_CONTINUES}" for chunk in chunks[1:-1])
    texts.append(f"'...{chunks[-1]}'")
    return texts


def send_tweets(speaker_quote):
    """Post every speaker/quote pair to Twitter, followed by its source link.

    Args:
        speaker_quote: Iterable of dicts with the keys ``'speaker'``,
            ``'quote'`` and ``'source'``.

    A quote that fits is posted as one tweet; a longer quote is posted as a
    thread of replies. In both cases a final reply carries
    ``"Source: <link>"``. Posting is best-effort: a failure for one pair is
    reported on stdout and the remaining pairs are still processed.
    """
    if not speaker_quote:
        return

    # One client is enough for the whole batch.
    api = post_to_twitter()
    for pair in speaker_quote:
        speaker = pair['speaker']
        quote = pair['quote']
        source = pair['source']
        try:
            reply_to = None
            for tweet_text in _compose_tweets(speaker, quote):
                posted = api.create_tweet(
                    text=tweet_text, in_reply_to_tweet_id=reply_to
                )
                reply_to = posted.data['id']
            api.create_tweet(text=f"Source: {source}", in_reply_to_tweet_id=reply_to)
        except Exception as error:
            # Unlike a bare ``except``, this lets KeyboardInterrupt and
            # SystemExit through, and it reports what went wrong.
            print(f"Could not tweet quote from {source}: {error}")
            continue
def check_updates(every=300):
    """Poll the feed forever and tweet quotes from articles not seen before.

    Args:
        every: Seconds to sleep before each poll.

    The loop never terminates, so this function does not return. After each
    poll the module-level ``previous_xml`` snapshot is replaced with the feed
    just read, so an article is only processed the first time it appears.
    """
    global previous_xml
    while True:
        time.sleep(every)
        latest_xml = get_xml(URL)
        # ``~`` here is the element-wise negation of a boolean Series: keep
        # the rows whose guid was not in the previous snapshot.
        new_articles_df = latest_xml[~latest_xml["guid"].isin(previous_xml["guid"])]
        # Store the snapshot before processing, so a failure further down
        # cannot cause the same articles to be tweeted again.
        previous_xml = latest_xml
        if new_articles_df.empty:
            print('No New article is found')
            continue
        print('New articles found')
        # Loop through new articles and get the necessary text, quotes and speakers.
        dfs_dict = get_text(new_articles_df)
        speaker_quote = get_speaker_quotes(dfs_dict, question_answerer)
        send_tweets(speaker_quote)
# Gradio front end: a single numeric input is passed to check_updates as the
# polling interval.
demo = gr.Interface(
    fn=check_updates,
    inputs="number",
    outputs="text",
    analytics_enabled=True,
)
demo.launch(max_threads=1, show_api=False)