"""
News API Collector

Collects industry news and trends for feature validation.
"""

import asyncio
import os
import re
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import aiohttp
from dotenv import load_dotenv
from newsapi import NewsApiClient
from textblob import TextBlob

from utils.logger import get_logger
from utils.nlp_query_enhancer import NLPQueryEnhancer

load_dotenv()

logger = get_logger(__name__)


class NewsCollector:
    """Collects and analyzes news for market trends"""

    def __init__(self):
        """Initialize News API client"""
        self.news_api = None
        self.session = None
        self.nlp_enhancer = NLPQueryEnhancer()
        self.setup_news_client()

    def setup_news_client(self):
        """Setup News API client with credentials"""
        logger.log_processing_step("NewsAPI client setup", "started")

        try:
            api_key = os.getenv("NEWS_API_KEY") or os.getenv("NEWSAPI_KEY")

            if not api_key:
                error_msg = "No NewsAPI key found in environment variables (NEWS_API_KEY or NEWSAPI_KEY)"
                logger.log_api_failure("NewsAPI", error_msg)
                self.news_api = None
                return

            logger.log_api_attempt("NewsAPI", f"with key ending in ...{api_key[-4:]}")
            self.news_api = NewsApiClient(api_key=api_key)
            logger.log_api_success("NewsAPI", "client initialized")
            logger.log_processing_step("NewsAPI client setup", "completed")

        except Exception as e:
            error_msg = f"NewsAPI client setup failed: {str(e)}"
            logger.log_api_failure("NewsAPI", error_msg)
            logger.log_processing_step("NewsAPI client setup", "failed")
            self.news_api = None

    async def __aenter__(self):
        """Async context manager entry"""
        connector = aiohttp.TCPConnector(
            limit=10,
            limit_per_host=5,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )

        timeout = aiohttp.ClientTimeout(total=30)

        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit with proper cleanup"""
        await self.cleanup()

    async def cleanup(self):
        """Clean up session and connections properly"""
        try:
            if self.session and not self.session.closed:
                await self.session.close()
                logger.debug("News collector session closed properly")
        except Exception as e:
            logger.debug(f"Error during news collector cleanup: {str(e)}")
        finally:
            self.session = None

        # Give the event loop a moment to finish closing underlying connections.
        await asyncio.sleep(0.1)

    def clean_text(self, text: str) -> str:
        """Clean and preprocess text for analysis"""
        if not text:
            return ""

        # Strip URLs, replace punctuation with spaces, then collapse whitespace.
        text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
        text = re.sub(r'[^\w\s]', ' ', text)
        text = ' '.join(text.split())

        return text.strip()

    def extract_keywords(self, text: str, min_length: int = 4, max_keywords: int = 10) -> List[str]:
        """Extract important keywords from text"""
        if not text:
            return []

        try:
            blob = TextBlob(text)

            # Keep noun phrases at least min_length characters long.
            keywords = [phrase for phrase in blob.noun_phrases if len(phrase) >= min_length]

            return keywords[:max_keywords]

        except Exception as e:
            logger.error(f"Error extracting keywords: {str(e)}")
            return []

    def optimize_search_query(self, feature_description: str) -> List[str]:
        """
        Optimize search query by breaking down complex descriptions into searchable terms

        Args:
            feature_description: Original feature description

        Returns:
            List of optimized search queries
        """
        queries = []

        # Terms that signal the technology and business domains of the feature.
        key_terms = []
        tech_terms = ['ai', 'artificial intelligence', 'machine learning', 'ml', 'app', 'mobile', 'web', 'platform', 'software', 'tool', 'system']
        business_terms = ['fitness', 'health', 'nutrition', 'training', 'workout', 'exercise', 'diet', 'wellness', 'personal trainer', 'coaching']

        description_lower = feature_description.lower()

        for term in tech_terms:
            if term in description_lower:
                key_terms.append(term)

        for term in business_terms:
            if term in description_lower:
                key_terms.append(term)

        # Build targeted queries from combinations of matched terms.
        if key_terms:
            if 'ai' in key_terms or 'artificial intelligence' in key_terms:
                if 'fitness' in key_terms:
                    queries.extend([
                        'AI fitness app',
                        'artificial intelligence fitness',
                        'AI personal trainer',
                        'smart fitness technology'
                    ])
                if 'nutrition' in key_terms:
                    queries.extend([
                        'AI nutrition app',
                        'artificial intelligence diet',
                        'smart meal planning'
                    ])

            if 'fitness' in key_terms:
                queries.extend([
                    'fitness app market',
                    'digital fitness trends',
                    'fitness technology',
                    'workout app industry'
                ])

            if 'nutrition' in key_terms:
                queries.extend([
                    'nutrition app market',
                    'diet tracking technology',
                    'meal planning apps'
                ])

        # General fallback queries so a search is always possible.
        queries.extend([
            'health tech startup',
            'wellness app market',
            'fitness industry trends',
            'health technology news'
        ])

        # Add short noun phrases extracted from the description itself.
        try:
            blob = TextBlob(feature_description)
            noun_phrases = [phrase for phrase in blob.noun_phrases if len(phrase.split()) <= 3]

            for phrase in noun_phrases[:5]:
                if len(phrase) > 3:
                    queries.append(phrase)
        except Exception:
            pass

        # Deduplicate while preserving order, then cap the number of queries.
        unique_queries = list(dict.fromkeys(queries))

        return unique_queries[:8]
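
    # Illustrative example (not executed): for a description such as
    # "AI-powered fitness coaching app", the matched terms include 'ai', 'app',
    # 'fitness' and 'coaching', so the returned queries are roughly
    # ['AI fitness app', 'artificial intelligence fitness', 'AI personal trainer',
    #  'smart fitness technology', 'fitness app market', ...], capped at 8 entries.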

    def create_search_variations(self, base_query: str) -> List[str]:
        """
        Create variations of a search query for better coverage

        Args:
            base_query: Base search query

        Returns:
            List of query variations
        """
        variations = [base_query]

        variations.extend([
            f"{base_query} industry",
            f"{base_query} market",
            f"{base_query} trends",
            f"{base_query} startup",
            f"{base_query} technology"
        ])

        variations.extend([
            f"{base_query} 2024",
            f"{base_query} latest",
            f"{base_query} new"
        ])

        return variations

    def _analyze_article_sentiment(self, text: str) -> Dict[str, Any]:
        """
        Enhanced sentiment analysis specifically tuned for news articles

        Args:
            text: Article text (title + description)

        Returns:
            Detailed sentiment analysis with improved thresholds
        """
        if not text:
            return {
                "polarity": 0.0,
                "subjectivity": 0.0,
                "classification": "neutral",
                "confidence": 0.0,
                "keyword_sentiment": 0.0,
                "debug_info": {"error": "Empty text"}
            }

        try:
            blob = TextBlob(text)
            textblob_polarity = blob.sentiment.polarity
            textblob_subjectivity = blob.sentiment.subjectivity

            keyword_sentiment = self._analyze_keyword_sentiment(text.lower())

            # Blend TextBlob polarity with keyword-based sentiment (60/40 weighting).
            combined_polarity = (textblob_polarity * 0.6) + (keyword_sentiment * 0.4)

            # Tighter thresholds than the TextBlob defaults so weak signals still classify.
            NEWS_POSITIVE_THRESHOLD = 0.05
            NEWS_NEGATIVE_THRESHOLD = -0.05

            if combined_polarity >= NEWS_POSITIVE_THRESHOLD:
                classification = "positive"
            elif combined_polarity <= NEWS_NEGATIVE_THRESHOLD:
                classification = "negative"
            else:
                classification = "neutral"

            confidence = min(abs(combined_polarity) * 2, 1.0)

            return {
                "polarity": combined_polarity,
                "subjectivity": textblob_subjectivity,
                "classification": classification,
                "confidence": confidence,
                "keyword_sentiment": keyword_sentiment,
                "debug_info": {
                    "textblob_polarity": textblob_polarity,
                    "keyword_sentiment": keyword_sentiment,
                    "thresholds": {
                        "positive": NEWS_POSITIVE_THRESHOLD,
                        "negative": NEWS_NEGATIVE_THRESHOLD
                    }
                }
            }

        except Exception as e:
            logger.error(f"Error in enhanced sentiment analysis: {str(e)}")
            return {
                "polarity": 0.0,
                "subjectivity": 0.0,
                "classification": "neutral",
                "confidence": 0.0,
                "keyword_sentiment": 0.0,
                "debug_info": {"error": str(e)}
            }
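
    # Worked example (illustrative): if TextBlob scores a headline at 0.20 and the
    # keyword analysis at 0.50, the combined polarity is 0.20 * 0.6 + 0.50 * 0.4 = 0.32,
    # which clears the 0.05 news threshold and is classified "positive" with
    # confidence min(0.32 * 2, 1.0) = 0.64.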

    def _analyze_keyword_sentiment(self, text: str) -> float:
        """
        Analyze sentiment based on business/news-specific keywords

        Args:
            text: Lowercase text to analyze

        Returns:
            Sentiment score from keyword analysis (-1.0 to 1.0)
        """
        negative_keywords = {
            # Severe negative events
            'bankruptcy': -0.9, 'fraud': -0.9, 'scandal': -0.8, 'crisis': -0.8,
            'collapse': -0.8, 'failure': -0.7, 'lawsuit': -0.7, 'breach': -0.7,
            'hack': -0.7, 'cyberattack': -0.7, 'data breach': -0.8,

            # Business setbacks
            'decline': -0.6, 'loss': -0.6, 'drop': -0.5, 'fall': -0.5,
            'concern': -0.5, 'risk': -0.5, 'problem': -0.5, 'issue': -0.4,
            'challenge': -0.4, 'difficulty': -0.4, 'struggle': -0.5,
            'layoff': -0.6, 'shutdown': -0.7, 'closure': -0.6,

            # Market conditions
            'recession': -0.7, 'downturn': -0.6, 'crash': -0.8,
            'volatile': -0.5, 'uncertainty': -0.4, 'unstable': -0.5,
            'disappointing': -0.5, 'missed expectations': -0.6,
            'below forecast': -0.5, 'underperform': -0.5
        }

        positive_keywords = {
            # Strong positive signals
            'breakthrough': 0.8, 'innovation': 0.7, 'revolutionary': 0.8,
            'success': 0.7, 'achievement': 0.6, 'milestone': 0.6,
            'record': 0.6, 'best': 0.5, 'excellent': 0.6,

            # Business growth
            'growth': 0.6, 'increase': 0.5, 'rise': 0.5, 'surge': 0.6,
            'expansion': 0.6, 'profit': 0.5, 'revenue': 0.4, 'gain': 0.5,
            'boost': 0.5, 'improve': 0.5, 'enhance': 0.4,
            'opportunity': 0.4, 'potential': 0.4, 'promising': 0.5,

            # Market sentiment
            'bullish': 0.6, 'optimistic': 0.5, 'confident': 0.5,
            'strong': 0.4, 'robust': 0.5, 'solid': 0.4,
            'outperform': 0.5, 'exceed expectations': 0.6,
            'beat forecast': 0.6, 'above estimates': 0.5
        }

        sentiment_score = 0.0
        total_weight = 0.0

        for keyword, weight in negative_keywords.items():
            if keyword in text:
                sentiment_score += weight
                total_weight += abs(weight)

        for keyword, weight in positive_keywords.items():
            if keyword in text:
                sentiment_score += weight
                total_weight += abs(weight)

        # Normalize by the matched weight, flooring the divisor at 1.0 so a single
        # weak keyword match is not inflated into an extreme score.
        if total_weight > 0:
            sentiment_score = sentiment_score / max(total_weight, 1.0)

        return max(-1.0, min(1.0, sentiment_score))
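
    # Worked example (illustrative): text containing only "decline" scores -0.6 with a
    # total weight of 0.6; because the divisor is floored at 1.0, the result stays -0.6
    # rather than being normalized to -1.0, so single weak matches are not over-amplified.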

    async def search_news(self, query: str, days_back: int = 30, language: str = "en") -> Dict[str, Any]:
        """
        Search for news articles related to a query with enhanced search strategies

        Args:
            query: Search query
            days_back: Number of days to look back
            language: Language code

        Returns:
            News search results or error details
        """
        logger.log_processing_step(f"News search for '{query}'", "started")

        if not self.news_api:
            error_msg = "NewsAPI client not initialized - no valid API key available"
            logger.log_api_failure("NewsAPI", error_msg)
            logger.log_processing_step(f"News search for '{query}'", "failed")
            return {
                "query": query,
                "error": error_msg,
                "error_type": "no_api_client",
                "total_results": 0,
                "articles": [],
                "searched_at": datetime.now().isoformat()
            }

        try:
            logger.log_api_attempt("NewsAPI", f"searching for '{query}' ({days_back} days back)")

            from_date = (datetime.now() - timedelta(days=days_back)).strftime("%Y-%m-%d")
            to_date = datetime.now().strftime("%Y-%m-%d")

            # Try progressively looser strategies: the query as given, the individual
            # words joined with OR, then the exact phrase sorted by publication date.
            search_strategies = [
                {"q": query, "sort_by": "relevancy"},
                {"q": " OR ".join(query.split()) if len(query.split()) > 1 else query, "sort_by": "relevancy"},
                {"q": f'"{query}"' if '"' not in query else query, "sort_by": "publishedAt"}
            ]

            response = None
            successful_strategy = None

            for i, strategy in enumerate(search_strategies):
                try:
                    response = self.news_api.get_everything(
                        from_param=from_date,
                        to=to_date,
                        language=language,
                        **strategy
                    )

                    if response and response.get("totalResults", 0) > 0:
                        successful_strategy = i + 1
                        logger.info(f"Search strategy {successful_strategy} successful for query: {strategy['q']}")
                        break
                    elif i == 0:
                        logger.info("Search strategy 1 returned 0 results, trying alternative strategies")

                except Exception as strategy_error:
                    logger.warning(f"Search strategy {i + 1} failed: {str(strategy_error)}")
                    continue

            if response is None:
                response = {"articles": [], "totalResults": 0}

            articles = response.get("articles", [])
            total_results = response.get("totalResults", 0)

            logger.log_api_success("NewsAPI", f"found {total_results} total results, processing {len(articles)} articles")

            processed_results = {
                "query": query,
                "total_results": total_results,
                "days_back": days_back,
                "articles": [],
                "sources": {},
                "keywords": {},
                "searched_at": datetime.now().isoformat()
            }

            for article in articles:
                processed_article = {
                    "title": article.get("title", ""),
                    "source": article.get("source", {}).get("name", "Unknown"),
                    "author": article.get("author", ""),
                    "published_at": article.get("publishedAt", ""),
                    "description": article.get("description", ""),
                    "url": article.get("url", ""),
                    "content": article.get("content", "")
                }

                # Keywords and sentiment are derived from the title plus description.
                full_text = f"{processed_article['title']} {processed_article['description']}"
                keywords = self.extract_keywords(full_text)
                processed_article["keywords"] = keywords

                sentiment_analysis = self._analyze_article_sentiment(full_text)
                processed_article["sentiment"] = sentiment_analysis

                processed_results["articles"].append(processed_article)

                # Track per-source and per-keyword counts.
                source = processed_article["source"]
                if source not in processed_results["sources"]:
                    processed_results["sources"][source] = 0
                processed_results["sources"][source] += 1

                for keyword in keywords:
                    if keyword not in processed_results["keywords"]:
                        processed_results["keywords"][keyword] = 0
                    processed_results["keywords"][keyword] += 1

            # Log the sentiment distribution for debugging.
            sentiment_debug = {"positive": 0, "negative": 0, "neutral": 0}
            for article in processed_results["articles"]:
                classification = article.get("sentiment", {}).get("classification", "neutral")
                sentiment_debug[classification] += 1

            logger.info(f"Sentiment distribution for '{query}': {sentiment_debug}")
            logger.log_data_collection(f"NewsAPI articles for '{query}'", len(processed_results['articles']))
            logger.log_processing_step(f"News search for '{query}'", "completed")

            return processed_results

        except Exception as e:
            error_msg = f"NewsAPI search failed for '{query}': {str(e)}"
            logger.log_api_failure("NewsAPI", error_msg)
            logger.log_processing_step(f"News search for '{query}'", "failed")

            return {
                "query": query,
                "error": error_msg,
                "error_type": "api_request_failed",
                "error_details": str(e),
                "total_results": 0,
                "articles": [],
                "searched_at": datetime.now().isoformat()
            }
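
    # Shape of a successful search_news result (sketch, for reference only):
    #     {
    #         "query": ..., "total_results": int, "days_back": int,
    #         "articles": [{"title", "source", "author", "published_at", "description",
    #                       "url", "content", "keywords", "sentiment"}, ...],
    #         "sources": {source_name: count}, "keywords": {keyword: count},
    #         "searched_at": ISO timestamp,
    #     }
    # On failure, "error", "error_type" (and possibly "error_details") are returned instead.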

    async def get_trend_analysis(self, feature_description: str, target_market: str = "", days_back: int = 30) -> Dict[str, Any]:
        """
        Analyze trends for a specific feature using enhanced NLP

        Args:
            feature_description: Description of the feature
            target_market: Target market description
            days_back: Number of days to look back

        Returns:
            Trend analysis
        """
        try:
            # Let the NLP enhancer produce news-oriented queries for the feature.
            enhanced_data = await self.nlp_enhancer.enhance_query(feature_description, target_market)
            optimized_queries = self.nlp_enhancer.get_optimized_queries_for_platform(enhanced_data, 'news')

            if not optimized_queries:
                # Fall back to simple keyword extraction from the description.
                words = feature_description.lower().split()
                key_words = [word for word in words if len(word) > 3 and word not in ['and', 'the', 'for', 'with', 'that', 'this']]

                if len(key_words) >= 2:
                    optimized_queries = [
                        ' '.join(key_words[:2]),
                        ' '.join(key_words[:3]) if len(key_words) >= 3 else ' '.join(key_words),
                        f"{key_words[0]} market" if key_words else "technology market",
                        f"{key_words[0]} industry" if key_words else "technology industry"
                    ]
                else:
                    optimized_queries = [feature_description]

            logger.info(f"Using optimized search queries: {optimized_queries}")
            search_queries = optimized_queries

            query_results = {}
            successful_queries = []

            for query in search_queries:
                result = await self.search_news(query, days_back)
                query_results[query] = result

                if result.get("total_results", 0) > 0:
                    successful_queries.append(query)

            # If nothing matched, try broader fallback searches until a few succeed.
            if not successful_queries:
                logger.info("No results from optimized queries, trying broader fallback searches")
                fallback_queries = [
                    "fitness app",
                    "health technology",
                    "mobile app market",
                    "wellness industry",
                    "AI technology",
                    "personal training",
                    "nutrition app"
                ]

                for query in fallback_queries:
                    if query not in query_results:
                        result = await self.search_news(query, days_back)
                        query_results[query] = result

                        if result.get("total_results", 0) > 0:
                            successful_queries.append(query)

                        if len(successful_queries) >= 3:
                            break

            trend_analysis = {
                "feature_description": feature_description,
                "original_queries": search_queries,
                "successful_queries": successful_queries,
                "total_queries_tried": len(query_results),
                "aggregate_stats": self._aggregate_news_results(query_results),
                "query_results": query_results,
                "analyzed_at": datetime.now().isoformat()
            }

            trend_analysis["trends"] = self._identify_trends(trend_analysis)

            return trend_analysis

        except Exception as e:
            logger.error(f"Error in trend analysis: {str(e)}")
            return {"error": str(e)}

    async def get_industry_trends(self, industry: str, timeframe: str = "30d") -> Dict[str, Any]:
        """Get general industry trends"""
        try:
            days_mapping = {"7d": 7, "30d": 30, "90d": 90}
            days_back = days_mapping.get(timeframe, 30)

            result = await self.search_news(f"{industry} industry trends", days_back)

            result["industry"] = industry
            result["timeframe"] = timeframe
            result["trend_score"] = self._calculate_trend_score(result)

            result["top_trends"] = self._extract_top_trends(result, 5)

            return result

        except Exception as e:
            logger.error(f"Error getting industry trends: {str(e)}")
            return {"error": str(e)}

    def _aggregate_news_results(self, query_results: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate results from multiple queries"""
        aggregate = {
            "total_articles": 0,
            "total_sources": 0,
            "sources": {},
            "keywords": {},
            "sentiment": {
                "positive": 0,
                "negative": 0,
                "neutral": 0,
                "average_polarity": 0
            }
        }

        # Deduplicate articles across queries by URL.
        articles_processed = set()
        total_polarity = 0
        articles_with_sentiment = 0

        for query, result in query_results.items():
            if "articles" not in result:
                continue

            for article in result["articles"]:
                url = article.get("url", "")
                if url in articles_processed:
                    continue

                articles_processed.add(url)
                aggregate["total_articles"] += 1

                source = article.get("source", "Unknown")
                if source not in aggregate["sources"]:
                    aggregate["sources"][source] = 0
                    aggregate["total_sources"] += 1
                aggregate["sources"][source] += 1

                for keyword in article.get("keywords", []):
                    if keyword not in aggregate["keywords"]:
                        aggregate["keywords"][keyword] = 0
                    aggregate["keywords"][keyword] += 1

                sentiment = article.get("sentiment", {})
                classification = sentiment.get("classification", "neutral")
                polarity = sentiment.get("polarity", 0)

                if classification == "positive":
                    aggregate["sentiment"]["positive"] += 1
                elif classification == "negative":
                    aggregate["sentiment"]["negative"] += 1
                else:
                    aggregate["sentiment"]["neutral"] += 1

                total_polarity += polarity
                articles_with_sentiment += 1

        if articles_with_sentiment > 0:
            aggregate["sentiment"]["average_polarity"] = total_polarity / articles_with_sentiment

        aggregate["top_keywords"] = sorted(
            aggregate["keywords"].items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]

        aggregate["top_sources"] = sorted(
            aggregate["sources"].items(),
            key=lambda x: x[1],
            reverse=True
        )[:5]

        return aggregate

    def _identify_trends(self, analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Identify trends from news analysis"""
        trends = []

        keywords = analysis.get("aggregate_stats", {}).get("keywords", {})
        for keyword, count in sorted(keywords.items(), key=lambda x: x[1], reverse=True)[:10]:
            if count >= 2:
                trends.append({
                    "keyword": keyword,
                    "count": count,
                    "type": "topic"
                })

        return trends

    def _calculate_trend_score(self, result: Dict[str, Any]) -> float:
        """Calculate trend score based on article volume and recency"""
        if "articles" not in result or not result["articles"]:
            return 0.0

        recent_count = 0
        total_count = len(result["articles"])

        # published_at values are ISO 8601 with a trailing "Z", so compare timezone-aware datetimes.
        one_week_ago = datetime.now().astimezone() - timedelta(days=7)

        for article in result["articles"]:
            try:
                published_date = datetime.fromisoformat(article.get("published_at", "").replace("Z", "+00:00"))
                if published_date >= one_week_ago:
                    recent_count += 1
            except ValueError:
                pass

        recency_ratio = recent_count / total_count if total_count > 0 else 0
        trend_score = min((total_count / 10) * (1 + recency_ratio), 10.0)

        return round(trend_score, 2)
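
    # Worked example (illustrative): 12 articles with 6 published in the last week give a
    # recency ratio of 0.5, so the score is min((12 / 10) * (1 + 0.5), 10.0) = 1.8.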

    def _extract_top_trends(self, result: Dict[str, Any], limit: int = 5) -> List[Dict[str, Any]]:
        """Extract top trends from news result"""
        trends = []

        keywords = {}
        for article in result.get("articles", []):
            for keyword in article.get("keywords", []):
                if keyword not in keywords:
                    keywords[keyword] = {
                        "keyword": keyword,
                        "count": 0,
                        "sentiment": 0,
                        "articles": []
                    }

                keywords[keyword]["count"] += 1
                keywords[keyword]["sentiment"] += article.get("sentiment", {}).get("polarity", 0)
                keywords[keyword]["articles"].append(article.get("url", ""))

        for keyword, data in keywords.items():
            if data["count"] > 0:
                data["sentiment"] = data["sentiment"] / data["count"]
                data["articles"] = data["articles"][:3]
                trends.append(data)

        return sorted(trends, key=lambda x: x["count"], reverse=True)[:limit]
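

# Usage sketch (illustrative, assuming NEWS_API_KEY is set in the environment):
#
#     async def example():
#         async with NewsCollector() as collector:
#             trends = await collector.get_trend_analysis(
#                 "AI fitness coaching app", target_market="consumers", days_back=30
#             )
#             return trends.get("trends", [])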


async def test_news_collector():
    """Test function for NewsCollector"""
    collector = NewsCollector()

    print("Testing trend analysis...")
    result = await collector.get_trend_analysis(
        "AI voice ordering",
        days_back=15
    )
    print(f"Trend analysis: {result.get('aggregate_stats', {}).get('total_articles', 0)} articles analyzed")

    print("Testing industry trends...")
    industry_result = await collector.get_industry_trends("artificial intelligence", "30d")
    print(f"Industry trends: {industry_result.get('trend_score', 0)} trend score")

    return result, industry_result


if __name__ == "__main__":
    asyncio.run(test_news_collector())