import time |
from urllib.parse import urlparse |
from concurrent.futures import ThreadPoolExecutor, as_completed |
from bs4 import BeautifulSoup, Comment, element, Tag, NavigableString |
import json |
import html |
import re |
import os |
import platform |
from .prompts import PROMPT_EXTRACT_BLOCKS |
from .config import * |
from pathlib import Path |
from typing import Dict, Any |
from urllib.parse import urljoin |
import requests |
from requests.exceptions import InvalidSchema |
from typing import Optional, Tuple, Dict, Any |
import xxhash |
from colorama import Fore, Style, init |
import textwrap |
import cProfile |
import pstats |
from functools import wraps |
import asyncio |
class InvalidCSSSelectorError(Exception): |
pass |
def create_box_message(message: str, type: str = "info", width: int = 120, add_newlines: bool = True, double_line: bool = False) -> str: |
""" |
Create a styled message box with colored borders and formatted text. |
How it works: |
1. Determines box style and colors based on the message type (e.g., info, warning). |
2. Wraps text to fit within the specified width. |
3. Constructs a box using characters (single or double lines) with appropriate formatting. |
4. Adds optional newlines before and after the box. |
Args: |
message (str): The message to display inside the box. |
type (str): Type of the message (e.g., "info", "warning", "error", "success"). Defaults to "info". |
width (int): Width of the box. Defaults to 120. |
add_newlines (bool): Whether to add newlines before and after the box. Defaults to True. |
double_line (bool): Whether to use double lines for the box border. Defaults to False. |
Returns: |
str: A formatted string containing the styled message box. |
""" |
init() |
styles = { |
"warning": (Fore.YELLOW, Fore.LIGHTYELLOW_EX, "⚠"), |
"info": (Fore.BLUE, Fore.LIGHTBLUE_EX, "ℹ"), |
"success": (Fore.GREEN, Fore.LIGHTGREEN_EX, "✓"), |
"error": (Fore.RED, Fore.LIGHTRED_EX, "×"), |
} |
border_color, text_color, prefix = styles.get(type.lower(), styles["info"]) |
box_chars = { |
"single": ("─", "│", "┌", "┐", "└", "┘"), |
"double": ("═", "║", "╔", "╗", "╚", "╝") |
} |
line_style = "double" if double_line else "single" |
h_line, v_line, tl, tr, bl, br = box_chars[line_style] |
formatted_lines = [] |
raw_lines = message.split('\n') |
if raw_lines: |
first_line = f"{prefix} {raw_lines[0].strip()}" |
wrapped_first = textwrap.fill(first_line, width=width-4) |
formatted_lines.extend(wrapped_first.split('\n')) |
for line in raw_lines[1:]: |
if line.strip(): |
wrapped = textwrap.fill(f" {line.strip()}", width=width-4) |
formatted_lines.extend(wrapped.split('\n')) |
else: |
formatted_lines.append("") |
horizontal_line = h_line * (width - 1) |
box = [ |
f"{border_color}{tl}{horizontal_line}{tr}", |
*[f"{border_color}{v_line}{text_color} {line:<{width-2}}{border_color}{v_line}" for line in formatted_lines], |
f"{border_color}{bl}{horizontal_line}{br}{Style.RESET_ALL}" |
] |
result = "\n".join(box) |
if add_newlines: |
result = f"\n{result}\n" |
return result |
def calculate_semaphore_count(): |
""" |
Calculate the optimal semaphore count based on system resources. |
How it works: |
1. Determines the number of CPU cores and total system memory. |
2. Sets a base count as half of the available CPU cores. |
3. Limits the count based on memory, assuming 2GB per semaphore instance. |
4. Returns the minimum value between CPU and memory-based limits. |
Returns: |
int: The calculated semaphore count. |
""" |
cpu_count = os.cpu_count() |
memory_gb = get_system_memory() / (1024 ** 3) |
base_count = max(1, cpu_count // 2) |
memory_based_cap = int(memory_gb / 2) |
return min(base_count, memory_based_cap) |
def get_system_memory(): |
""" |
Get the total system memory in bytes. |
How it works: |
1. Detects the operating system. |
2. Reads memory information from system-specific commands or files. |
3. Converts the memory to bytes for uniformity. |
Returns: |
int: The total system memory in bytes. |
Raises: |
OSError: If the operating system is unsupported. |
""" |
system = platform.system() |
if system == "Linux": |
with open('/proc/meminfo', 'r') as mem: |
for line in mem: |
if line.startswith('MemTotal:'): |
return int(line.split()[1]) * 1024 |
elif system == "Darwin": |
import subprocess |
output = subprocess.check_output(['sysctl', '-n', 'hw.memsize']).decode('utf-8') |
return int(output.strip()) |
elif system == "Windows": |
import ctypes |
kernel32 = ctypes.windll.kernel32 |
c_ulonglong = ctypes.c_ulonglong |
class MEMORYSTATUSEX(ctypes.Structure): |
_fields_ = [ |
('dwLength', ctypes.c_ulong), |
('dwMemoryLoad', ctypes.c_ulong), |
('ullTotalPhys', c_ulonglong), |
('ullAvailPhys', c_ulonglong), |
('ullTotalPageFile', c_ulonglong), |
('ullAvailPageFile', c_ulonglong), |
('ullTotalVirtual', c_ulonglong), |
('ullAvailVirtual', c_ulonglong), |
('ullAvailExtendedVirtual', c_ulonglong), |
] |
memoryStatus = MEMORYSTATUSEX() |
memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX) |
kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus)) |
return memoryStatus.ullTotalPhys |
else: |
raise OSError("Unsupported operating system") |
def get_home_folder(): |
""" |
Get or create the home folder for Crawl4AI configuration and cache. |
How it works: |
1. Uses environment variables or defaults to the user's home directory. |
2. Creates `.crawl4ai` and its subdirectories (`cache`, `models`) if they don't exist. |
3. Returns the path to the home folder. |
Returns: |
str: The path to the Crawl4AI home folder. |
""" |
home_folder = os.path.join(os.getenv("CRAWL4_AI_BASE_DIRECTORY", os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home())), ".crawl4ai") |
os.makedirs(home_folder, exist_ok=True) |
os.makedirs(f"{home_folder}/cache", exist_ok=True) |
os.makedirs(f"{home_folder}/models", exist_ok=True) |
return home_folder |
def beautify_html(escaped_html): |
""" |
Beautifies an escaped HTML string. |
Parameters: |
escaped_html (str): A string containing escaped HTML. |
Returns: |
str: A beautifully formatted HTML string. |
""" |
unescaped_html = html.unescape(escaped_html) |
soup = BeautifulSoup(unescaped_html, 'html.parser') |
pretty_html = soup.prettify() |
return pretty_html |
def split_and_parse_json_objects(json_string): |
""" |
Splits a JSON string which is a list of objects and tries to parse each object. |
Parameters: |
json_string (str): A string representation of a list of JSON objects, e.g., '[{...}, {...}, ...]'. |
Returns: |
tuple: A tuple containing two lists: |
- First list contains all successfully parsed JSON objects. |
- Second list contains the string representations of all segments that couldn't be parsed. |
""" |
if json_string.startswith('[') and json_string.endswith(']'): |
json_string = json_string[1:-1].strip() |
segments = [] |
depth = 0 |
start_index = 0 |
for i, char in enumerate(json_string): |
if char == '{': |
if depth == 0: |
start_index = i |
depth += 1 |
elif char == '}': |
depth -= 1 |
if depth == 0: |
segments.append(json_string[start_index:i+1]) |
parsed_objects = [] |
unparsed_segments = [] |
for segment in segments: |
try: |
obj = json.loads(segment) |
parsed_objects.append(obj) |
except json.JSONDecodeError: |
unparsed_segments.append(segment) |
return parsed_objects, unparsed_segments |
def sanitize_html(html): |
""" |
Sanitize an HTML string by escaping quotes. |
How it works: |
1. Replaces all unwanted and special characters with an empty string. |
2. Escapes double and single quotes for safe usage. |
Args: |
html (str): The HTML string to sanitize. |
Returns: |
str: The sanitized HTML string. |
""" |
sanitized_html = html |
sanitized_html = sanitized_html.replace('"', '\\"').replace("'", "\\'") |
return sanitized_html |
def sanitize_input_encode(text: str) -> str: |
"""Sanitize input to handle potential encoding issues.""" |
try: |
try: |
if not text: |
return '' |
return text.encode('utf-8', errors='ignore').decode('utf-8') |
except UnicodeEncodeError as e: |
print(f"Warning: Encoding issue detected. Some characters may be lost. Error: {e}") |
return text.encode('ascii', errors='ignore').decode('ascii') |
except Exception as e: |
raise ValueError(f"Error sanitizing input: {str(e)}") from e |
def escape_json_string(s): |
""" |
Escapes characters in a string to be JSON safe. |
Parameters: |
s (str): The input string to be escaped. |
Returns: |
str: The escaped string, safe for JSON encoding. |
""" |
s = s.replace('\\', '\\\\') |
s = s.replace('"', '\\"') |
s = s.replace('\b', '\\b') |
s = s.replace('\f', '\\f') |
s = s.replace('\n', '\\n') |
s = s.replace('\r', '\\r') |
s = s.replace('\t', '\\t') |
s = re.sub(r'[\x00-\x1f\x7f-\x9f]', lambda x: '\\u{:04x}'.format(ord(x.group())), s) |
return s |
def replace_inline_tags(soup, tags, only_text=False): |
""" |
Replace inline HTML tags with Markdown-style equivalents. |
How it works: |
1. Maps specific tags (e.g., <b>, <i>) to Markdown syntax. |
2. Finds and replaces all occurrences of these tags in the provided BeautifulSoup object. |
3. Optionally replaces tags with their text content only. |
Args: |
soup (BeautifulSoup): Parsed HTML content. |
tags (List[str]): List of tags to replace. |
only_text (bool): Whether to replace tags with plain text. Defaults to False. |
Returns: |
BeautifulSoup: Updated BeautifulSoup object with replaced tags. |
""" |
tag_replacements = { |
'b': lambda tag: f"**{tag.text}**", |
'i': lambda tag: f"*{tag.text}*", |
'u': lambda tag: f"__{tag.text}__", |
'span': lambda tag: f"{tag.text}", |
'del': lambda tag: f"~~{tag.text}~~", |
'ins': lambda tag: f"++{tag.text}++", |
'sub': lambda tag: f"~{tag.text}~", |
'sup': lambda tag: f"^^{tag.text}^^", |
'strong': lambda tag: f"**{tag.text}**", |
'em': lambda tag: f"*{tag.text}*", |
'code': lambda tag: f"`{tag.text}`", |
'kbd': lambda tag: f"`{tag.text}`", |
'var': lambda tag: f"_{tag.text}_", |
's': lambda tag: f"~~{tag.text}~~", |
'q': lambda tag: f'"{tag.text}"', |
'abbr': lambda tag: f"{tag.text} ({tag.get('title', '')})", |
'cite': lambda tag: f"_{tag.text}_", |
'dfn': lambda tag: f"_{tag.text}_", |
'time': lambda tag: f"{tag.text}", |
'small': lambda tag: f"<small>{tag.text}</small>", |
'mark': lambda tag: f"=={tag.text}==" |
} |
replacement_data = [(tag, tag_replacements.get(tag, lambda t: t.text)) for tag in tags] |
for tag_name, replacement_func in replacement_data: |
for tag in soup.find_all(tag_name): |
replacement_text = tag.text if only_text else replacement_func(tag) |
tag.replace_with(replacement_text) |
return soup |
def get_content_of_website(url, html, word_count_threshold = MIN_WORD_THRESHOLD, css_selector = None, **kwargs): |
""" |
Extract structured content, media, and links from website HTML. |
How it works: |
1. Parses the HTML content using BeautifulSoup. |
2. Extracts internal/external links and media (images, videos, audios). |
3. Cleans the content by removing unwanted tags and attributes. |
4. Converts cleaned HTML to Markdown. |
5. Collects metadata and returns the extracted information. |
Args: |
url (str): The website URL. |
html (str): The HTML content of the website. |
word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD. |
css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None. |
Returns: |
Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata. |
""" |
try: |
if not html: |
return None |
soup = BeautifulSoup(html, 'html.parser') |
body = soup.body |
if css_selector: |
selected_elements = body.select(css_selector) |
if not selected_elements: |
raise InvalidCSSSelectorError(f"Invalid CSS selector , No elements found for CSS selector: {css_selector}") |
div_tag = soup.new_tag('div') |
for el in selected_elements: |
div_tag.append(el) |
body = div_tag |
links = { |
'internal': [], |
'external': [] |
} |
for a in body.find_all('a', href=True): |
href = a['href'] |
url_base = url.split('/')[2] |
if href.startswith('http') and url_base not in href: |
links['external'].append({ |
'href': href, |
'text': a.get_text() |
}) |
else: |
links['internal'].append( |
{ |
'href': href, |
'text': a.get_text() |
} |
) |
for tag in body.find_all(['script', 'style', 'link', 'meta', 'noscript']): |
tag.decompose() |
for tag in body.find_all(): |
if tag.name != 'img': |
tag.attrs = {} |
media = { |
'images': [], |
'videos': [], |
'audios': [] |
} |
for img in body.find_all('img'): |
media['images'].append({ |
'src': img.get('src'), |
'alt': img.get('alt'), |
"type": "image" |
}) |
for video in body.find_all('video'): |
media['videos'].append({ |
'src': video.get('src'), |
'alt': video.get('alt'), |
"type": "video" |
}) |
for audio in body.find_all('audio'): |
media['audios'].append({ |
'src': audio.get('src'), |
'alt': audio.get('alt'), |
"type": "audio" |
}) |
for img in body.find_all('img'): |
alt_text = img.get('alt') |
if alt_text: |
img.replace_with(soup.new_string(alt_text)) |
else: |
img.decompose() |
def replace_pre_tags_with_text(node): |
for child in node.find_all('pre'): |
child.string = child.get_text() |
return node |
body = replace_pre_tags_with_text(body) |
body = replace_inline_tags( |
body, |
['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark'], |
only_text=kwargs.get('only_text', False) |
) |
def remove_empty_and_low_word_count_elements(node, word_count_threshold): |
for child in node.contents: |
if isinstance(child, element.Tag): |
remove_empty_and_low_word_count_elements(child, word_count_threshold) |
word_count = len(child.get_text(strip=True).split()) |
if (len(child.contents) == 0 and not child.get_text(strip=True)) or word_count < word_count_threshold: |
child.decompose() |
return node |
body = remove_empty_and_low_word_count_elements(body, word_count_threshold) |
def remove_small_text_tags(body: Tag, word_count_threshold: int = MIN_WORD_THRESHOLD): |
tags_to_remove = [] |
for tag in body.find_all(True): |
if tag.string and tag.string.strip(): |
word_count = len(tag.string.strip().split()) |
if word_count < word_count_threshold: |
tags_to_remove.append(tag) |
for tag in tags_to_remove: |
tag.decompose() |
return body |
body = remove_small_text_tags(body, word_count_threshold) |
def is_empty_or_whitespace(tag: Tag): |
if isinstance(tag, NavigableString): |
return not tag.strip() |
if not tag.contents: |
return True |
return all(is_empty_or_whitespace(child) for child in tag.contents) |
def remove_empty_tags(body: Tag): |
changes = True |
while changes: |
changes = False |
empty_tags = [tag for tag in body.find_all(True) if is_empty_or_whitespace(tag)] |
for tag in empty_tags: |
tag.decompose() |
changes = True |
return body |
body = remove_empty_tags(body) |
def flatten_nested_elements(node): |
for child in node.contents: |
if isinstance(child, element.Tag): |
flatten_nested_elements(child) |
if len(child.contents) == 1 and child.contents[0].name == child.name: |
child_content = child.contents[0] |
child.replace_with(child_content) |
return node |
body = flatten_nested_elements(body) |
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)): |
comment.extract() |
cleaned_html = str(body).replace('\n\n', '\n').replace(' ', ' ') |
cleaned_html = sanitize_html(cleaned_html) |
h = html2text.HTML2Text() |
h = CustomHTML2Text() |
h.ignore_links = True |
markdown = h.handle(cleaned_html) |
markdown = markdown.replace(' ```', '```') |
try: |
meta = extract_metadata(html, soup) |
except Exception as e: |
print('Error extracting metadata:', str(e)) |
meta = {} |
return{ |
'markdown': markdown, |
'cleaned_html': cleaned_html, |
'success': True, |
'media': media, |
'links': links, |
'metadata': meta |
} |
except Exception as e: |
print('Error processing HTML content:', str(e)) |
raise InvalidCSSSelectorError(f"Invalid CSS selector: {css_selector}") from e |
def get_content_of_website_optimized(url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: |
if not html: |
return None |
soup = BeautifulSoup(html, 'html.parser') |
body = soup.body |
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) |
for tag in kwargs.get('excluded_tags', []) or []: |
for el in body.select(tag): |
el.decompose() |
if css_selector: |
selected_elements = body.select(css_selector) |
if not selected_elements: |
raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}") |
body = soup.new_tag('div') |
for el in selected_elements: |
body.append(el) |
links = {'internal': [], 'external': []} |
media = {'images': [], 'videos': [], 'audios': []} |
def find_closest_parent_with_useful_text(tag): |
current_tag = tag |
while current_tag: |
current_tag = current_tag.parent |
if current_tag: |
text_content = current_tag.get_text(separator=' ',strip=True) |
if len(text_content.split()) >= image_description_min_word_threshold: |
return text_content |
return None |
def process_image(img, url, index, total_images): |
def is_valid_image(img, parent, parent_classes): |
style = img.get('style', '') |
src = img.get('src', '') |
classes_to_check = ['button', 'icon', 'logo'] |
tags_to_check = ['button', 'input'] |
return all([ |
'display:none' not in style, |
src, |
not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check), |
parent.name not in tags_to_check |
]) |
def score_image_for_usefulness(img, base_url, index, images_count): |
def parse_dimension(dimension): |
if dimension: |
match = re.match(r"(\d+)(\D*)", dimension) |
if match: |
number = int(match.group(1)) |
unit = match.group(2) or 'px' |
return number, unit |
return None, None |
def fetch_image_file_size(img, base_url): |
img_url = urljoin(base_url,img.get('src')) |
try: |
response = requests.head(img_url) |
if response.status_code == 200: |
return response.headers.get('Content-Length',None) |
else: |
print(f"Failed to retrieve file size for {img_url}") |
return None |
except InvalidSchema as e: |
return None |
finally: |
return |
image_height = img.get('height') |
height_value, height_unit = parse_dimension(image_height) |
image_width = img.get('width') |
width_value, width_unit = parse_dimension(image_width) |
image_size = 0 |
image_format = os.path.splitext(img.get('src',''))[1].lower() |
image_format = image_format.strip('.') |
score = 0 |
if height_value: |
if height_unit == 'px' and height_value > 150: |
score += 1 |
if height_unit in ['%','vh','vmin','vmax'] and height_value >30: |
score += 1 |
if width_value: |
if width_unit == 'px' and width_value > 150: |
score += 1 |
if width_unit in ['%','vh','vmin','vmax'] and width_value >30: |
score += 1 |
if image_size > 10000: |
score += 1 |
if img.get('alt') != '': |
score+=1 |
if any(image_format==format for format in ['jpg','png','webp']): |
score+=1 |
if index/images_count<0.5: |
score+=1 |
return score |
if not is_valid_image(img, img.parent, img.parent.get('class', [])): |
return None |
score = score_image_for_usefulness(img, url, index, total_images) |
return None |
return { |
'src': img.get('src', '').replace('\\"', '"').strip(), |
'alt': img.get('alt', ''), |
'desc': find_closest_parent_with_useful_text(img), |
'score': score, |
'type': 'image' |
} |
def process_element(element: element.PageElement) -> bool: |
try: |
if isinstance(element, NavigableString): |
if isinstance(element, Comment): |
element.extract() |
return False |
if element.name in ['script', 'style', 'link', 'meta', 'noscript']: |
element.decompose() |
return False |
keep_element = False |
if element.name == 'a' and element.get('href'): |
href = element['href'] |
url_base = url.split('/')[2] |
link_data = {'href': href, 'text': element.get_text()} |
if href.startswith('http') and url_base not in href: |
links['external'].append(link_data) |
else: |
links['internal'].append(link_data) |
keep_element = True |
elif element.name == 'img': |
return True |
elif element.name in ['video', 'audio']: |
media[f"{element.name}s"].append({ |
'src': element.get('src'), |
'alt': element.get('alt'), |
'type': element.name, |
'description': find_closest_parent_with_useful_text(element) |
}) |
source_tags = element.find_all('source') |
for source_tag in source_tags: |
media[f"{element.name}s"].append({ |
'src': source_tag.get('src'), |
'alt': element.get('alt'), |
'type': element.name, |
'description': find_closest_parent_with_useful_text(element) |
}) |
return True |
if element.name != 'pre': |
if element.name in ['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark']: |
if kwargs.get('only_text', False): |
element.replace_with(element.get_text()) |
else: |
element.unwrap() |
elif element.name != 'img': |
element.attrs = {} |
for child in list(element.children): |
if isinstance(child, NavigableString) and not isinstance(child, Comment): |
if len(child.strip()) > 0: |
keep_element = True |
else: |
if process_element(child): |
keep_element = True |
if not keep_element: |
word_count = len(element.get_text(strip=True).split()) |
keep_element = word_count >= word_count_threshold |
if not keep_element: |
element.decompose() |
return keep_element |
except Exception as e: |
print('Error processing element:', str(e)) |
return False |
imgs = body.find_all('img') |
media['images'] = [ |
result for result in |
(process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs)) |
if result is not None |
] |
process_element(body) |
def flatten_nested_elements(node): |
if isinstance(node, NavigableString): |
return node |
if len(node.contents) == 1 and isinstance(node.contents[0], element.Tag) and node.contents[0].name == node.name: |
return flatten_nested_elements(node.contents[0]) |
node.contents = [flatten_nested_elements(child) for child in node.contents] |
return node |
body = flatten_nested_elements(body) |
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') |
for img in imgs: |
try: |
src = img.get('src', '') |
if base64_pattern.match(src): |
img['src'] = base64_pattern.sub('', src) |
except: |
pass |
cleaned_html = str(body).replace('\n\n', '\n').replace(' ', ' ') |
cleaned_html = sanitize_html(cleaned_html) |
h = CustomHTML2Text() |
h.ignore_links = True |
markdown = h.handle(cleaned_html) |
markdown = markdown.replace(' ```', '```') |
try: |
meta = extract_metadata(html, soup) |
except Exception as e: |
print('Error extracting metadata:', str(e)) |
meta = {} |
return { |
'markdown': markdown, |
'cleaned_html': cleaned_html, |
'success': True, |
'media': media, |
'links': links, |
'metadata': meta |
} |
def extract_metadata(html, soup=None): |
""" |
Extract optimized content, media, and links from website HTML. |
How it works: |
1. Similar to `get_content_of_website`, but optimized for performance. |
2. Filters and scores images for usefulness. |
3. Extracts contextual descriptions for media files. |
4. Handles excluded tags and CSS selectors. |
5. Cleans HTML and converts it to Markdown. |
Args: |
url (str): The website URL. |
html (str): The HTML content of the website. |
word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD. |
css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None. |
**kwargs: Additional options for customization. |
Returns: |
Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata. |
""" |
metadata = {} |
if not html and not soup: |
return {} |
if not soup: |
soup = BeautifulSoup(html, 'lxml') |
head = soup.head |
if not head: |
return metadata |
title_tag = head.find('title') |
metadata['title'] = title_tag.string.strip() if title_tag and title_tag.string else None |
description_tag = head.find('meta', attrs={'name': 'description'}) |
metadata['description'] = description_tag.get('content', '').strip() if description_tag else None |
keywords_tag = head.find('meta', attrs={'name': 'keywords'}) |
metadata['keywords'] = keywords_tag.get('content', '').strip() if keywords_tag else None |
author_tag = head.find('meta', attrs={'name': 'author'}) |
metadata['author'] = author_tag.get('content', '').strip() if author_tag else None |
og_tags = head.find_all('meta', attrs={'property': re.compile(r'^og:')}) |
for tag in og_tags: |
property_name = tag.get('property', '').strip() |
content = tag.get('content', '').strip() |
if property_name and content: |
metadata[property_name] = content |
twitter_tags = head.find_all('meta', attrs={'name': re.compile(r'^twitter:')}) |
for tag in twitter_tags: |
property_name = tag.get('name', '').strip() |
content = tag.get('content', '').strip() |
if property_name and content: |
metadata[property_name] = content |
return metadata |
def extract_xml_tags(string): |
""" |
Extracts XML tags from a string. |
Args: |
string (str): The input string containing XML tags. |
Returns: |
List[str]: A list of XML tags extracted from the input string. |
""" |
tags = re.findall(r'<(\w+)>', string) |
return list(set(tags)) |
def extract_xml_data(tags, string): |
""" |
Extract data for specified XML tags from a string. |
How it works: |
1. Searches the string for each tag using regex. |
2. Extracts the content within the tags. |
3. Returns a dictionary of tag-content pairs. |
Args: |
tags (List[str]): The list of XML tags to extract. |
string (str): The input string containing XML data. |
Returns: |
Dict[str, str]: A dictionary with tag names as keys and extracted content as values. |
""" |
data = {} |
for tag in tags: |
pattern = f"<{tag}>(.*?)</{tag}>" |
match = re.search(pattern, string, re.DOTALL) |
if match: |
data[tag] = match.group(1).strip() |
else: |
data[tag] = "" |
return data |
def perform_completion_with_backoff( |
provider, |
prompt_with_variables, |
api_token, |
json_response = False, |
base_url=None, |
**kwargs |
): |
""" |
Perform an API completion request with exponential backoff. |
How it works: |
1. Sends a completion request to the API. |
2. Retries on rate-limit errors with exponential delays. |
3. Returns the API response or an error after all retries. |
Args: |
provider (str): The name of the API provider. |
prompt_with_variables (str): The input prompt for the completion request. |
api_token (str): The API token for authentication. |
json_response (bool): Whether to request a JSON response. Defaults to False. |
base_url (Optional[str]): The base URL for the API. Defaults to None. |
**kwargs: Additional arguments for the API request. |
Returns: |
dict: The API response or an error message after all retries. |
""" |
from litellm import completion |
from litellm.exceptions import RateLimitError |
max_attempts = 3 |
base_delay = 2 |
extra_args = { |
"temperature": 0.01, |
'api_key': api_token, |
'base_url': base_url |
} |
if json_response: |
extra_args["response_format"] = { "type": "json_object" } |
if kwargs.get("extra_args"): |
extra_args.update(kwargs["extra_args"]) |
for attempt in range(max_attempts): |
try: |
response =completion( |
model=provider, |
messages=[ |
{"role": "user", "content": prompt_with_variables} |
], |
**extra_args |
) |
return response |
except RateLimitError as e: |
print("Rate limit error:", str(e)) |
if attempt < max_attempts - 1: |
delay = base_delay * (2 ** attempt) |
print(f"Waiting for {delay} seconds before retrying...") |
time.sleep(delay) |
else: |
return [{ |
"index": 0, |
"tags": ["error"], |
"content": ["Rate limit error. Please try again later."] |
}] |
def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None, base_url = None): |
""" |
Extract content blocks from website HTML using an AI provider. |
How it works: |
1. Prepares a prompt by sanitizing and escaping HTML. |
2. Sends the prompt to an AI provider with optional retries. |
3. Parses the response to extract structured blocks or errors. |
Args: |
url (str): The website URL. |
html (str): The HTML content of the website. |
provider (str): The AI provider for content extraction. Defaults to DEFAULT_PROVIDER. |
api_token (Optional[str]): The API token for authentication. Defaults to None. |
base_url (Optional[str]): The base URL for the API. Defaults to None. |
Returns: |
List[dict]: A list of extracted content blocks. |
""" |
api_token = PROVIDER_MODELS.get(provider, None) if not api_token else api_token |
variable_values = { |
"URL": url, |
"HTML": escape_json_string(sanitize_html(html)), |
} |
prompt_with_variables = PROMPT_EXTRACT_BLOCKS |
for variable in variable_values: |
prompt_with_variables = prompt_with_variables.replace( |
"{" + variable + "}", variable_values[variable] |
) |
response = perform_completion_with_backoff(provider, prompt_with_variables, api_token, base_url=base_url) |
try: |
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks'] |
blocks = json.loads(blocks) |
for block in blocks: |
block['error'] = False |
except Exception as e: |
parsed, unparsed = split_and_parse_json_objects(response.choices[0].message.content) |
blocks = parsed |
if unparsed: |
blocks.append({ |
"index": 0, |
"error": True, |
"tags": ["error"], |
"content": unparsed |
}) |
return blocks |
def extract_blocks_batch(batch_data, provider = "groq/llama3-70b-8192", api_token = None): |
""" |
Extract content blocks from a batch of website HTMLs. |
How it works: |
1. Prepares prompts for each URL and HTML pair. |
2. Sends the prompts to the AI provider in a batch request. |
3. Parses the responses to extract structured blocks or errors. |
Args: |
batch_data (List[Tuple[str, str]]): A list of (URL, HTML) pairs. |
provider (str): The AI provider for content extraction. Defaults to "groq/llama3-70b-8192". |
api_token (Optional[str]): The API token for authentication. Defaults to None. |
Returns: |
List[dict]: A list of extracted content blocks from all batch items. |
""" |
api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token |
from litellm import batch_completion |
messages = [] |
for url, html in batch_data: |
variable_values = { |
"URL": url, |
"HTML": html, |
} |
prompt_with_variables = PROMPT_EXTRACT_BLOCKS |
for variable in variable_values: |
prompt_with_variables = prompt_with_variables.replace( |
"{" + variable + "}", variable_values[variable] |
) |
messages.append([{"role": "user", "content": prompt_with_variables}]) |
responses = batch_completion( |
model = provider, |
messages = messages, |
temperature = 0.01 |
) |
all_blocks = [] |
for response in responses: |
try: |
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks'] |
blocks = json.loads(blocks) |
except Exception as e: |
blocks = [{ |
"index": 0, |
"tags": ["error"], |
"content": ["Error extracting blocks from the HTML content. Choose another provider/model or try again."], |
"questions": ["What went wrong during the block extraction process?"] |
}] |
all_blocks.append(blocks) |
return sum(all_blocks, []) |
def merge_chunks_based_on_token_threshold(chunks, token_threshold): |
""" |
Merges small chunks into larger ones based on the total token threshold. |
:param chunks: List of text chunks to be merged based on token count. |
:param token_threshold: Max number of tokens for each merged chunk. |
:return: List of merged text chunks. |
""" |
merged_sections = [] |
current_chunk = [] |
total_token_so_far = 0 |
for chunk in chunks: |
chunk_token_count = len(chunk.split()) * 1.3 |
if total_token_so_far + chunk_token_count < token_threshold: |
current_chunk.append(chunk) |
total_token_so_far += chunk_token_count |
else: |
if current_chunk: |
merged_sections.append('\n\n'.join(current_chunk)) |
current_chunk = [chunk] |
total_token_so_far = chunk_token_count |
if current_chunk: |
merged_sections.append('\n\n'.join(current_chunk)) |
return merged_sections |
def process_sections(url: str, sections: list, provider: str, api_token: str, base_url=None) -> list: |
""" |
Process sections of HTML content sequentially or in parallel. |
How it works: |
1. Sequentially processes sections with delays for "groq/" providers. |
2. Uses ThreadPoolExecutor for parallel processing with other providers. |
3. Extracts content blocks for each section. |
Args: |
url (str): The website URL. |
sections (List[str]): The list of HTML sections to process. |
provider (str): The AI provider for content extraction. |
api_token (str): The API token for authentication. |
base_url (Optional[str]): The base URL for the API. Defaults to None. |
Returns: |
List[dict]: The list of extracted content blocks from all sections. |
""" |
extracted_content = [] |
if provider.startswith("groq/"): |
for section in sections: |
extracted_content.extend(extract_blocks(url, section, provider, api_token, base_url=base_url)) |
time.sleep(0.5) |
else: |
with ThreadPoolExecutor() as executor: |
futures = [executor.submit(extract_blocks, url, section, provider, api_token, base_url=base_url) for section in sections] |
for future in as_completed(futures): |
extracted_content.extend(future.result()) |
return extracted_content |
def wrap_text(draw, text, font, max_width): |
""" |
Wrap text to fit within a specified width for rendering. |
How it works: |
1. Splits the text into words. |
2. Constructs lines that fit within the maximum width using the provided font. |
3. Returns the wrapped text as a single string. |
Args: |
draw (ImageDraw.Draw): The drawing context for measuring text size. |
text (str): The text to wrap. |
font (ImageFont.FreeTypeFont): The font to use for measuring text size. |
max_width (int): The maximum width for each line. |
Returns: |
str: The wrapped text. |
""" |
lines = [] |
words = text.split() |
while words: |
line = '' |
while words and draw.textbbox((0, 0), line + words[0], font=font)[2] <= max_width: |
line += (words.pop(0) + ' ') |
lines.append(line) |
return '\n'.join(lines) |
def format_html(html_string): |
""" |
Prettify an HTML string using BeautifulSoup. |
How it works: |
1. Parses the HTML string with BeautifulSoup. |
2. Formats the HTML with proper indentation. |
3. Returns the prettified HTML string. |
Args: |
html_string (str): The HTML string to format. |
Returns: |
str: The prettified HTML string. |
""" |
soup = BeautifulSoup(html_string, 'lxml.parser') |
return soup.prettify() |
def fast_format_html(html_string): |
""" |
A fast HTML formatter that uses string operations instead of parsing. |
Args: |
html_string (str): The HTML string to format |
Returns: |
str: The formatted HTML string |
""" |
indent = 0 |
indent_str = " " |
formatted = [] |
in_content = False |
parts = html_string.replace('>', '>\n').replace('<', '\n<').split('\n') |
for part in parts: |
if not part.strip(): |
continue |
if part.startswith('</'): |
indent -= 1 |
formatted.append(indent_str * indent + part) |
elif part.startswith('<') and part.endswith('/>'): |
formatted.append(indent_str * indent + part) |
elif part.startswith('<'): |
formatted.append(indent_str * indent + part) |
indent += 1 |
else: |
content = part.strip() |
if content: |
formatted.append(indent_str * indent + content) |
return '\n'.join(formatted) |
def normalize_url(href, base_url): |
"""Normalize URLs to ensure consistent format""" |
from urllib.parse import urljoin, urlparse |
parsed_base = urlparse(base_url) |
if not parsed_base.scheme or not parsed_base.netloc: |
raise ValueError(f"Invalid base URL format: {base_url}") |
normalized = urljoin(base_url, href.strip()) |
return normalized |
def normalize_url_tmp(href, base_url): |
"""Normalize URLs to ensure consistent format""" |
try: |
base_parts = base_url.split('/') |
protocol = base_parts[0] |
domain = base_parts[2] |
except IndexError: |
raise ValueError(f"Invalid base URL format: {base_url}") |
special_protocols = {'mailto:', 'tel:', 'ftp:', 'file:', 'data:', 'javascript:'} |
if any(href.lower().startswith(proto) for proto in special_protocols): |
return href.strip() |
if href.startswith('#'): |
return f"{base_url}{href}" |
if href.startswith('//'): |
return f"{protocol}{href}" |
if href.startswith('/'): |
return f"{protocol}//{domain}{href}" |
if not href.startswith(('http://', 'https://')): |
href = href.lstrip('./') |
return f"{protocol}//{domain}/{href}" |
return href.strip() |
def get_base_domain(url: str) -> str: |
""" |
Extract the base domain from a given URL, handling common edge cases. |
How it works: |
1. Parses the URL to extract the domain. |
2. Removes the port number and 'www' prefix. |
3. Handles special domains (e.g., 'co.uk') to extract the correct base. |
Args: |
url (str): The URL to extract the base domain from. |
Returns: |
str: The extracted base domain or an empty string if parsing fails. |
""" |
try: |
domain = urlparse(url).netloc.lower() |
if not domain: |
return "" |
domain = domain.split(':')[0] |
domain = re.sub(r'^www\.', '', domain) |
parts = domain.split('.') |
if len(parts) > 2 and parts[-2] in { |
'co', 'com', 'org', 'gov', 'edu', 'net', |
'mil', 'int', 'ac', 'ad', 'ae', 'af', 'ag' |
}: |
return '.'.join(parts[-3:]) |
return '.'.join(parts[-2:]) |
except Exception: |
return "" |
def is_external_url(url: str, base_domain: str) -> bool: |
""" |
Extract the base domain from a given URL, handling common edge cases. |
How it works: |
1. Parses the URL to extract the domain. |
2. Removes the port number and 'www' prefix. |
3. Handles special domains (e.g., 'co.uk') to extract the correct base. |
Args: |
url (str): The URL to extract the base domain from. |
Returns: |
str: The extracted base domain or an empty string if parsing fails. |
""" |
special = {'mailto:', 'tel:', 'ftp:', 'file:', 'data:', 'javascript:'} |
if any(url.lower().startswith(p) for p in special): |
return True |
try: |
parsed = urlparse(url) |
if not parsed.netloc: |
return False |
url_domain = parsed.netloc.lower().replace('www.', '') |
base = base_domain.lower().replace('www.', '') |
return not url_domain.endswith(base) |
except Exception: |
return False |
def clean_tokens(tokens: list[str]) -> list[str]: |
""" |
Clean a list of tokens by removing noise, stop words, and short tokens. |
How it works: |
1. Defines a set of noise words and stop words. |
2. Filters tokens based on length and exclusion criteria. |
3. Excludes tokens starting with certain symbols (e.g., "↑", "▲"). |
Args: |
tokens (list[str]): The list of tokens to clean. |
Returns: |
list[str]: The cleaned list of tokens. |
""" |
noise = {'ccp', 'up', '↑', '▲', '⬆️', 'a', 'an', 'at', 'by', 'in', 'of', 'on', 'to', 'the'} |
'a', 'an', 'and', 'are', 'as', 'at', 'be', 'by', 'for', 'from', |
'has', 'he', 'in', 'is', 'it', 'its', 'of', 'on', 'that', 'the', |
'to', 'was', 'were', 'will', 'with', |
'i', 'you', 'he', 'she', 'it', 'we', 'they', |
'me', 'him', 'her', 'us', 'them', |
'my', 'your', 'his', 'her', 'its', 'our', 'their', |
'mine', 'yours', 'hers', 'ours', 'theirs', |
'myself', 'yourself', 'himself', 'herself', 'itself', 'ourselves', 'themselves', |
'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', |
'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', |
'about', 'above', 'across', 'after', 'against', 'along', 'among', 'around', |
'at', 'before', 'behind', 'below', 'beneath', 'beside', 'between', 'beyond', |
'by', 'down', 'during', 'except', 'for', 'from', 'in', 'inside', 'into', |
'near', 'of', 'off', 'on', 'out', 'outside', 'over', 'past', 'through', |
'to', 'toward', 'under', 'underneath', 'until', 'up', 'upon', 'with', 'within', |
'and', 'but', 'or', 'nor', 'for', 'yet', 'so', |
'although', 'because', 'since', 'unless', |
'a', 'an', 'the', |
'this', 'that', 'these', 'those', |
'what', 'which', 'who', 'whom', 'whose', |
'when', 'where', 'why', 'how', |
'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', |
'can', 'cannot', "can't", 'could', "couldn't", |
'may', 'might', 'must', "mustn't", |
'shall', 'should', "shouldn't", |
'will', "won't", 'would', "wouldn't", |
'not', "n't", 'no', 'nor', 'none' |
} |
return [token for token in tokens |
if len(token) > 2 |
and token not in noise |
and token not in STOP_WORDS |
and not token.startswith('↑') |
and not token.startswith('▲') |
and not token.startswith('⬆')] |
def profile_and_time(func): |
""" |
Decorator to profile a function's execution time and performance. |
How it works: |
1. Records the start time before executing the function. |
2. Profiles the function's execution using `cProfile`. |
3. Prints the elapsed time and profiling statistics. |
Args: |
func (Callable): The function to decorate. |
Returns: |
Callable: The decorated function with profiling and timing enabled. |
""" |
@wraps(func) |
def wrapper(self, *args, **kwargs): |
start_time = time.perf_counter() |
profiler = cProfile.Profile() |
profiler.enable() |
result = func(self, *args, **kwargs) |
profiler.disable() |
elapsed_time = time.perf_counter() - start_time |
print(f"[PROFILER] Scraping completed in {elapsed_time:.2f} seconds") |
stats = pstats.Stats(profiler) |
stats.sort_stats('cumulative') |
stats.print_stats(20) |
return result |
return wrapper |
def generate_content_hash(content: str) -> str: |
"""Generate a unique hash for content""" |
return xxhash.xxh64(content.encode()).hexdigest() |
def ensure_content_dirs(base_path: str) -> Dict[str, str]: |
"""Create content directories if they don't exist""" |
dirs = { |
'html': 'html_content', |
'cleaned': 'cleaned_html', |
'markdown': 'markdown_content', |
'extracted': 'extracted_content', |
'screenshots': 'screenshots', |
'screenshot': 'screenshots' |
} |
content_paths = {} |
for key, dirname in dirs.items(): |
path = os.path.join(base_path, dirname) |
os.makedirs(path, exist_ok=True) |
content_paths[key] = path |
return content_paths |
def configure_windows_event_loop(): |
""" |
Configure the Windows event loop to use ProactorEventLoop. |
This resolves the NotImplementedError that occurs on Windows when using asyncio subprocesses. |
This function should only be called on Windows systems and before any async operations. |
On non-Windows systems, this function does nothing. |
Example: |
```python |
from crawl4ai.async_configs import configure_windows_event_loop |
# Call this before any async operations if you're on Windows |
configure_windows_event_loop() |
``` |
""" |
if platform.system() == 'Windows': |
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) |
def get_error_context(exc_info, context_lines: int = 5): |
""" |
Extract error context with more reliable line number tracking. |
Args: |
exc_info: The exception info from sys.exc_info() |
context_lines: Number of lines to show before and after the error |
Returns: |
dict: Error context information |
""" |
import traceback |
import linecache |
import os |
tb = traceback.extract_tb(exc_info[2]) |
last_frame = tb[-1] |
filename = last_frame.filename |
line_no = last_frame.lineno |
func_name = last_frame.name |
context_start = max(1, line_no - context_lines) |
context_end = line_no + context_lines + 1 |
context_lines = [] |
for i in range(context_start, context_end): |
line = linecache.getline(filename, i) |
if line: |
line = line.rstrip() |
pointer = '→' if i == line_no else ' ' |
context_lines.append(f"{i:4d} {pointer} {line}") |
code_context = '\n'.join(context_lines) |
try: |
rel_path = os.path.relpath(filename) |
except ValueError: |
rel_path = filename |
return { |
"filename": rel_path, |
"line_no": line_no, |
"function": func_name, |
"code_context": code_context |
} |