import os
import time
import requests
import cloudscraper
import streamlit as st
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from selenium import webdriver
from langchain_groq import ChatGroq
from urllib.parse import urljoin, urlparse
from langchain_core.prompts import ChatPromptTemplate
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

if not GROQ_API_KEY:
    st.error("Error: Groq API Key is missing. Please set 'GROQ_API_KEY' as an environment variable.")
    st.stop()  # halt the app instead of initializing the model with a missing key

chat = ChatGroq(temperature=0, groq_api_key=GROQ_API_KEY, model_name="llama-3.3-70b-versatile")
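
# The key is read from the environment; with python-dotenv it can also live in a local
# .env file next to this script (filename and value below are assumptions, not part of the original):
#   GROQ_API_KEY=your-groq-api-key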

MODEL_TOKEN_LIMIT = 32500
CHUNK_SIZE = 15000  # approximate character budget per chunk sent to the model

# cloudscraper handles common anti-bot challenges that a plain requests session cannot.
scraper = cloudscraper.create_scraper()

# Browser-like headers reduce the chance of being served a blocked or stripped-down page.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}


def get_valid_links(base_url):
    """Extracts all internal links, including footer and JavaScript-rendered links."""
    try:
        response = scraper.get(base_url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        domain = urlparse(base_url).netloc

        links = set()
        for link in soup.find_all('a', href=True):
            full_url = urljoin(base_url, link.get('href'))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)

        # Fall back to Selenium when the static page yields few links or no footer links,
        # which usually means the navigation is rendered by JavaScript.
        if len(links) < 5 or not check_footer_links(soup):
            selenium_links = get_links_with_selenium(base_url)
            links.update(selenium_links)

        return links
    except requests.exceptions.RequestException as e:
        print(f"Error fetching links: {e}")
        return set()


def check_footer_links(soup):
    """Checks if footer links exist."""
    footer = soup.find("footer")
    return bool(footer and footer.find_all("a", href=True))


def get_links_with_selenium(url):
    """Extracts JavaScript-rendered links using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")

        # webdriver_manager downloads a matching ChromeDriver; a local Chrome install is still required.
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get(url)
        time.sleep(5)  # crude wait for JavaScript-rendered content to finish loading

        soup = BeautifulSoup(driver.page_source, 'html.parser')
        driver.quit()

        links = set()
        domain = urlparse(url).netloc
        for link in soup.find_all('a', href=True):
            full_url = urljoin(url, link.get('href'))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)

        return links
    except Exception as e:
        print(f"Selenium Error: {e}")
        return set()


def scrape_page(url):
    """Scrapes a webpage, using Selenium if necessary."""
    try:
        response = scraper.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Very little visible text usually means the page is rendered client-side.
        if len(soup.get_text(strip=True)) < 500:
            return scrape_with_selenium(url)

        return extract_text(soup)
    except requests.exceptions.RequestException:
        return scrape_with_selenium(url)


def scrape_with_selenium(url):
    """Scrapes JavaScript-heavy pages using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get(url)
        time.sleep(5)
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        driver.quit()

        return extract_text(soup)
    except Exception as e:
        return f"Selenium Scraping Error: {e}"


def extract_text(soup):
    """Extracts all meaningful text from HTML content, including dynamic elements."""
    all_text = soup.get_text(separator="\n", strip=True)

    # De-duplicate lines while preserving their original order (a plain set would scramble it),
    # and drop very short fragments such as stray punctuation or single-character labels.
    unique_lines = dict.fromkeys(all_text.split("\n"))
    cleaned_text = "\n".join(line for line in unique_lines if len(line) > 3)

    return cleaned_text


def split_into_chunks(text, chunk_size):
    """Splits long content into manageable chunks for AI processing."""
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0

    for word in words:
        if current_length + len(word) + 1 > chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_length = 0
        current_chunk.append(word)
        current_length += len(word) + 1

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks


def generate_detailed_company_info(company_data):
    """Generates an in-depth company breakdown with AI."""

    system_prompt = """
    You are a business research AI. Provide **detailed** insights strictly from the extracted company data.
    - Do **not** infer missing details.
    - If data is missing, label it as **"Data Not Available"**.
    """

    # Plain template string (not an f-string): {company_data} is filled in per request,
    # so each chunk is formatted through the prompt rather than baked into the template.
    user_prompt_template = """
    Based on the extracted content, **generate a structured company analysis**:

    ## **Company Overview**
    - Full company name, industry, and key differentiators.
    - Headquarters location & founding year (if available).

    ## **Mission & Vision**
    - Clearly state the company's mission and vision.
    - If missing, state **"Data Not Available"**.

    ## **Products & Services**
    - List major products/services and their benefits.

    ## **Target Audience**
    - Define customer demographics or industries served.

    ## **Business Model & Revenue Streams**
    - Describe revenue model (e.g., SaaS, B2B, freemium).

    ## **Competitive Edge & Market Position**
    - Highlight unique features, patents, and innovations.

    ## **Clients & Industry Impact**
    - Notable clients, case studies, or market influence.

    **Extracted Data:**
    {company_data}
    """

    prompt = ChatPromptTemplate.from_messages([("system", system_prompt), ("human", user_prompt_template)])
    chain = prompt | chat

    if len(company_data) > CHUNK_SIZE:
        st.warning("Large content detected! Splitting into multiple AI requests.")
        chunks = split_into_chunks(company_data, CHUNK_SIZE)

        responses = []
        for i, chunk in enumerate(chunks):
            st.write(f"Processing AI Response {i+1}/{len(chunks)}...")
            response = chain.invoke({"company_data": chunk})
            responses.append(response.content)

        return "\n\n".join(responses)

    response = chain.invoke({"company_data": company_data})
    return response.content


def main():
    st.title("AI-Powered Company Website Scraper")
    base_url = st.text_input("Enter Website URL", "")

    if st.button("Scrape"):
        if base_url:
            st.write(f"Scraping: {base_url}... Please wait.")
            valid_links = get_valid_links(base_url)

            if valid_links:
                scraped_content = {link: scrape_page(link) for link in valid_links}
                full_content = "\n".join(scraped_content.values())
                detailed_info = generate_detailed_company_info(full_content)
                st.write(detailed_info)

                with st.expander("**View Scraped Content** (Click to Expand)", expanded=False):
                    for url, content in scraped_content.items():
                        st.write(f"### {url}")
                        st.write(content)
            else:
                st.error("No internal links could be extracted from that URL.")
        else:
            st.warning("Please enter a website URL first.")


if __name__ == "__main__":
    main()
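
# Minimal local run sketch (the filename below is an assumption, not part of the original script):
#   pip install streamlit cloudscraper beautifulsoup4 python-dotenv selenium webdriver-manager langchain-groq requests
#   streamlit run company_scraper.py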