import os
import time

os.environ["TOKENIZERS_PARALLELISM"] = "false"

import json
import warnings
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List

from .models import UrlModel, CrawlResult
from .database import init_db, get_cached_url, cache_url, DB_PATH, flush_db
from .utils import *
from .chunking_strategy import *
from .extraction_strategy import *
from .crawler_strategy import *
from .content_scraping_strategy import WebScrapingStrategy
from .config import *

warnings.filterwarnings(
    "ignore",
    message='Field "model_name" has conflict with protected namespace "model_".',
)


class WebCrawler:
    def __init__(self, crawler_strategy: CrawlerStrategy = None, always_by_pass_cache: bool = False, verbose: bool = False):
        # Default to the local Selenium-based crawler when no strategy is supplied.
        self.crawler_strategy = crawler_strategy or LocalSeleniumCrawlerStrategy(verbose=verbose)
        self.always_by_pass_cache = always_by_pass_cache

        # Prepare the local folder that holds the cache database and cached assets.
        self.crawl4ai_folder = os.path.join(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home()), ".crawl4ai")
        os.makedirs(self.crawl4ai_folder, exist_ok=True)
        os.makedirs(os.path.join(self.crawl4ai_folder, "cache"), exist_ok=True)
        init_db()

        # The crawler is not considered ready until warmup() has completed.
        self.ready = False

    def warmup(self):
        """Perform a first lightweight crawl so that subsequent runs start warm."""
        print("[LOG] Warming up the WebCrawler")
        self.run(
            url="https://google.com/",
            word_count_threshold=5,
            extraction_strategy=NoExtractionStrategy(),
            bypass_cache=False,
            verbose=False,
        )
        self.ready = True
        print("[LOG] WebCrawler is ready to crawl")

    def fetch_page(
        self,
        url_model: UrlModel,
        provider: str = DEFAULT_PROVIDER,
        api_token: str = None,
        extract_blocks_flag: bool = True,
        word_count_threshold=MIN_WORD_THRESHOLD,
        css_selector: str = None,
        screenshot: bool = False,
        use_cached_html: bool = False,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        **kwargs,
    ) -> CrawlResult:
        """Crawl a single UrlModel by delegating to run().

        The provider, api_token, extract_blocks_flag, and use_cached_html
        parameters are accepted for interface compatibility but are not
        forwarded to run().
        """
        return self.run(
            url_model.url,
            word_count_threshold,
            extraction_strategy or NoExtractionStrategy(),
            chunking_strategy,
            bypass_cache=url_model.forced,
            css_selector=css_selector,
            screenshot=screenshot,
            **kwargs,
        )

    def fetch_pages(
        self,
        url_models: List[UrlModel],
        provider: str = DEFAULT_PROVIDER,
        api_token: str = None,
        extract_blocks_flag: bool = True,
        word_count_threshold=MIN_WORD_THRESHOLD,
        use_cached_html: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        **kwargs,
    ) -> List[CrawlResult]:
        """Crawl several UrlModels concurrently with identical settings."""
        extraction_strategy = extraction_strategy or NoExtractionStrategy()

        def fetch_page_wrapper(url_model, *args):
            # Forward the shared keyword arguments through the closure;
            # executor.map only handles positional iterables.
            return self.fetch_page(url_model, *args, **kwargs)

        # Each repeated list supplies the same setting for every URL.
        with ThreadPoolExecutor() as executor:
            results = list(
                executor.map(
                    fetch_page_wrapper,
                    url_models,
                    [provider] * len(url_models),
                    [api_token] * len(url_models),
                    [extract_blocks_flag] * len(url_models),
                    [word_count_threshold] * len(url_models),
                    [css_selector] * len(url_models),
                    [screenshot] * len(url_models),
                    [use_cached_html] * len(url_models),
                    [extraction_strategy] * len(url_models),
                    [chunking_strategy] * len(url_models),
                )
            )

        return results

    def run(
        self,
        url: str,
        word_count_threshold=MIN_WORD_THRESHOLD,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        bypass_cache: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        user_agent: str = None,
        verbose=True,
        **kwargs,
    ) -> CrawlResult:
        try:
            extraction_strategy = extraction_strategy or NoExtractionStrategy()
            extraction_strategy.verbose = verbose
            if not isinstance(extraction_strategy, ExtractionStrategy):
                raise ValueError("Unsupported extraction strategy")
            if not isinstance(chunking_strategy, ChunkingStrategy):
                raise ValueError("Unsupported chunking strategy")

            word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)

            cached = None
            screenshot_data = None
            extracted_content = None
            if not bypass_cache and not self.always_by_pass_cache:
                cached = get_cached_url(url)

            # Refuse to crawl before warmup has completed, unless the caller
            # opts out with warmup=False.
            if kwargs.get("warmup", True) and not self.ready:
                return None

            if cached:
                html = sanitize_input_encode(cached[1])
                extracted_content = sanitize_input_encode(cached[4])
                if screenshot:
                    screenshot_data = cached[9]
                    if not screenshot_data:
                        cached = None

            # Crawl the live page when there is no usable cache entry.
            if not cached or not html:
                if user_agent:
                    self.crawler_strategy.update_user_agent(user_agent)
                t1 = time.time()
                html = sanitize_input_encode(self.crawler_strategy.crawl(url, **kwargs))
                t2 = time.time()
                if verbose:
                    print(f"[LOG] Crawling done for {url}, success: {bool(html)}, time taken: {t2 - t1:.2f} seconds")
                if screenshot:
                    screenshot_data = self.crawler_strategy.take_screenshot()

            crawl_result = self.process_html(
                url,
                html,
                extracted_content,
                word_count_threshold,
                extraction_strategy,
                chunking_strategy,
                css_selector,
                screenshot_data,
                verbose,
                bool(cached),
                **kwargs,
            )
            crawl_result.success = bool(html)
            return crawl_result
        except Exception as e:
            if not hasattr(e, "msg"):
                e.msg = str(e)
            print(f"[ERROR] Failed to crawl {url}, error: {e.msg}")
            return CrawlResult(url=url, html="", success=False, error_message=e.msg)

    def process_html(
        self,
        url: str,
        html: str,
        extracted_content: str,
        word_count_threshold: int,
        extraction_strategy: ExtractionStrategy,
        chunking_strategy: ChunkingStrategy,
        css_selector: str,
        screenshot: str,
        verbose: bool,
        is_cached: bool,
        **kwargs,
    ) -> CrawlResult:
        t = time.time()

        # Scrape the raw HTML into cleaned HTML, markdown, media, links, and metadata.
        try:
            t1 = time.time()
            scraping_strategy = WebScrapingStrategy()
            extra_params = {k: v for k, v in kwargs.items() if k not in ["only_text", "image_description_min_word_threshold"]}
            result = scraping_strategy.scrap(
                url,
                html,
                word_count_threshold=word_count_threshold,
                css_selector=css_selector,
                only_text=kwargs.get("only_text", False),
                image_description_min_word_threshold=kwargs.get(
                    "image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
                ),
                **extra_params,
            )

            if verbose:
                print(f"[LOG] Content extracted for {url}, success: True, time taken: {time.time() - t1:.2f} seconds")

            if result is None:
                raise ValueError(f"Failed to extract content from the website: {url}")
        except InvalidCSSSelectorError as e:
            raise ValueError(str(e))

        cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
        markdown = sanitize_input_encode(result.get("markdown", ""))
        media = result.get("media", [])
        links = result.get("links", [])
        metadata = result.get("metadata", {})

        # Run the extraction strategy only when no cached extracted content exists.
        if extracted_content is None:
            if verbose:
                print(f"[LOG] Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")

            sections = chunking_strategy.chunk(markdown)
            extracted_content = extraction_strategy.run(url, sections)
            extracted_content = json.dumps(extracted_content, indent=4, default=str, ensure_ascii=False)

        if verbose:
            print(f"[LOG] Extraction done for {url}, time taken: {time.time() - t:.2f} seconds.")

        # Normalize an empty screenshot payload to None.
        screenshot = None if not screenshot else screenshot

        # Persist the result so later runs can be served from the cache.
        if not is_cached:
            cache_url(
                url,
                html,
                cleaned_html,
                markdown,
                extracted_content,
                True,
                json.dumps(media),
                json.dumps(links),
                json.dumps(metadata),
                screenshot=screenshot,
            )

        return CrawlResult(
            url=url,
            html=html,
            cleaned_html=format_html(cleaned_html),
            markdown=markdown,
            media=media,
            links=links,
            metadata=metadata,
            screenshot=screenshot,
            extracted_content=extracted_content,
            success=True,
            error_message="",
        )
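

if __name__ == "__main__":
    # Minimal usage sketch (not part of the library API): crawl one page with the
    # default local Selenium strategy and report the size of the extracted markdown.
    # Assumes a working Selenium/Chrome setup; the target URL is only illustrative.
    # Run as a module (python -m <package>.<module>) so the relative imports resolve.
    crawler = WebCrawler(verbose=True)
    crawler.warmup()
    result = crawler.run(url="https://example.com", word_count_threshold=5, bypass_cache=True)
    if result and result.success:
        print(f"Crawled {result.url}: {len(result.markdown or '')} characters of markdown")
    else:
        print("Crawl failed:", getattr(result, "error_message", "no result"))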