import gradio as gr
from transformers import pipeline, MarianMTModel, AutoTokenizer
import feedparser
from datetime import datetime, timedelta
import json
import os
import logging
import hashlib
import time
from email.utils import parsedate_to_datetime
import pytz
from bs4 import BeautifulSoup
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# News sources and their RSS feeds
NEWS_SOURCES = {
    "Technology": {
        "TechCrunch": "https://techcrunch.com/feed/",
        "Wired": "https://www.wired.com/feed/rss",
        "The Verge": "https://www.theverge.com/rss/index.xml"
    },
    "Business": {
        "Financial Times": "https://www.ft.com/rss/home",
        "Business Insider": "https://www.businessinsider.com/rss",
        "Forbes": "https://www.forbes.com/real-time/feed2/"
    },
    "Science": {
        "Science Daily": "https://www.sciencedaily.com/rss/all.xml",
        "Nature": "http://feeds.nature.com/nature/rss/current",
        "Scientific American": "http://rss.sciam.com/ScientificAmerican-Global"
    },
    "World News": {
        "Reuters": "http://feeds.reuters.com/reuters/topNews",
        "BBC": "http://feeds.bbci.co.uk/news/world/rss.xml",
        "CNN": "http://rss.cnn.com/rss/edition_world.rss"
    }
}
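# Note: RSS feed availability changes over time; feeds.reuters.com, for example, appears to have
# been retired. Feeds that cannot be fetched simply contribute no articles (see fetch_news_from_rss).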
# Language codes and their corresponding MarianMT model names
LANGUAGE_CODES = {
    "English": {"code": "en", "model": None},  # No translation needed for English
    "Spanish": {"code": "es", "model": "Helsinki-NLP/opus-mt-en-es"},
    "French": {"code": "fr", "model": "Helsinki-NLP/opus-mt-en-fr"},
    "German": {"code": "de", "model": "Helsinki-NLP/opus-mt-en-de"},
    "Italian": {"code": "it", "model": "Helsinki-NLP/opus-mt-en-it"},
    "Portuguese": {"code": "pt", "model": "Helsinki-NLP/opus-mt-en-pt"},
    "Dutch": {"code": "nl", "model": "Helsinki-NLP/opus-mt-en-nl"},
    "Russian": {"code": "ru", "model": "Helsinki-NLP/opus-mt-en-ru"},
    "Chinese": {"code": "zh", "model": "Helsinki-NLP/opus-mt-en-zh"},
    "Japanese": {"code": "ja", "model": "Helsinki-NLP/opus-mt-en-jap"},
    "Arabic": {"code": "ar", "model": "Helsinki-NLP/opus-mt-en-ar"}
}
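# Each translation model is an OPUS-MT checkpoint downloaded from the Hugging Face Hub on the first
# from_pretrained() call; the MarianMT tokenizers additionally require the sentencepiece package.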
# Initialize global variables
summarizer = None
translators = {}

class NewsCache:
    def __init__(self):
        self.summaries = {}
        self.translations = {}
        self.max_cache_size = 1000

    def store_summary(self, content_hash, summary, language=None):
        cache_key = f"{content_hash}_{language}" if language else content_hash
        if len(self.summaries) >= self.max_cache_size:
            self.summaries.pop(next(iter(self.summaries)))
        self.summaries[cache_key] = summary

    def get_summary(self, content_hash, language=None):
        cache_key = f"{content_hash}_{language}" if language else content_hash
        return self.summaries.get(cache_key)

news_cache = NewsCache()
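# Summaries are cached under an MD5 hash of the article text plus the target language; when the
# cache is full, the oldest entry is evicted first (dict insertion order gives simple FIFO behaviour).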
def get_content_hash(content):
    """Generate a hash for the content"""
    return hashlib.md5(content.encode()).hexdigest()

def parse_date(date_str):
    """Parse an RFC 2822 date string into a UTC datetime object"""
    if not date_str:
        return None
    try:
        parsed = parsedate_to_datetime(date_str)
        # Convert timezone-aware datetimes to UTC; assume UTC for naive ones
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=pytz.UTC)
        return parsed.astimezone(pytz.UTC)
    except (TypeError, ValueError):
        return None
def fetch_news_from_rss(categories):
    """Fetch news from RSS feeds based on user interests"""
    articles = []
    cutoff_time = datetime.now(pytz.UTC) - timedelta(hours=8)
    for category in categories:
        if category in NEWS_SOURCES:
            for source, feed_url in NEWS_SOURCES[category].items():
                try:
                    feed = feedparser.parse(feed_url)
                    for entry in feed.entries:
                        published = parse_date(entry.get('published'))
                        if published and published > cutoff_time:
                            articles.append({
                                'title': entry.get('title', ''),
                                'description': BeautifulSoup(entry.get('description', ''), 'html.parser').get_text(),
                                'link': entry.get('link', ''),
                                'published': entry.get('published', ''),
                                'category': category,
                                'source': source
                            })
                except Exception as e:
                    logging.error(f"Error fetching from {feed_url}: {e}")
                    continue
    return articles
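# Note: feedparser.parse() rarely raises on a broken feed; it usually returns a result with an empty
# entries list and sets feed.bozo on parse errors, so checking feed.bozo above would be a reasonable
# way to surface dead sources explicitly rather than silently returning no articles.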
def initialize_models():
    """Initialize the summarization and translation models"""
    global summarizer, translators
    try:
        # Initialize summarizer
        summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=-1  # Use CPU
        )
        # Initialize translators for each language
        for lang, info in LANGUAGE_CODES.items():
            if info["model"]:  # Skip English as it doesn't need translation
                try:
                    model = MarianMTModel.from_pretrained(info["model"])
                    tokenizer = AutoTokenizer.from_pretrained(info["model"])
                    translators[lang] = (model, tokenizer)
                    logging.info(f"Initialized translator for {lang}")
                except Exception as e:
                    logging.error(f"Error initializing translator for {lang}: {e}")
        return True
    except Exception as e:
        logging.error(f"Error initializing models: {e}")
        return False
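# Loading every translation model up front keeps all the OPUS-MT checkpoints (several hundred MB
# each) in memory even if the user only reads English summaries. Below is a minimal sketch of an
# optional lazy-loading alternative; get_translator() is a hypothetical helper that is not wired
# into the rest of the app, and translate_text() would need to call it instead of reading the
# translators dict directly.
def get_translator(language):
    """Load and cache the MarianMT model/tokenizer for a language on first use (sketch)."""
    if language in translators:
        return translators[language]
    info = LANGUAGE_CODES.get(language)
    if not info or not info["model"]:
        return None  # English or unknown language: no translation model needed
    try:
        model = MarianMTModel.from_pretrained(info["model"])
        tokenizer = AutoTokenizer.from_pretrained(info["model"])
        translators[language] = (model, tokenizer)
        return translators[language]
    except Exception as e:
        logging.error(f"Error lazily loading translator for {language}: {e}")
        return None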
def translate_text(text, target_language):
    """Translate text to the target language"""
    if target_language == "English" or not text:
        return text
    try:
        if target_language not in translators:
            logging.error(f"Translator not found for {target_language}")
            return text
        model, tokenizer = translators[target_language]
        # Split long text into rough chunks (by characters, not tokens); the tokenizer still
        # truncates each chunk to the model's 512-token limit
        max_length = 512
        chunks = [text[i:i + max_length] for i in range(0, len(text), max_length)]
        translated_chunks = []
        for chunk in chunks:
            inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512)
            translated = model.generate(**inputs)
            translated_text = tokenizer.decode(translated[0], skip_special_tokens=True)
            translated_chunks.append(translated_text)
        return " ".join(translated_chunks)
    except Exception as e:
        logging.error(f"Translation error: {e}")
        return text
def generate_summary(text, title="", category="", language="English"):
    """Generate a summary with translation support"""
    if not summarizer:
        if not initialize_models():
            return None
    try:
        # Check cache first
        content_hash = get_content_hash(text)
        cached_summary = news_cache.get_summary(content_hash, language)
        if cached_summary:
            return cached_summary
        # Generate the English summary first. bart-large-cnn is not instruction-tuned, so this
        # template mainly adds context around the (truncated) article text.
        prompted_text = f"""
        Analyze and summarize this {category} news article titled "{title}".
        Focus on providing:
        1. Main story/announcement/finding
        2. Key details and implications
        3. Relevant context or background
        4. Any significant numbers, statistics, or quotes
        5. Future implications or next steps (if mentioned)
        Article text:
        {text[:1024]}
        Please provide a clear, concise summary that a general audience can understand:"""
        result = summarizer(prompted_text,
                            max_length=200,
                            min_length=50,
                            do_sample=False,
                            truncation=True)
        if result and len(result) > 0:
            summary = result[0]['summary_text']
            # Post-process summary
            summary = summary.replace(" .", ".").replace(" ,", ",")
            sentences = summary.split(". ")
            formatted_summary = "\n• " + "\n• ".join(filter(None, sentences))
            # Translate if needed
            if language != "English":
                formatted_summary = translate_text(formatted_summary, language)
            news_cache.store_summary(content_hash, formatted_summary, language)
            return formatted_summary
        return None
    except Exception as e:
        logging.error(f"Summarization error: {e}")
        return None
def get_personalized_summary(name, progress=gr.Progress()):
    """Generate a personalized news summary in the user's preferred language"""
    start_time = time.time()
    logging.info(f"Starting summary generation for user: {name}")
    if not name:
        return "Please enter your name!"
    try:
        with open(f"user_preferences/preferences_{name}.json", "r") as f:
            preferences = json.load(f)
    except FileNotFoundError:
        return "Please set your preferences first!"
    except Exception as e:
        return f"Error loading preferences: {e}"
    user_language = preferences.get("language", "English")
    # Fetch articles with progress
    progress(0.2, desc="Fetching recent news...")
    articles = fetch_news_from_rss(preferences["interests"])
    if not articles:
        return translate_text("No recent news articles found from the last 8 hours. Please try again later.", user_language)
    # Process articles with a time limit
    progress(0.4, desc="Analyzing and summarizing...")
    summaries = []
    total_articles = len(articles)
    max_processing_time = 60  # seconds
    for i, article in enumerate(articles):
        if time.time() - start_time > max_processing_time:
            logging.warning("Processing time exceeded maximum limit")
            break
        try:
            # Scale per-article progress into the remaining 0.4-1.0 range
            progress(0.4 + (i + 1) / total_articles * 0.6)
            title = article['title']
            content = article['description']
            category = article['category']
            link = article['link']
            published = parse_date(article['published'])
            published_str = published.strftime('%Y-%m-%d %H:%M UTC') if published else 'Recently'
            if not content:
                continue
            summary = generate_summary(content, title, category, user_language)
            if not summary:
                continue
            # Translate title and category if needed
            if user_language != "English":
                title = translate_text(title, user_language)
                category = translate_text(category, user_language)
                published_str = translate_text(published_str, user_language)
            formatted_summary = f"""
📰 {title}
📂 {translate_text("Category", user_language)}: {category}
⏰ {translate_text("Published", user_language)}: {published_str}
{summary}
🔗 {translate_text("Read more", user_language)}: {link}
---"""
            summaries.append(formatted_summary)
        except Exception as e:
            logging.error(f"Error processing article: {e}")
            continue
    if not summaries:
        return translate_text("Unable to generate summaries for recent news. Please try again.", user_language)
    progress(1.0, desc="Done!")
    return "\n".join(summaries)
# Gradio interface
with gr.Blocks(title="Enhanced News Summarizer") as demo:
    gr.Markdown("# 📰 Enhanced AI News Summarizer")
    with gr.Tab("Set Preferences"):
        name_input = gr.Textbox(label="Your Name")
        language_dropdown = gr.Dropdown(
            choices=list(LANGUAGE_CODES.keys()),
            label="Preferred Language",
            value="English"
        )
        interests_checkboxes = gr.CheckboxGroup(
            choices=list(NEWS_SOURCES.keys()),
            label="News Interests (Select multiple)"
        )
        save_button = gr.Button("Save Preferences")
        preferences_output = gr.Textbox(label="Status")

        def save_preferences(name, language, interests):
            if not name or not language or not interests:
                return "Please fill in all required fields!"
            preferences = {
                "name": name,
                "language": language,
                "interests": interests,
                "last_updated": datetime.now().isoformat()
            }
            try:
                os.makedirs('user_preferences', exist_ok=True)
                with open(f"user_preferences/preferences_{name}.json", "w") as f:
                    json.dump(preferences, f)
                return f"Preferences saved for {name}!"
            except Exception as e:
                logging.error(f"Error saving preferences: {e}")
                return f"Error saving preferences: {e}"

        save_button.click(
            save_preferences,
            inputs=[name_input, language_dropdown, interests_checkboxes],
            outputs=[preferences_output]
        )
    with gr.Tab("Get News Summary"):
        name_check = gr.Textbox(label="Enter your name to get summary")
        get_summary_button = gr.Button("Get Summary")
        summary_output = gr.Textbox(
            label="Your Personalized News Summary",
            lines=20
        )
        get_summary_button.click(
            get_personalized_summary,
            inputs=[name_check],
            outputs=[summary_output]
        )
if __name__ == "__main__":
    if initialize_models():
        demo.launch()
    else:
        print("Failed to initialize models. Please check the logs.")