| """ | |
| Browser automation module for web scraping and analysis. | |
| This module enables the AI assistant to control a web browser, | |
| scrape content, and extract information from websites. | |
| """ | |
| import json | |
| import logging | |
| import re | |
| import urllib.parse | |
| from datetime import datetime | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from models import WebResource, Task, db | |
| logger = logging.getLogger(__name__) | |
class BrowserAutomation:
    """Class for handling browser automation and web scraping"""

    def __init__(self, user_agent=None, headers=None):
        self.user_agent = user_agent or 'QuantumAI Assistant/1.0'
        self.headers = headers or {
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def fetch_page(self, url, task_id=None):
        """
        Fetch a webpage and parse its content.

        Args:
            url (str): The URL to fetch
            task_id (int, optional): Associated task ID

        Returns:
            dict: Result containing status, parsed content, and metadata
        """
        try:
            # Normalize the URL: default to HTTPS when no scheme is given
            parsed_url = urllib.parse.urlparse(url)
            if not parsed_url.scheme:
                url = 'https://' + url

            logger.info(f"Fetching URL: {url}")
            response = self.session.get(url, timeout=15)
            response.raise_for_status()

            # Parse with BeautifulSoup; guard against a missing or empty <title>
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string.strip() if soup.title and soup.title.string else "No title found"

            # Store or update the web resource
            web_resource = self._store_web_resource(url, title, task_id)

            # Remove non-content elements before extracting text
            for element in soup(["script", "style", "meta", "noscript"]):
                element.extract()

            # Get text content and normalize whitespace
            text_content = soup.get_text(separator=' ')
            text_content = re.sub(r'\s+', ' ', text_content).strip()

            return {
                'status': 'success',
                'url': url,
                'title': title,
                'content': text_content,
                'html': response.text,
                'web_resource_id': web_resource.id,
                'timestamp': datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"Error fetching URL {url}: {str(e)}")
            return {
                'status': 'error',
                'url': url,
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }
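
    # A minimal usage sketch, hedged: it assumes an active Flask application
    # context so the SQLAlchemy session behind `db` can commit, and `app` is
    # a hypothetical name for the project's Flask instance:
    #
    #     browser = BrowserAutomation()
    #     with app.app_context():
    #         result = browser.fetch_page('example.com')  # scheme added automatically
    #         if result['status'] == 'success':
    #             print(result['title'], len(result['content']))
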
    def _store_web_resource(self, url, title, task_id=None):
        """Store or update a web resource in the database"""
        try:
            web_resource = WebResource.query.filter_by(url=url).first()
            if not web_resource:
                web_resource = WebResource(
                    url=url,
                    title=title,
                    category='general',
                    last_accessed=datetime.utcnow(),
                )
                if task_id:
                    web_resource.task_id = task_id
                db.session.add(web_resource)
            else:
                web_resource.last_accessed = datetime.utcnow()
                web_resource.title = title
            db.session.commit()
            return web_resource
        except Exception as e:
            logger.error(f"Error storing web resource: {str(e)}")
            db.session.rollback()
            # Return an unsaved placeholder if the db operation fails;
            # its `id` is None, which fetch_page passes through as-is
            return WebResource(url=url, title=title)

    def extract_links(self, html):
        """Extract all links from an HTML document"""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            text = a_tag.get_text(strip=True)
            # Skip in-page anchors and javascript pseudo-links
            if href.startswith(('#', 'javascript:')):
                continue
            links.append({
                'href': href,
                'text': text[:100] if text else ""
            })
        return links
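
    # Hedged example: hrefs come back exactly as written in the page, so
    # relative paths can be resolved against the fetched URL with the
    # standard-library urljoin (the values below are illustrative):
    #
    #     links = browser.extract_links(result['html'])
    #     absolute = [urllib.parse.urljoin(result['url'], link['href'])
    #                 for link in links]
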
    def extract_structured_data(self, html):
        """Extract structured data (JSON-LD, microdata) from an HTML document"""
        soup = BeautifulSoup(html, 'html.parser')
        structured_data = []
        # Extract JSON-LD blocks; script.string is None for empty tags,
        # so catch TypeError alongside malformed JSON
        for script in soup.find_all('script', type='application/ld+json'):
            try:
                data = json.loads(script.string)
                structured_data.append({
                    'type': 'json-ld',
                    'data': data
                })
            except (json.JSONDecodeError, TypeError):
                continue
        # TODO: Add microdata and RDFa extraction if needed
        return structured_data
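
    # Illustrative shape of the return value for a page embedding JSON-LD:
    #
    #     <script type="application/ld+json">{"@type": "Article"}</script>
    #
    # yields [{'type': 'json-ld', 'data': {'@type': 'Article'}}].
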
    def analyze_page_content(self, content, url=None):
        """Analyze page content to extract key information using NLP"""
        # This will be enhanced with our quantum NLP process.
        # For now, just return simple word/sentence statistics.
        word_count = len(content.split())
        # Drop the empty strings re.split leaves around leading/trailing
        # punctuation so sentences are not overcounted
        sentences = [s for s in re.split(r'[.!?]+', content) if s.strip()]
        sentence_count = len(sentences)
        return {
            'word_count': word_count,
            'sentence_count': sentence_count,
            'average_sentence_length': word_count / max(1, sentence_count),
            'url': url
        }
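
    # Worked example: "One two. Three four five!" splits into 5 words and
    # 2 non-empty sentences, giving an average sentence length of 2.5.
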
# Helper functions for browser automation tasks

def create_scraping_task(url, title, description=None, scheduled_for=None):
    """Create a new web scraping task"""
    task = Task(
        title=title,
        description=description or f"Scrape content from {url}",
        status='pending',
        task_type='web_scrape',
        scheduled_for=scheduled_for,
        config={'url': url}
    )
    db.session.add(task)
    db.session.commit()
    return task
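
# Typical flow, hedged: assumes an application context so db.session is
# bound (see execute_scraping_task below):
#
#     task = create_scraping_task('https://example.com', 'Scrape example.com')
#     result = execute_scraping_task(task.id)
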
def execute_scraping_task(task_id):
    """Execute a web scraping task"""
    task = Task.query.get(task_id)
    if not task or task.task_type != 'web_scrape':
        return {'status': 'error', 'message': 'Invalid task'}
    try:
        task.status = 'in_progress'
        db.session.commit()

        url = (task.config or {}).get('url')
        if not url:
            raise ValueError("Task config is missing a 'url' entry")

        browser = BrowserAutomation()
        result = browser.fetch_page(url, task_id=task.id)

        if result['status'] == 'success':
            # Also analyze the content
            analysis = browser.analyze_page_content(result['content'], url)
            result['analysis'] = analysis
            task.status = 'completed'
            task.completed_at = datetime.utcnow()
            task.result = result
        else:
            task.status = 'failed'
            task.result = result

        db.session.commit()
        return result
    except Exception as e:
        logger.error(f"Error executing task {task_id}: {str(e)}")
        # Roll back any half-finished transaction before recording the failure
        db.session.rollback()
        task.status = 'failed'
        task.result = {'error': str(e)}
        db.session.commit()
        return {'status': 'error', 'message': str(e)}
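
# A minimal smoke test, hedged as a sketch: it avoids the network and the
# database entirely (fetch_page and the task helpers need a live
# Flask-SQLAlchemy session) and only exercises the pure-HTML methods on an
# inline sample document. It assumes the sibling `models` module is
# importable so the top-level import succeeds.
if __name__ == '__main__':
    sample_html = (
        '<html><head><title>Example</title>'
        '<script type="application/ld+json">{"@type": "WebPage"}</script>'
        '</head><body><a href="/about">About</a>'
        '<p>First sentence. Second sentence!</p></body></html>'
    )
    browser = BrowserAutomation()
    print(browser.extract_links(sample_html))
    print(browser.extract_structured_data(sample_html))
    print(browser.analyze_page_content('First sentence. Second sentence!'))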