import os
import time
import requests
import cloudscraper
import streamlit as st
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from selenium import webdriver
from langchain_groq import ChatGroq
from urllib.parse import urljoin, urlparse
from langchain_core.prompts import ChatPromptTemplate
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
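# Pipeline: collect internal links from the target site (cloudscraper, with a Selenium
# fallback for JavaScript-rendered pages), scrape the text of each page, then chunk the
# combined text and ask a Groq-hosted LLM for a structured company breakdown via Streamlit.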
# Load API Key
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
    st.error("Error: Groq API Key is missing. Please set 'GROQ_API_KEY' as an environment variable.")
    st.stop()

chat = ChatGroq(temperature=0, groq_api_key=GROQ_API_KEY, model_name="llama-3.3-70b-versatile")
# Model limits (CHUNK_SIZE is measured in characters; see split_into_chunks)
MODEL_TOKEN_LIMIT = 32500
CHUNK_SIZE = 15000
# Initialize Cloudscraper
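# cloudscraper wraps a requests session and transparently handles Cloudflare's basic
# anti-bot challenge pages, which a plain requests.get() would typically fail on.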
scraper = cloudscraper.create_scraper()
# Headers to mimic real browser requests
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}
# ✅ **Extract Links**
def get_valid_links(base_url):
    """Extracts all internal links, including footer and JavaScript-rendered links."""
    try:
        response = scraper.get(base_url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        domain = urlparse(base_url).netloc
        links = set()

        for link in soup.find_all('a', href=True):
            full_url = urljoin(base_url, link.get('href'))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)

        # If few links are found, fall back to Selenium
        if len(links) < 5 or not check_footer_links(soup):
            selenium_links = get_links_with_selenium(base_url)
            links.update(selenium_links)

        return links
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching links: {e}")
        return set()
def check_footer_links(soup):
    """Checks if footer links exist."""
    footer = soup.find("footer")
    return footer and footer.find_all("a", href=True)
def get_links_with_selenium(url):
    """Extracts JavaScript-rendered links using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get(url)
        time.sleep(5)  # Allow JavaScript to load
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        driver.quit()

        links = set()
        domain = urlparse(url).netloc
        for link in soup.find_all('a', href=True):
            full_url = urljoin(url, link.get('href'))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)

        return links
    except Exception as e:
        print(f"❌ Selenium Error: {e}")
        return set()
# ✅ **Scrape Pages**
def scrape_page(url):
    """Scrapes a webpage, using Selenium if necessary."""
    try:
        response = scraper.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        if len(soup.get_text(strip=True)) < 500:
            return scrape_with_selenium(url)

        return extract_text(soup)
    except requests.exceptions.RequestException:
        return scrape_with_selenium(url)
def scrape_with_selenium(url):
    """Scrapes JavaScript-heavy pages using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get(url)
        time.sleep(5)  # Allow JavaScript to load
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        driver.quit()

        return extract_text(soup)
    except Exception as e:
        return f"❌ Selenium Scraping Error: {e}"
def extract_text(soup):
    """Extracts **all** meaningful text from HTML content, including dynamic elements."""
    # ✅ Extract all text from the HTML, not just specific tags
    all_text = soup.get_text(separator="\n", strip=True)

    # ✅ Remove duplicate lines while preserving their original order
    unique_lines = dict.fromkeys(all_text.split("\n"))
    cleaned_text = "\n".join(line for line in unique_lines if len(line) > 3)  # Exclude tiny fragments

    return cleaned_text
# ✅ **Chunking for Large AI Requests**
def split_into_chunks(text, chunk_size):
    """Splits long content into manageable chunks for AI processing."""
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0

    for word in words:
        if current_length + len(word) + 1 > chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_length = 0
        current_chunk.append(word)
        current_length += len(word) + 1

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks
# ✅ **AI-Powered Company Breakdown**
def generate_detailed_company_info(company_data):
    """Generates an in-depth company breakdown with AI."""
    system_prompt = """
    You are a business research AI. Provide **detailed** insights strictly from the extracted company data.
    - Do **not** infer missing details.
    - If data is missing, label it as **"Data Not Available"**.
    """

    user_prompt_template = """
    Based on the extracted content, **generate a structured company analysis**:

    ## **Company Overview**
    - Full company name, industry, and key differentiators.
    - Headquarters location & founding year (if available).

    ## **Mission & Vision**
    - Clearly state the company's mission and vision.
    - If missing, state **"Data Not Available"**.

    ## **Products & Services**
    - List major products/services and their benefits.

    ## **Target Audience**
    - Define customer demographics or industries served.

    ## **Business Model & Revenue Streams**
    - Describe revenue model (e.g., SaaS, B2B, freemium).

    ## **Competitive Edge & Market Position**
    - Highlight unique features, patents, and innovations.

    ## **Clients & Industry Impact**
    - Notable clients, case studies, or market influence.

    **Extracted Data:**
    {company_data}
    """

    # Build the prompt once; `company_data` stays a template variable so the scraped
    # text is injected at invoke time rather than being baked into the template string.
    prompt = ChatPromptTemplate.from_messages([("system", system_prompt), ("human", user_prompt_template)])
    chain = prompt | chat

    if len(company_data) > CHUNK_SIZE:
        st.warning("🔄 Large content detected! Splitting into multiple AI requests.")
        chunks = split_into_chunks(company_data, CHUNK_SIZE)
        responses = []
        for i, chunk in enumerate(chunks):
            st.write(f"Processing AI Response {i+1}/{len(chunks)}...")
            response = chain.invoke({"company_data": chunk})
            responses.append(response.content)
        return "\n\n".join(responses)
    else:
        response = chain.invoke({"company_data": company_data})
        return response.content
# ✅ **Streamlit UI**
def main():
    st.title("🚀 AI-Powered Company Website Scraper")

    base_url = st.text_input("🔗 Enter Website URL", "")

    if st.button("Scrape"):
        if base_url:
            st.write(f"🔍 Scraping: {base_url}... Please wait.")
            valid_links = get_valid_links(base_url)

            if valid_links:
                scraped_content = {link: scrape_page(link) for link in valid_links}
                full_content = "\n".join(scraped_content.values())

                detailed_info = generate_detailed_company_info(full_content)
                st.write(detailed_info)

                # Make the Scraped Content collapsible
                with st.expander("📜 **View Scraped Content** (Click to Expand)", expanded=False):
                    for url, content in scraped_content.items():
                        st.write(f"### {url}")
                        st.write(content)
            else:
                st.error("No internal links could be extracted from that URL.")
        else:
            st.warning("Please enter a website URL first.")
if __name__ == "__main__":
    main()
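# To run the app locally (assuming this file is saved as app.py and Chrome/Chromium
# is available for Selenium):
#   streamlit run app.py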