import re
import time
import json
import random
import hashlib
import logging
import requests
import pandas as pd
from pathlib import Path
from newspaper import Article, build
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Optional, Tuple
from data.validation_schemas import ValidationLevel
from data.data_validator import DataValidationPipeline
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/scraping.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class RobustNewsScraper:
    """Production-ready news scraper with comprehensive error handling and rate limiting"""

    def __init__(self):
        self.setup_paths()
        self.setup_scraping_config()
        self.session = self.create_session()
        self.scraped_urls = self.load_scraped_urls()

    def setup_paths(self):
        """Setup all necessary paths"""
        self.base_dir = Path("/tmp")
        self.data_dir = self.base_dir / "data"
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.output_path = self.data_dir / "scraped_real.csv"
        self.metadata_path = self.data_dir / "scraping_metadata.json"
        self.urls_cache_path = self.data_dir / "scraped_urls.json"

    def setup_scraping_config(self):
        """Setup scraping configuration"""
        self.news_sites = [
            {
                "name": "Reuters",
                "url": "https://www.reuters.com/",
                "max_articles": 8,
                "delay": 2.0
            },
            {
                "name": "BBC",
                "url": "https://www.bbc.com/news",
                "max_articles": 7,
                "delay": 2.5
            },
            {
                "name": "NPR",
                "url": "https://www.npr.org/",
                "max_articles": 5,
                "delay": 3.0
            },
            {
                "name": "Associated Press",
                "url": "https://apnews.com/",
                "max_articles": 5,
                "delay": 2.0
            }
        ]
        self.max_articles_total = 20
        self.min_article_length = 100
        self.max_article_length = 10000
        self.scraping_timeout = 30
        self.max_retries = 3
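
    # Note on the knobs above: max_articles_total is enforced in
    # scrape_all_sources(), min/max_article_length in validate_article_quality(),
    # while scraping_timeout and max_retries are declared but not currently
    # applied anywhere (see the retry sketch after create_session() below).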

    def create_session(self) -> requests.Session:
        """Create configured requests session"""
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        return session
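
    # Illustrative sketch (not called anywhere by default): one way self.max_retries
    # could be honored at the transport level using requests' HTTPAdapter and
    # urllib3's Retry. The method name, backoff factor, and status codes below are
    # assumptions, not part of the original scraper. Note that article downloads go
    # through newspaper's own HTTP client, so this would only affect requests made
    # directly via self.session.
    def _mount_retry_adapter(self, session: requests.Session) -> requests.Session:
        """Sketch: attach automatic retries with backoff to an existing session."""
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        retry = Retry(
            total=self.max_retries,
            backoff_factor=1.0,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session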

    def load_scraped_urls(self) -> set:
        """Load previously scraped URLs to avoid duplicates"""
        if self.urls_cache_path.exists():
            try:
                with open(self.urls_cache_path, 'r') as f:
                    urls_data = json.load(f)
                # Only keep URLs from last 30 days
                cutoff_date = datetime.now() - timedelta(days=30)
                recent_urls = {
                    url for url, timestamp in urls_data.items()
                    if datetime.fromisoformat(timestamp) > cutoff_date
                }
                logger.info(f"Loaded {len(recent_urls)} recent URLs from cache")
                return recent_urls
            except Exception as e:
                logger.warning(f"Failed to load URL cache: {e}")
        return set()
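
    # The URL cache on disk is a flat JSON object mapping each scraped URL to the
    # ISO-8601 timestamp of when it was scraped, e.g.
    #   {"https://www.bbc.com/news/some-article": "2024-01-15T09:30:00.123456"}
    # (the URL above is a made-up example). load_scraped_urls() reads it back as a
    # set, dropping entries older than 30 days; save_scraped_urls() merges new
    # entries into the same file.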

    def save_scraped_urls(self, new_urls: Dict[str, str]):
        """Save scraped URLs with timestamps"""
        try:
            # Load existing URLs
            urls_data = {}
            if self.urls_cache_path.exists():
                with open(self.urls_cache_path, 'r') as f:
                    urls_data = json.load(f)
            # Add new URLs
            urls_data.update(new_urls)
            # Save updated cache
            with open(self.urls_cache_path, 'w') as f:
                json.dump(urls_data, f, indent=2)
            logger.info(f"Saved {len(new_urls)} new URLs to cache")
        except Exception as e:
            logger.error(f"Failed to save URL cache: {e}")

    def validate_article_quality(self, article: Article) -> Tuple[bool, str]:
        """Validate article quality with comprehensive checks"""
        # Check if article has minimum content
        if not article.text or len(article.text.strip()) < self.min_article_length:
            return False, "Article too short"
        if len(article.text) > self.max_article_length:
            return False, "Article too long"
        # Check if article has title
        if not article.title or len(article.title.strip()) < 10:
            return False, "Missing or inadequate title"
        # Check for meaningful content
        if not any(c.isalpha() for c in article.text):
            return False, "No alphabetic content"
        # Check for sentence structure
        if not any(punct in article.text for punct in '.!?'):
            return False, "No sentence structure"
        # Check for excessive HTML artifacts
        html_patterns = [
            r'<[^>]+>',
            r'&[a-zA-Z]+;',
            r'javascript:',
            r'document\.',
            r'window\.'
        ]
        for pattern in html_patterns:
            if len(re.findall(pattern, article.text)) > 5:
                return False, "Excessive HTML artifacts"
        # Check for advertising content
        ad_keywords = [
            'advertisement', 'sponsored', 'click here', 'buy now',
            'subscribe', 'newsletter', 'cookies', 'privacy policy'
        ]
        text_lower = article.text.lower()
        ad_count = sum(1 for keyword in ad_keywords if keyword in text_lower)
        if ad_count > 3:
            return False, "Excessive advertising content"
        return True, "Article passed validation"

    def clean_article_text(self, text: str) -> str:
        """Clean and normalize article text"""
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove HTML entities
        text = re.sub(r'&[a-zA-Z]+;', '', text)
        # Remove excessive punctuation
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        text = re.sub(r'[.]{3,}', '...', text)
        # Remove non-printable characters
        text = ''.join(char for char in text if ord(char) >= 32)
        return text.strip()

    def scrape_single_article(self, url: str) -> Optional[Dict]:
        """Scrape a single article with comprehensive error handling"""
        try:
            # Check if URL already scraped
            if url in self.scraped_urls:
                return None
            # Create article object
            article = Article(url)
            # Download article HTML (note: self.scraping_timeout is not applied
            # here; newspaper falls back to its own default request timeout)
            article.download()
            # Parse article
            article.parse()
            # Validate article quality
            is_valid, reason = self.validate_article_quality(article)
            if not is_valid:
                logger.debug(f"Article validation failed ({reason}): {url}")
                return None
            # Clean article text
            clean_title = self.clean_article_text(article.title)
            clean_text = self.clean_article_text(article.text)
            # Combine title and text
            full_text = f"{clean_title}. {clean_text}"
            # Create article data
            article_data = {
                'text': full_text,
                'label': 0,  # Real news
                'source': urlparse(url).netloc,
                'url': url,
                'title': clean_title,
                'timestamp': datetime.now().isoformat(),
                'word_count': len(full_text.split()),
                'char_count': len(full_text)
            }
            logger.info(f"Successfully scraped article: {clean_title[:50]}...")
            return article_data
        except Exception as e:
            logger.warning(f"Failed to scrape {url}: {str(e)}")
            return None

    def scrape_site_articles(self, site_config: Dict) -> List[Dict]:
        """Scrape articles from a single news site"""
        logger.info(f"Starting scraping from {site_config['name']}...")
        articles = []
        scraped_urls = {}
        try:
            # Build newspaper object
            paper = build(site_config['url'], memoize_articles=False)
            # Get article URLs
            article_urls = [article.url for article in paper.articles]
            # Filter out already scraped URLs
            new_urls = [url for url in article_urls if url not in self.scraped_urls]
            # Shuffle URLs for randomness
            random.shuffle(new_urls)
            # Limit number of articles
            urls_to_scrape = new_urls[:site_config['max_articles']]
            logger.info(f"Found {len(urls_to_scrape)} new articles to scrape from {site_config['name']}")
            # Scrape articles with rate limiting
            for i, url in enumerate(urls_to_scrape):
                if len(articles) >= site_config['max_articles']:
                    break
                article_data = self.scrape_single_article(url)
                if article_data:
                    articles.append(article_data)
                    scraped_urls[url] = datetime.now().isoformat()
                    # Remember the URL in-memory as well so later sites in the
                    # same run do not scrape it again
                    self.scraped_urls.add(url)
                # Rate limiting
                if i < len(urls_to_scrape) - 1:
                    time.sleep(site_config['delay'])
            # Save scraped URLs
            if scraped_urls:
                self.save_scraped_urls(scraped_urls)
            logger.info(f"Successfully scraped {len(articles)} articles from {site_config['name']}")
        except Exception as e:
            logger.error(f"Error scraping {site_config['name']}: {str(e)}")
        return articles

    def scrape_all_sources(self) -> List[Dict]:
        """Scrape articles from all configured sources"""
        logger.info("Starting comprehensive news scraping...")
        all_articles = []
        # Scrape from each source
        for site_config in self.news_sites:
            if len(all_articles) >= self.max_articles_total:
                break
            try:
                site_articles = self.scrape_site_articles(site_config)
                all_articles.extend(site_articles)
                # Delay between sites
                if site_config != self.news_sites[-1]:
                    time.sleep(1.0)
            except Exception as e:
                logger.error(f"Error scraping {site_config['name']}: {str(e)}")
                continue
        # Limit total articles
        all_articles = all_articles[:self.max_articles_total]
        logger.info(f"Scraping complete. Total articles: {len(all_articles)}")
        return all_articles

    def save_scraped_articles(self, articles: List[Dict]) -> bool:
        """Save scraped articles with validation"""
        try:
            if not articles:
                return True
            # Validate articles first
            valid_articles, validation_summary = self.validate_scraped_articles(articles)
            logger.info(f"Validation: {len(valid_articles)}/{len(articles)} articles passed validation")
            if not valid_articles:
                logger.warning("No valid articles to save after validation")
                return True
            # Create DataFrame and save
            df_new = pd.DataFrame(valid_articles)
            # Append to the existing CSV if present, dropping duplicate texts
            if self.output_path.exists():
                df_existing = pd.read_csv(self.output_path)
                df_combined = pd.concat([df_existing, df_new], ignore_index=True)
                df_combined = df_combined.drop_duplicates(subset=['text'], keep='first')
            else:
                df_combined = df_new
            df_combined.to_csv(self.output_path, index=False)
            # Save validation report
            validation_report_path = self.data_dir / "scraping_validation_report.json"
            with open(validation_report_path, 'w') as f:
                json.dump(validation_summary, f, indent=2)
            logger.info(f"Saved {len(valid_articles)} validated articles to {self.output_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to save validated articles: {e}")
            return False

    def generate_scraping_metadata(self, articles: List[Dict]) -> Dict:
        """Generate metadata about the scraping session"""
        if not articles:
            return {}
        df = pd.DataFrame(articles)
        metadata = {
            'scraping_timestamp': datetime.now().isoformat(),
            'articles_scraped': len(articles),
            'sources': df['source'].value_counts().to_dict(),
            'average_word_count': float(df['word_count'].mean()),
            'total_characters': int(df['char_count'].sum()),
            'scraping_duration': None,  # Will be set by caller
            'quality_score': self.calculate_scraping_quality(df)
        }
        return metadata

    def calculate_scraping_quality(self, df: pd.DataFrame) -> float:
        """Calculate quality score for scraped articles"""
        scores = []
        # Diversity score (different sources)
        source_diversity = df['source'].nunique() / len(self.news_sites)
        scores.append(source_diversity)
        # Length consistency score
        word_counts = df['word_count']
        length_score = 1.0 - (word_counts.std() / word_counts.mean())
        scores.append(max(0, min(1, length_score)))
        # Freshness score (all articles should be recent)
        freshness_score = 1.0  # All articles are fresh by definition
        scores.append(freshness_score)
        return float(sum(scores) / len(scores))
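
    # Worked example (hypothetical numbers): with articles from 3 of the 4
    # configured sites, source_diversity = 3/4 = 0.75; with word counts of mean
    # 500 and standard deviation 150, length_score = 1 - 150/500 = 0.70;
    # freshness_score is fixed at 1.0. The overall quality score is then
    # (0.75 + 0.70 + 1.00) / 3 ≈ 0.82.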

    def scrape_articles(self) -> Tuple[bool, str]:
        """Main scraping function with comprehensive error handling"""
        start_time = time.time()
        try:
            logger.info("Starting news scraping process...")
            # Scrape articles from all sources
            articles = self.scrape_all_sources()
            if not articles:
                logger.warning("No articles were scraped successfully")
                return False, "No articles scraped"
            # Save articles
            if not self.save_scraped_articles(articles):
                return False, "Failed to save articles"
            # Generate and save metadata
            metadata = self.generate_scraping_metadata(articles)
            metadata['scraping_duration'] = time.time() - start_time
            try:
                with open(self.metadata_path, 'w') as f:
                    json.dump(metadata, f, indent=2)
            except Exception as e:
                logger.warning(f"Failed to save metadata: {e}")
            success_msg = f"Successfully scraped {len(articles)} articles"
            logger.info(success_msg)
            return True, success_msg
        except Exception as e:
            error_msg = f"Scraping process failed: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

    def validate_scraped_articles(self, articles: List[Dict]) -> Tuple[List[Dict], Dict]:
        """Validate scraped articles using validation schemas"""
        if not articles:
            return articles, {}
        validator = DataValidationPipeline()
        # Ensure required fields for validation
        enhanced_articles = []
        for article in articles:
            enhanced_article = article.copy()
            if 'source' not in enhanced_article:
                enhanced_article['source'] = 'scraped_real'
            if 'label' not in enhanced_article:
                enhanced_article['label'] = 0  # Real news
            enhanced_articles.append(enhanced_article)
        # Validate batch
        validation_result = validator.validate_scraped_data(enhanced_articles, "web_scraping")
        # Filter valid articles
        valid_articles = []
        for i, result in enumerate(validation_result.validation_results):
            if result.is_valid:
                article = enhanced_articles[i].copy()
                article['validation_quality_score'] = result.quality_metrics.get('overall_quality_score', 0.0)
                valid_articles.append(article)
        validation_summary = {
            'original_count': len(articles),
            'valid_count': len(valid_articles),
            'success_rate': validation_result.success_rate,
            'overall_quality_score': validation_result.overall_quality_score
        }
        return valid_articles, validation_summary

def scrape_articles():
    """Main function for external calls"""
    scraper = RobustNewsScraper()
    success, message = scraper.scrape_articles()
    if success:
        print(f"✅ {message}")
    else:
        print(f"❌ {message}")
    return success


def main():
    """Main execution function"""
    scraper = RobustNewsScraper()
    success, message = scraper.scrape_articles()
    if success:
        print(f"✅ {message}")
    else:
        print(f"❌ {message}")
        exit(1)


if __name__ == "__main__":
    main()
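
# Usage sketch for importing the scraper from another script (the module name
# `robust_news_scraper` is an assumption about where this file is saved, not
# part of the original code):
#
#     from robust_news_scraper import RobustNewsScraper
#
#     scraper = RobustNewsScraper()
#     success, message = scraper.scrape_articles()
#     if success:
#         df = pd.read_csv(scraper.output_path)  # scraped articles as a DataFrame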