from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import re
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
import time
import random
# ==============================
# Initialize FastAPI App
# ==============================
app = FastAPI(
title="Advanced News Scraper & Analysis API",
description="An API to scrape Google News, then perform sentiment and topic analysis using transformer models.",
version="3.2.0" # Version updated for robust scraping
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"],
)
# ==============================
# Load Models (once on startup)
# ==============================
print("Loading sentiment analysis models... This may take a moment.")
finbert_name = "ProsusAI/finbert"
indobert_name = "w11wo/indonesian-roberta-base-sentiment-classifier"
finbert_tokenizer = AutoTokenizer.from_pretrained(finbert_name)
finbert_model = AutoModelForSequenceClassification.from_pretrained(finbert_name)
indobert_tokenizer = AutoTokenizer.from_pretrained(indobert_name)
indobert_model = AutoModelForSequenceClassification.from_pretrained(indobert_name)
print("Models loaded successfully.")
# ==============================
# Web Scraper Module - UPGRADED
# ==============================
# --- User-Agent Rotation ---
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/117.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/117.0',
]
# --- Proxy Configuration ---
# To use proxies, provide a list of entries formatted like: 'http://user:pass@ip:port'
# Rotating through proxies is the most effective way to avoid 429 (rate limit) errors.
# The list is empty by default, in which case requests are sent directly.
PROXY_LIST = []
def get_random_proxy():
if not PROXY_LIST:
return None
proxy = random.choice(PROXY_LIST)
return {'http': proxy, 'https': proxy}
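
# Illustrative only (not wired into the app): how PROXY_LIST entries could look,
# following the format described above, and what get_random_proxy() would return
# to requests. The addresses are placeholders, not real proxies.
#
# PROXY_LIST = [
#     "http://user:pass@203.0.113.10:8080",
#     "http://user:pass@203.0.113.11:3128",
# ]
# get_random_proxy()
# # -> {'http': 'http://user:pass@203.0.113.10:8080', 'https': 'http://user:pass@203.0.113.10:8080'}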
# Map Indonesian month abbreviations to English equivalents so datetime.strptime can parse them.
MONTH_MAP = {"Jan": "Jan", "Feb": "Feb", "Mar": "Mar", "Apr": "Apr", "Mei": "May", "Jun": "Jun", "Jul": "Jul", "Agu": "Aug", "Sep": "Sep", "Okt": "Oct", "Nov": "Nov", "Des": "Dec"}
def normalize_month(text):
for indo, eng in MONTH_MAP.items():
text = text.replace(indo, eng)
return text
def parse_news_date(date_text, indo=False):
now = datetime.now()
if not date_text: return "Unknown"
date_text = date_text.strip()
relative_match = re.match(r"(\d+)\s+(hour|day|week|month|year)s?\s+ago", date_text, re.I)
if relative_match:
value, unit = int(relative_match.group(1)), relative_match.group(2).lower()
if "hour" in unit: return (now - timedelta(hours=value)).strftime("%Y-%m-%d")
if "day" in unit: return (now - timedelta(days=value)).strftime("%Y-%m-%d")
if "week" in unit: return (now - timedelta(weeks=value)).strftime("%Y-%m-%d")
if "month" in unit: return (now - timedelta(days=30 * value)).strftime("%Y-%m-%d")
if "year" in unit: return (now - timedelta(days=365 * value)).strftime("%Y-%m-%d")
if indo:
indo_match = re.match(r"(\d+)\s+(jam|hari|minggu|bulan|tahun)\s+lalu", date_text, re.I)
if indo_match:
value, unit = int(indo_match.group(1)), indo_match.group(2).lower()
if unit == "jam": return (now - timedelta(hours=value)).strftime("%Y-%m-%d")
if unit == "hari": return (now - timedelta(days=value)).strftime("%Y-%m-%d")
if unit == "minggu": return (now - timedelta(weeks=value)).strftime("%Y-%m-%d")
if unit == "bulan": return (now - timedelta(days=30 * value)).strftime("%Y-%m-%d")
if unit == "tahun": return (now - timedelta(days=365 * value)).strftime("%Y-%m-%d")
fixed_text = normalize_month(date_text)
for fmt in ("%b %d, %Y", "%d %b %Y", "%b %d %Y", "%d %b, %Y", "%Y-%m-%d", "%d %B %Y"):
try:
return datetime.strptime(fixed_text, fmt).strftime("%Y-%m-%d")
except ValueError:
continue
return "Unknown"
def scrape_google_news(keyword, num_results=100, indo=False, start_date=None, end_date=None):
base_filter = "(business OR startup OR investment OR funding OR bisnis OR pendanaan)"
query = f'"{keyword}" AND {base_filter}'
url = f"https://www.google.com/search?q={query}&tbm=nws&num={num_results}"
if indo: url += "&gl=id&hl=id"
if start_date and end_date:
try:
start_dt = datetime.strptime(start_date, "%Y-%m-%d").strftime("%m/%d/%Y")
end_dt = datetime.strptime(end_date, "%Y-%m-%d").strftime("%m/%d/%Y")
url += f"&tbs=cdr:1,cd_min:{start_dt},cd_max:{end_dt}"
except Exception: pass
response = None
for attempt in range(4): # Increased retries
try:
headers = {'User-Agent': random.choice(USER_AGENTS)}
proxies = get_random_proxy()
print(f"Attempt {attempt+1} for '{keyword}' using UA: ...{headers['User-Agent'][-30:]}")
if proxies:
print(f"Using proxy: {proxies['http']}")
response = requests.get(url, headers=headers, proxies=proxies, timeout=20)
if response.status_code == 429:
wait_time = random.uniform(5, 10) * (attempt + 1)
print(f"⚠️ Rate limit hit. Waiting {wait_time:.2f}s...")
time.sleep(wait_time)
continue
response.raise_for_status()
if "Our systems have detected unusual traffic" in response.text:
print("❌ Captcha page detected. Skipping this request.")
time.sleep(random.uniform(10, 20))
return []
break
except requests.RequestException as e:
print(f"Scraping failed for '{keyword}': {e}")
if attempt < 3:
time.sleep(random.uniform(3, 7))
else:
return []
    if response is None or not response.ok:
        # Covers both exhausted exceptions and a final 429 left over from the retry loop.
        print(f"❌ Failed to scrape for '{keyword}' after multiple retries.")
        return []
soup = BeautifulSoup(response.content, "html.parser")
news_results = []
for el in soup.select("div.SoaBEf"):
try:
title = el.select_one("div.MBeuO").get_text(strip=True)
summary = el.select_one(".GI74Re").get_text(strip=True)
date_text = el.select_one(".LfVVr").get_text(strip=True)
source = el.select_one(".NUnG9d span").get_text(strip=True)
link = el.find("a")["href"]
if keyword.lower() not in f"{title} {summary}".lower():
continue
news_results.append({
"title": title, "summary": summary, "date": parse_news_date(date_text, indo=indo),
"source": source, "url": link
})
except Exception:
continue
return news_results
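
# Shape sketch (illustrative values): each item returned by scrape_google_news()
# is a dict with the keys below; the example content is made up.
#
# {
#     "title": "Example Startup Raises New Funding Round",
#     "summary": "Snippet text taken from the Google News result...",
#     "date": "2024-05-01",
#     "source": "Example News",
#     "url": "https://example.com/article",
# }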
# ==============================
# Sentiment Analysis Module
# ==============================
from config import negative_keywords, positive_keywords, topic_keywords
# Sentiment labels are read from each model's config (id2label) inside predict_sentiment(),
# since the label ordering can differ between models.
indo_stopwords = {"yang","dan","atau","tidak","ini","itu","saya","kita","kami","dengan","untuk","akan"}
def detect_language(text: str) -> str:
words = set(re.findall(r"\w+", text.lower()))
return "ID" if words & indo_stopwords else "EN"
def predict_sentiment(model, tokenizer, text: str):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
    scores = torch.softmax(outputs.logits, dim=1).detach().numpy()[0]
    idx = int(scores.argmax())
    # Use the label mapping from the model's own config rather than a hardcoded list,
    # so the label order always matches the model that produced the logits.
    return model.config.id2label[idx].upper(), float(scores[idx])
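
# Illustrative call (the input sentence is an arbitrary example; the label and
# score depend entirely on the model weights, with the score being a softmax probability):
#
#   predict_sentiment(finbert_model, finbert_tokenizer, "Company revenue grew sharply")
#   # -> ("POSITIVE", <confidence between 0 and 1>)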
def get_topic(text: str) -> str:
text_lower = text.lower()
for topic, kws in topic_keywords.items():
if any(kw in text_lower for kw in kws):
return topic
return "GENERAL"
def get_news_sentiment(text: str):
if not isinstance(text, str) or not text.strip():
return "NEUTRAL", 0.0, "GENERAL"
lang = detect_language(text)
    if lang == "ID":
        sentiment, score = predict_sentiment(indobert_model, indobert_tokenizer, text)
    else:
        sentiment, score = predict_sentiment(finbert_model, finbert_tokenizer, text)
if score < 0.6:
sentiment = "NEUTRAL"
text_lower = text.lower()
if any(kw in text_lower for kw in negative_keywords):
sentiment = "NEGATIVE"; score = max(score, 0.75)
elif any(kw in text_lower for kw in positive_keywords):
sentiment = "POSITIVE"; score = max(score, 0.75)
topic = get_topic(text)
return sentiment, score, topic
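
# Behaviour sketch: empty or non-string input short-circuits to the defaults, and
# topic names depend on topic_keywords in config.py (not shown here), with
# "GENERAL" as the documented fallback.
#
#   get_news_sentiment("")   -> ("NEUTRAL", 0.0, "GENERAL")
#   get_news_sentiment(None) -> ("NEUTRAL", 0.0, "GENERAL")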
# ==============================
# API Endpoint
# ==============================
@app.get("/search")
async def search_news(query: str, start_date: str = None, end_date: str = None):
search_queries = [q.strip().lower() for q in query.split(',') if q.strip()]
all_raw_news = []
for sq in search_queries:
print(f"Scraping Indonesian news for '{sq}'...")
all_raw_news.extend(scrape_google_news(sq, indo=True, start_date=start_date, end_date=end_date))
time.sleep(random.uniform(1, 2))
print(f"Scraping English news for '{sq}'...")
all_raw_news.extend(scrape_google_news(sq, indo=False, start_date=start_date, end_date=end_date))
time.sleep(random.uniform(1.5, 3.5))
seen, unique_news = set(), []
for article in all_raw_news:
key = (article['title'], article['url'])
if key not in seen:
seen.add(key)
unique_news.append(article)
processed_articles = []
for article in unique_news:
text_to_analyze = f"{article['title']}. {article['summary']}"
sentiment, score, topic = get_news_sentiment(text_to_analyze)
article_data = article.copy()
article_data.update({
"sentiment": sentiment,
"sentiment_score": round(score, 4),
"topic": topic
})
processed_articles.append(article_data)
processed_articles.sort(key=lambda x: x['date'] if x['date'] != "Unknown" else "0000-00-00", reverse=True)
print(f"Found and processed {len(processed_articles)} unique articles for query: '{query}'")
return processed_articles
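
# ==============================
# Local Run (sketch)
# ==============================
# A minimal way to run the API locally, assuming uvicorn is installed and this
# file is saved as app.py; the host and port (7860 is the Hugging Face Spaces
# default) are assumptions and may need adjusting for your environment.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)

# Example request (illustrative query values):
#   GET /search?query=tokopedia,gojek&start_date=2024-01-01&end_date=2024-06-30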