import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import requests
import pandas as pd
import gradio as gr
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from transformers import pipeline
from dotenv import load_dotenv

# Download the VADER lexicon once if it is not already available.
try:
    nltk.data.find('sentiment/vader_lexicon')
except LookupError:
    nltk.download('vader_lexicon')

summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")

load_dotenv()
api_key = os.getenv("API_KEY")
if not api_key:
    raise ValueError("API_KEY not found. Make sure to set it in the .env file.")
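
# Expected .env layout (a minimal sketch; the key name API_KEY matches the lookup
# above, the value is whatever NewsAPI issued for your account):
#   API_KEY=your_newsapi_key_here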

# Rough outlet-to-bias mapping used to annotate articles by source.
SOURCE_BIAS_MAP = {
    "fox news": "right",
    "breitbart": "right",
    "new york post": "right",
    "the wall street journal": "center-right",
    "reuters": "center",
    "associated press": "center",
    "bloomberg": "center",
    "npr": "center-left",
    "cnn": "left",
    "msnbc": "left",
    "the new york times": "left",
    "the washington post": "left",
    "the guardian": "left",
    "bbc news": "center",
    "sky news": "center-right",
    "the telegraph": "right",
    "the times": "center-right",
    "daily mail": "right",
    "the independent": "center-left",
    "the sun": "right",
    "financial times": "center",
}

# Numeric score per bias label; the sign encodes direction, the magnitude extremity.
BIAS_SCORE_MAP = {
    "left": -1,
    "center-left": -0.5,
    "center": 0,
    "center-right": 0.5,
    "right": 1,
    "unknown": 0,
}

def query(topic, sort_by="popularity"):
    """Fetch up to 20 articles on `topic` from the last 7 days via NewsAPI."""
    if not topic:
        print("Topic must be provided.")
        return None
    today = datetime.today()
    last_week = today - timedelta(days=7)
    params = {
        "q": topic,
        "from": last_week.strftime('%Y-%m-%d'),
        "to": today.strftime('%Y-%m-%d'),
        "sortBy": sort_by,
        "pageSize": 20,
        "apiKey": api_key,
    }
    try:
        response = requests.get("https://newsapi.org/v2/everything", params=params, timeout=10)
        if response.status_code != 200:
            print(f"API returned error: {response.status_code}")
            return None
        data = response.json()
        if data.get("totalResults", 0) == 0:
            print("No articles found for the given query and date range.")
            return None
        articles = data.get("articles", [])
        extracted = [
            {
                "title": article.get("title", "N/A"),
                "description": article.get("description", "N/A"),
                "source_name": article.get("source", {}).get("name", "N/A"),
                "url": article.get("url", "N/A"),
                "publishedAt": article.get("publishedAt", "N/A"),
            }
            for article in articles
        ]
        return pd.DataFrame(extracted)
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def process_data(df):
    """Drop empty or duplicate rows and build a combined lowercase text field."""
    if df is None or df.empty or not all(col in df.columns for col in ["title", "description"]):
        print("Invalid or empty DataFrame passed to process_data()")
        return pd.DataFrame()
    df_cleaned = df.dropna(subset=["title", "description"])
    df_cleaned = df_cleaned[df_cleaned["title"].str.strip() != ""]
    df_cleaned = df_cleaned[df_cleaned["description"].str.strip() != ""]
    df_cleaned = df_cleaned.drop_duplicates(subset=["title", "url"])
    df_cleaned["text"] = (df_cleaned["title"] + ". " + df_cleaned["description"]).str.lower()
    return df_cleaned

def analyse_sentiment(df):
    """Add VADER polarity scores and a coarse sentiment label to each article."""
    analyser = SentimentIntensityAnalyzer()

    def get_scores(text):
        scores = analyser.polarity_scores(text)
        return scores['compound'], scores['neg'], scores['neu'], scores['pos']

    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(get_scores, df['text']))
    df[['compound', 'neg', 'neu', 'pos']] = results

    def label_sentiment(score):
        if score >= 0.05:
            return "positive"
        elif score <= -0.05:
            return "negative"
        else:
            return "neutral"

    df['sentiment_label'] = df['compound'].apply(label_sentiment)
    return df

def get_bias_label(source_name):
    """Look up a single source's bias label, defaulting to 'unknown'."""
    source = source_name.strip().lower()
    return SOURCE_BIAS_MAP.get(source, "unknown")


def add_bias_annotation(df):
    """Map each article's source name to a bias label via SOURCE_BIAS_MAP."""
    bias_series = pd.Series(SOURCE_BIAS_MAP)
    df['bias_label'] = df['source_name'].str.strip().str.lower().map(bias_series).fillna("unknown")
    return df

def set_article_extremity(df, top_n=5):
    """Score how extreme each article is by combining sentiment strength and source bias."""
    def get_bias_extremity(label):
        return BIAS_SCORE_MAP.get(label, 0)

    df['bias_score'] = df['bias_label'].apply(get_bias_extremity)
    # Extremity = |sentiment compound| + |bias score|, rescaled to a 0-100 percentage.
    df['extremity_score'] = df['compound'].abs() + df['bias_score'].abs()
    df['extremity_pct'] = ((df['extremity_score'] / 2) * 100).round(1)
    df = df.sort_values(by='extremity_score', ascending=False)
    df['extreme'] = False
    df.loc[df.index[:top_n], 'extreme'] = True
    return df
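
# Worked example of the scoring above (illustrative numbers, not from real data):
# an article with compound = -0.62 from a source labelled "right" (bias_score = 1)
# gets extremity_score = 0.62 + 1 = 1.62, so extremity_pct = (1.62 / 2) * 100 = 81.0.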

def summarise_text(row, max_tokens=512):
    """Summarise one article row and attach its source bias label."""
    try:
        text = row['text'] if 'text' in row and pd.notna(row['text']) else ''
        source_name = row['source_name'] if 'source_name' in row and pd.notna(row['source_name']) else 'unknown'
        # Keep the summary shorter than the input, but never below a sensible floor.
        input_length = len(text.split())
        max_length = max(20, min(input_length - 10, max_tokens))
        min_length = max(10, max_length - 10)
        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
        summary_text = summary[0]['summary_text']
        bias_label = get_bias_label(source_name)
        return pd.Series({'summary': summary_text, 'bias_score': bias_label, 'source': source_name})
    except Exception as e:
        print(f"Error summarising row: {e}")
        return pd.Series({'summary': 'Summary unavailable', 'bias_score': 'unknown', 'source': 'unknown'})

def add_article_summaries(df, max_tokens=512):
    """Summarise every article in parallel and merge the results back into df."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        summaries = list(executor.map(lambda row: summarise_text(row, max_tokens), df.to_dict('records')))
    summary_df = pd.DataFrame(summaries)
    # Assign by position: summary_df has a fresh 0..n-1 index that may not match df's.
    df[['summary', 'bias_score', 'source']] = summary_df.to_numpy()
    return df

def main():
    raw_df = query("Tesla")
    if raw_df is None or raw_df.empty:
        print("No data found!")
        return
    processed_df = process_data(raw_df)
    sentiment_df = analyse_sentiment(processed_df)
    bias_df = add_bias_annotation(sentiment_df)
    extremity_df = set_article_extremity(bias_df)
    final_df = add_article_summaries(extremity_df)
    print(final_df.head())


if __name__ == "__main__":
    main()
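
# A minimal sketch of how the pipeline could be exposed through the imported but
# otherwise unused gradio module (assumption: the Space serves a simple topic-search
# UI; run_pipeline and the component labels below are illustrative, not part of the
# original app):
#
# def run_pipeline(topic):
#     raw = query(topic)
#     if raw is None or raw.empty:
#         return pd.DataFrame()
#     df = analyse_sentiment(process_data(raw))
#     df = add_article_summaries(set_article_extremity(add_bias_annotation(df)))
#     return df[["title", "source_name", "sentiment_label", "bias_label", "extremity_pct", "summary"]]
#
# demo = gr.Interface(fn=run_pipeline, inputs=gr.Textbox(label="Topic"), outputs=gr.Dataframe())
# demo.launch()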