import gradio as gr from transformers import pipeline, MarianMTModel, AutoTokenizer import feedparser from datetime import datetime, timedelta import json import os from concurrent.futures import ThreadPoolExecutor, TimeoutError import logging import hashlib import requests from functools import lru_cache import time from email.utils import parsedate_to_datetime import pytz from bs4 import BeautifulSoup # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) # News sources and their RSS feeds NEWS_SOURCES = { "Technology": { "TechCrunch": "https://techcrunch.com/feed/", "Wired": "https://www.wired.com/feed/rss", "The Verge": "https://www.theverge.com/rss/index.xml" }, "Business": { "Financial Times": "https://www.ft.com/rss/home", "Business Insider": "https://www.businessinsider.com/rss", "Forbes": "https://www.forbes.com/real-time/feed2/" }, "Science": { "Science Daily": "https://www.sciencedaily.com/rss/all.xml", "Nature": "http://feeds.nature.com/nature/rss/current", "Scientific American": "http://rss.sciam.com/ScientificAmerican-Global" }, "World News": { "Reuters": "http://feeds.reuters.com/reuters/topNews", "BBC": "http://feeds.bbci.co.uk/news/world/rss.xml", "CNN": "http://rss.cnn.com/rss/edition_world.rss" } } # Language codes and their corresponding MarianMT model names LANGUAGE_CODES = { "English": {"code": "en", "model": None}, # No translation needed for English "Spanish": {"code": "es", "model": "Helsinki-NLP/opus-mt-en-es"}, "French": {"code": "fr", "model": "Helsinki-NLP/opus-mt-en-fr"}, "German": {"code": "de", "model": "Helsinki-NLP/opus-mt-en-de"}, "Italian": {"code": "it", "model": "Helsinki-NLP/opus-mt-en-it"}, "Portuguese": {"code": "pt", "model": "Helsinki-NLP/opus-mt-en-pt"}, "Dutch": {"code": "nl", "model": "Helsinki-NLP/opus-mt-en-nl"}, "Russian": {"code": "ru", "model": "Helsinki-NLP/opus-mt-en-ru"}, "Chinese": {"code": "zh", "model": "Helsinki-NLP/opus-mt-en-zh"}, "Japanese": {"code": "ja", "model": "Helsinki-NLP/opus-mt-en-jap"}, "Arabic": {"code": "ar", "model": "Helsinki-NLP/opus-mt-en-ar"} } # Initialize global variables summarizer = None translators = {} class NewsCache: def __init__(self): self.summaries = {} self.translations = {} self.max_cache_size = 1000 def store_summary(self, content_hash, summary, language=None): cache_key = f"{content_hash}_{language}" if language else content_hash if len(self.summaries) >= self.max_cache_size: self.summaries.pop(next(iter(self.summaries))) self.summaries[cache_key] = summary def get_summary(self, content_hash, language=None): cache_key = f"{content_hash}_{language}" if language else content_hash return self.summaries.get(cache_key) news_cache = NewsCache() def get_content_hash(content): """Generate a hash for the content""" return hashlib.md5(content.encode()).hexdigest() def parse_date(date_str): """Parse date string to datetime object""" try: return parsedate_to_datetime(date_str).replace(tzinfo=pytz.UTC) except: return None def fetch_news_from_rss(categories): """Fetch news from RSS feeds based on user interests""" articles = [] cutoff_time = datetime.now(pytz.UTC) - timedelta(hours=8) for category in categories: if category in NEWS_SOURCES: for source, feed_url in NEWS_SOURCES[category].items(): try: feed = feedparser.parse(feed_url) for entry in feed.entries: published = parse_date(entry.get('published')) if published and published > cutoff_time: articles.append({ 'title': entry.get('title', ''), 'description': BeautifulSoup(entry.get('description', ''), 'html.parser').get_text(), 'link': entry.get('link', ''), 'published': entry.get('published', ''), 'category': category, 'source': source }) except Exception as e: logging.error(f"Error fetching from {feed_url}: {e}") continue return articles def initialize_models(): """Initialize the summarization and translation models""" global summarizer, translators try: # Initialize summarizer summarizer = pipeline( "summarization", model="facebook/bart-large-cnn", device=-1 # Use CPU ) # Initialize translators for each language for lang, info in LANGUAGE_CODES.items(): if info["model"]: # Skip English as it doesn't need translation try: model = MarianMTModel.from_pretrained(info["model"]) tokenizer = AutoTokenizer.from_pretrained(info["model"]) translators[lang] = (model, tokenizer) logging.info(f"Initialized translator for {lang}") except Exception as e: logging.error(f"Error initializing translator for {lang}: {e}") return True except Exception as e: logging.error(f"Error initializing models: {e}") return False def translate_text(text, target_language): """Translate text to target language""" if target_language == "English" or not text: return text try: if target_language not in translators: logging.error(f"Translator not found for {target_language}") return text model, tokenizer = translators[target_language] # Split text into chunks to handle long text max_length = 512 chunks = [text[i:i+max_length] for i in range(0, len(text), max_length)] translated_chunks = [] for chunk in chunks: inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512) translated = model.generate(**inputs) translated_text = tokenizer.decode(translated[0], skip_special_tokens=True) translated_chunks.append(translated_text) return " ".join(translated_chunks) except Exception as e: logging.error(f"Translation error: {e}") return text def generate_summary(text, title="", category="", language="English"): """Generate summary with translation support""" if not summarizer: if not initialize_models(): return None try: # Check cache first content_hash = get_content_hash(text) cached_summary = news_cache.get_summary(content_hash, language) if cached_summary: return cached_summary # Generate English summary first prompt_template = f""" Analyze and summarize this {category} news article titled "{title}". Focus on providing: 1. Main story/announcement/finding 2. Key details and implications 3. Relevant context or background 4. Any significant numbers, statistics, or quotes 5. Future implications or next steps (if mentioned) Article text: {text} Please provide a clear, concise summary that a general audience can understand:""" prompted_text = prompt_template.format(text=text[:1024]) result = summarizer(prompted_text, max_length=200, min_length=50, do_sample=False, truncation=True) if result and len(result) > 0: summary = result[0]['summary_text'] # Post-process summary summary = summary.replace(" .", ".").replace(" ,", ",") sentences = summary.split(". ") formatted_summary = "\nā€¢ " + "\nā€¢ ".join(filter(None, sentences)) # Translate if needed if language != "English": formatted_summary = translate_text(formatted_summary, language) news_cache.store_summary(content_hash, formatted_summary, language) return formatted_summary return None except Exception as e: logging.error(f"Summarization error: {e}") return None def get_personalized_summary(name, progress=gr.Progress()): """Generate personalized news summary in user's preferred language""" start_time = time.time() logging.info(f"Starting summary generation for user: {name}") if not name: return "Please enter your name!" try: with open(f"user_preferences/preferences_{name}.json", "r") as f: preferences = json.load(f) except FileNotFoundError: return "Please set your preferences first!" except Exception as e: return f"Error loading preferences: {e}" user_language = preferences.get("language", "English") # Fetch articles with progress progress(0.2, desc="Fetching recent news...") articles = fetch_news_from_rss(preferences["interests"]) if not articles: return translate_text("No recent news articles found from the last 8 hours. Please try again later.", user_language) # Process articles with timeout progress(0.4, desc="Analyzing and summarizing...") summaries = [] total_articles = len(articles) max_processing_time = 60 for i, article in enumerate(articles): if time.time() - start_time > max_processing_time: logging.warning("Processing time exceeded maximum limit") break try: progress((i + 1) / total_articles * 0.8 + 0.4) title = article['title'] content = article['description'] category = article['category'] link = article['link'] published = parse_date(article['published']) published_str = published.strftime('%Y-%m-%d %H:%M UTC') if published else 'Recently' if not content: continue summary = generate_summary(content, title, category, user_language) if not summary: continue # Translate title and category if needed if user_language != "English": title = translate_text(title, user_language) category = translate_text(category, user_language) published_str = translate_text(published_str, user_language) formatted_summary = f""" šŸ“° {title} šŸ“ {translate_text("Category", user_language)}: {category} ā° {translate_text("Published", user_language)}: {published_str} {summary} šŸ”— {translate_text("Read more", user_language)}: {link} ---""" summaries.append(formatted_summary) except Exception as e: logging.error(f"Error processing article: {e}") continue if not summaries: return translate_text("Unable to generate summaries for recent news. Please try again.", user_language) progress(1.0, desc="Done!") return "\n".join(summaries) # Gradio interface with gr.Blocks(title="Enhanced News Summarizer") as demo: gr.Markdown("# šŸ“° Enhanced AI News Summarizer") with gr.Tab("Set Preferences"): name_input = gr.Textbox(label="Your Name") language_dropdown = gr.Dropdown( choices=list(LANGUAGE_CODES.keys()), label="Preferred Language", value="English" ) interests_checkboxes = gr.CheckboxGroup( choices=list(NEWS_SOURCES.keys()), label="News Interests (Select multiple)" ) save_button = gr.Button("Save Preferences") preferences_output = gr.Textbox(label="Status") def save_preferences(name, language, interests): if not name or not language or not interests: return "Please fill in all required fields!" preferences = { "name": name, "language": language, "interests": interests, "last_updated": datetime.now().isoformat() } try: os.makedirs('user_preferences', exist_ok=True) with open(f"user_preferences/preferences_{name}.json", "w") as f: json.dump(preferences, f) return f"Preferences saved for {name}!" except Exception as e: logging.error(f"Error saving preferences: {e}") return f"Error saving preferences: {e}" save_button.click( save_preferences, inputs=[name_input, language_dropdown, interests_checkboxes], outputs=[preferences_output] ) with gr.Tab("Get News Summary"): name_check = gr.Textbox(label="Enter your name to get summary") get_summary_button = gr.Button("Get Summary") summary_output = gr.Textbox( label="Your Personalized News Summary", lines=20 ) get_summary_button.click( get_personalized_summary, inputs=[name_check], outputs=[summary_output] ) if __name__ == "__main__": if initialize_models(): demo.launch() else: print("Failed to initialize summarizer. Please check the logs.")