#!/usr/bin/env python3
import asyncio
import json
import logging
import re
from pathlib import Path
from typing import Dict, Optional, Set

import aiohttp
from bs4 import BeautifulSoup
from yarl import URL

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AsyncCrawler:
    def __init__(self, start_url: str, max_concurrent: int = 100):
        self.start_url = URL(start_url)
        self.base_domain = self.start_url.host
        # Path portion of the start URL (everything after the domain)
        self.base_path = str(self.start_url).split(self.base_domain)[1]
        self.visited_urls: Set[str] = set()
        self.url_queue: asyncio.Queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None
        self.data_dir = Path("data/scraped")
        self.sitemap: Dict[str, list] = {}

    async def init_session(self):
        """Initialize aiohttp session with optimal settings."""
        timeout = aiohttp.ClientTimeout(total=10)
        connector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            headers={"User-Agent": "ShopBot/1.0"},
        )
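
    # aiohttp notes on the settings above: TCPConnector(limit=100) caps the connection
    # pool at 100 sockets (matching the default max_concurrent semaphore), and
    # ttl_dns_cache=300 caches DNS lookups for 300 seconds so repeated requests to the
    # same host skip the resolver.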

    def is_valid_url(self, url: URL) -> bool:
        """Check if URL should be crawled."""
        return (
            str(url).startswith(str(self.start_url))
            and url.scheme in ("http", "https")
            and not url.fragment
        )
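
    # Illustration of the filter above, using the default start_url
    # "https://shopify.dev/docs/apps/build/flow" (the sub-paths are hypothetical):
    #   https://shopify.dev/docs/apps/build/flow/some-page   -> crawled (same prefix)
    #   https://shopify.dev/docs/api                         -> skipped (outside the start prefix)
    #   https://shopify.dev/docs/apps/build/flow#overview    -> skipped (has a fragment)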

    async def process_page(self, url: str, html: str) -> Set[str]:
        """Extract links and save raw HTML."""
        # Regex for Markdown-style links: capture the target inside (...)
        pattern = r'\[.*?\]\((https?://[^\)]+|/[^)]+|[^\)]+)\)'
        markdown_links = re.findall(pattern, html)
        # Also collect href targets from <a> tags
        soup = BeautifulSoup(html, 'html.parser')
        anchor_links = [a['href'] for a in soup.find_all('a', href=True)]
        # Combine both link sources and resolve relative links against the start URL
        links = markdown_links + anchor_links
        absolute_links = [
            str(URL(link)) if URL(link).host else str(self.start_url.join(URL(link)))
            for link in links
        ]
        # Filter out URLs that fall outside the crawl scope
        valid_links = {
            link for link in absolute_links
            if self.is_valid_url(URL(link))
        }
        # Save raw HTML under a flattened file name derived from the URL path
        path = url.split(self.base_domain)[1]
        raw_filepath = self.data_dir / 'raw' / path.replace("/", "_").replace("_docs_apps_build_", "")
        raw_filepath.parent.mkdir(parents=True, exist_ok=True)
        raw_filepath.write_text(html)
        # raw_filepath.write_text(self.strip_all_html_tags_from_markdown(html))
        # Update sitemap
        self.sitemap[url] = list(valid_links)
        return valid_links
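
    # Example of the file-name flattening above, with a hypothetical page URL:
    #   url  = "https://shopify.dev/docs/apps/build/flow/triggers.txt"
    #   path = "/docs/apps/build/flow/triggers.txt"
    #   name = "_docs_apps_build_flow_triggers.txt"   (slashes -> underscores)
    #        -> "flow_triggers.txt"                   (after dropping "_docs_apps_build_")
    # so every crawled page ends up as a flat .txt file under data/scraped/raw/.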

    async def fetch_page(self, url: str) -> None:
        """Fetch and process a single page."""
        if url in self.visited_urls:
            return
        self.visited_urls.add(url)
        try:
            async with self.semaphore:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        new_urls = await self.process_page(url, html)
                        for new_url in new_urls:
                            if new_url not in self.visited_urls:
                                await self.url_queue.put(new_url)
                        logger.info(f"Successfully processed: {url}")
                    else:
                        logger.warning(f"Failed to fetch {url}: {response.status}")
        except Exception as e:
            logger.error(f"Error processing {url}: {str(e)}")

    def strip_all_html_tags_from_markdown(self, markdown: str) -> str:
        """Strip known HTML wrapper tags from scraped markdown and convert
        plain-text script blocks into fenced code blocks."""
        # Regex patterns for the specific HTML fragments to remove
        patterns = [
            r'<div class="react-code-block" data-preset="file">\n',
            r'<div class="react-code-block" data-preset="basic">\n',
            r'<div class="react-code-block" data-preset="terminal">\n',
            r'<div class="react-code-block-preload ThemeMode-dim">\n',
            r'<div class="react-code-block-preload-bar "></div>\n',
            r'<div class="react-code-block-preload-bar basic-codeblock">',
            r'<div class="react-code-block-preload-placeholder-container">\n',
            r'<div class="react-code-block-preload-code-container">\n',
            r'<div class="react-code-block-preload-codeline-number"></div>\n',
            r'<div class="react-code-block-preload-codeline"></div>\n',
            r'<script data-option=[^>]+ data-value=[^>]+></script>\n',
            r'<div>\n',
            r'</div>\n',
            r'<br>\n',
            r'<p>\n',
            r'</p>\n',
            # r'<(?!script\b)[^>]+>',
            # r'</(?!script\b)[^>]+>',
            r'END_RAW_MD_CONTENT',
            r'RAW_MD_CONTENT',
        ]
        # Remove all matched patterns from the markdown
        for pattern in patterns:
            markdown = re.sub(pattern, '', markdown)
        # Convert <script type="text/plain" ... language="x"> wrappers into ```x fences
        markdown = re.sub(r'<script type="text/plain"[^>]+language="([^"]+)"[^>]*>', r'```\1', markdown)
        markdown = re.sub(r'</script>', '```', markdown)
        # Collapse runs of three or more newlines into two
        markdown = re.sub(r'\n{3,}', '\n\n', markdown)
        return markdown
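
    # Sketch of the script-tag conversion above, assuming the scraped pages wrap code
    # in plain-text <script> tags (the attribute names here are illustrative):
    #   <script type="text/plain" data-language="javascript">   ->  ```javascript
    #   </script>                                                ->  ```
    # i.e. the captured language="..." value becomes the fence's language tag.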

    def clean_raw_markdown(self):
        """Clean raw markdown files by stripping HTML tags."""
        raw_dir = self.data_dir / 'raw'
        for raw_file in raw_dir.glob('*.txt'):
            content = raw_file.read_text()
            cleaned_content = self.strip_all_html_tags_from_markdown(content)
            clean_filepath = self.data_dir / 'clean' / raw_file.name
            clean_filepath.parent.mkdir(parents=True, exist_ok=True)
            clean_filepath.write_text(cleaned_content)

    async def run(self):
        """Main crawler execution."""
        # Create data directory
        self.data_dir.mkdir(parents=True, exist_ok=True)
        await self.init_session()
        await self.url_queue.put(str(self.start_url))
        try:
            workers = []
            while True:
                if self.url_queue.empty() and not workers:
                    break
                while not self.url_queue.empty():
                    # The queue holds plain page URLs; '.txt' is appended before
                    # fetching (the docs pages appear to expose a raw '.txt'
                    # variant), so visited_urls and the saved files both carry
                    # the .txt suffix that clean_raw_markdown() globs for.
                    url = await self.url_queue.get() + '.txt'
                    if url not in self.visited_urls:
                        worker = asyncio.create_task(self.fetch_page(url))
                        workers.append(worker)
                if workers:
                    done, pending = await asyncio.wait(
                        workers,
                        return_when=asyncio.FIRST_COMPLETED
                    )
                    workers = list(pending)
                    for task in done:
                        await task
        finally:
            # Save sitemap and produce cleaned copies before shutting down
            sitemap_path = self.data_dir / "_sitemap.json"
            sitemap_path.write_text(json.dumps(self.sitemap, indent=2))
            self.clean_raw_markdown()
            await self.session.close()
            logger.info(f"Crawl completed. Processed {len(self.visited_urls)} pages.")

async def main():
    start_url = "https://shopify.dev/docs/apps/build/flow"
    crawler = AsyncCrawler(start_url)
    await crawler.run()


if __name__ == "__main__":
    asyncio.run(main())