import os
import time
import requests
import cloudscraper
import streamlit as st
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from selenium import webdriver
from langchain_groq import ChatGroq
from urllib.parse import urljoin, urlparse
from langchain_core.prompts import ChatPromptTemplate
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Load API Key
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

if not GROQ_API_KEY:
    st.error("Error: Groq API Key is missing. Please set 'GROQ_API_KEY' as an environment variable.")
    st.stop()  # Stop this run instead of creating the client with a missing key

chat = ChatGroq(temperature=0, groq_api_key=GROQ_API_KEY, model_name="llama-3.3-70b-versatile")

# Model limits (measured in characters, used as a rough proxy for tokens)
MODEL_TOKEN_LIMIT = 32500
CHUNK_SIZE = 15000

# Initialize Cloudscraper
scraper = cloudscraper.create_scraper()

# Headers to mimic real browser requests
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}


# ✅ **Extract Links**
def get_valid_links(base_url):
    """Extracts all internal links, including footer and JavaScript-rendered links."""
    try:
        response = scraper.get(base_url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        domain = urlparse(base_url).netloc
        links = set()

        for link in soup.find_all('a', href=True):
            full_url = urljoin(base_url, link.get('href'))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)

        # If few links are found, fall back to Selenium
        if len(links) < 5 or not check_footer_links(soup):
            selenium_links = get_links_with_selenium(base_url)
            links.update(selenium_links)

        return links
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching links: {e}")
        return set()


def check_footer_links(soup):
    """Checks whether footer links exist."""
    footer = soup.find("footer")
    return footer and footer.find_all("a", href=True)


def get_links_with_selenium(url):
    """Extracts JavaScript-rendered links using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get(url)
        time.sleep(5)  # Allow JavaScript to load

        soup = BeautifulSoup(driver.page_source, 'html.parser')
        driver.quit()

        links = set()
        domain = urlparse(url).netloc

        for link in soup.find_all('a', href=True):
            full_url = urljoin(url, link.get('href'))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)

        return links
    except Exception as e:
        print(f"❌ Selenium Error: {e}")
        return set()


# ✅ **Scrape Pages**
def scrape_page(url):
    """Scrapes a webpage, using Selenium if necessary."""
    try:
        response = scraper.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Very little text usually means the page is rendered client-side
        if len(soup.get_text(strip=True)) < 500:
            return scrape_with_selenium(url)

        return extract_text(soup)
    except requests.exceptions.RequestException:
        return scrape_with_selenium(url)


def scrape_with_selenium(url):
    """Scrapes JavaScript-heavy pages using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get(url)
        time.sleep(5)  # Allow JavaScript to load

        soup = BeautifulSoup(driver.page_source, 'html.parser')
        driver.quit()

        return extract_text(soup)
    except Exception as e:
        return f"❌ Selenium Scraping Error: {e}"


def extract_text(soup):
    """Extracts **all** meaningful text from HTML content, including dynamic elements."""
    # ✅ Extracts all text from the HTML, not just specific tags
    all_text = soup.get_text(separator="\n", strip=True)

    # ✅ Removes duplicate lines (order-preserving) and tiny fragments
    unique_lines = dict.fromkeys(all_text.split("\n"))
    cleaned_text = "\n".join(line for line in unique_lines if len(line) > 3)

    return cleaned_text


# ✅ **Chunking for Large AI Requests**
def split_into_chunks(text, chunk_size):
    """Splits long content into manageable chunks for AI processing."""
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0

    for word in words:
        if current_length + len(word) + 1 > chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_length = 0
        current_chunk.append(word)
        current_length += len(word) + 1

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks


# ✅ **AI-Powered Company Breakdown**
def generate_detailed_company_info(company_data):
    """Generates an in-depth company breakdown with AI."""
    system_prompt = """
    You are a business research AI. Provide **detailed** insights strictly from the extracted company data.
    - Do **not** infer missing details.
    - If data is missing, label it as **"Data Not Available"**.
    """

    # Plain template string: {company_data} stays a placeholder and is filled in at invoke time
    user_prompt_template = """
    Based on the extracted content, **generate a structured company analysis**:

    ## **Company Overview**
    - Full company name, industry, and key differentiators.
    - Headquarters location & founding year (if available).

    ## **Mission & Vision**
    - Clearly state the company's mission and vision.
    - If missing, state **"Data Not Available"**.

    ## **Products & Services**
    - List major products/services and their benefits.

    ## **Target Audience**
    - Define customer demographics or industries served.

    ## **Business Model & Revenue Streams**
    - Describe the revenue model (e.g., SaaS, B2B, freemium).

    ## **Competitive Edge & Market Position**
    - Highlight unique features, patents, and innovations.

    ## **Clients & Industry Impact**
    - Notable clients, case studies, or market influence.

    **Extracted Data:**
    {company_data}
    """

    prompt = ChatPromptTemplate.from_messages([("system", system_prompt), ("human", user_prompt_template)])
    chain = prompt | chat
    responses = []

    if len(company_data) > CHUNK_SIZE:
        st.warning("🔄 Large content detected! Splitting into multiple AI requests.")
        chunks = split_into_chunks(company_data, CHUNK_SIZE)

        for i, chunk in enumerate(chunks):
            st.write(f"Processing AI Response {i + 1}/{len(chunks)}...")
            response = chain.invoke({"company_data": chunk})
            responses.append(response.content)

        return "\n\n".join(responses)
    else:
        response = chain.invoke({"company_data": company_data})
        return response.content


# ✅ **Streamlit UI**
def main():
    st.title("🚀 AI-Powered Company Website Scraper")
    base_url = st.text_input("🔗 Enter Website URL", "")

    if st.button("Scrape"):
        if base_url:
            st.write(f"🔍 Scraping: {base_url}... Please wait.")
            valid_links = get_valid_links(base_url)

            if valid_links:
                scraped_content = {link: scrape_page(link) for link in valid_links}
                full_content = "\n".join(scraped_content.values())

                detailed_info = generate_detailed_company_info(full_content)
                st.write(detailed_info)

                # Make the scraped content collapsible
                with st.expander("📜 **View Scraped Content** (Click to Expand)", expanded=False):
                    for url, content in scraped_content.items():
                        st.write(f"### {url}")
                        st.write(content)
            else:
                st.warning("⚠️ No internal links found for this URL.")
        else:
            st.warning("⚠️ Please enter a website URL first.")


if __name__ == "__main__":
    main()