import gradio as gr
import requests
import pytesseract
from PIL import Image
import docx
from transformers import pipeline
from keybert import KeyBERT
from io import BytesIO
from langdetect import detect
import re
import asyncio
import time
from twscrape import API, gather
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
# Set up Tesseract
# pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'  # Uncomment for Windows

# Initialize AI models
emotion_classifier = pipeline("text-classification", model="joeddav/distilbert-base-uncased-go-emotions-student")
keyword_extractor = KeyBERT()
zero_shot_classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
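# Illustrative output shape for the emotion pipeline (scores made up):
# with top_k=None it returns one {"label", "score"} dict per GoEmotions class:
#   emotion_classifier("I love this!", top_k=None)
#   -> [{"label": "love", "score": 0.92}, {"label": "joy", "score": 0.05}, ...]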

class RealTimeSocialScraper:
    def __init__(self):
        self.api = API()  # twscrape API; configure proxies here if needed
        self.driver = self._init_browser()

    def _init_browser(self):
        # Headless Chrome via webdriver-manager (Selenium 4 expects a Service object)
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--disable-gpu")
        service = Service(ChromeDriverManager().install())
        return webdriver.Chrome(service=service, options=chrome_options)

    async def scrape(self, platform, query, limit=10):
        if platform == "twitter":
            return await self._scrape_twitter(query, limit)
        elif platform == "instagram":
            return self._scrape_instagram(query)
        elif platform == "tiktok":
            return self._scrape_tiktok(query)
        else:
            raise ValueError(f"Unsupported platform: {platform}")

    async def _scrape_twitter(self, query, limit):
        await self.api.pool.login_all()
        return await gather(self.api.search(query, limit=limit))
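    # Note: login_all() only succeeds if the twscrape account pool already
    # holds credentials. The call below shows the shape (placeholders only):
    #   await self.api.pool.add_account("user", "pass", "mail@example.com", "mail_pass")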

    def _scrape_instagram(self, query):
        self.driver.get(f"https://www.instagram.com/explore/tags/{query}/")
        # NOTE: "v1Nh3" is an obfuscated class name from an old Instagram build;
        # it breaks whenever Instagram ships new markup and will need updating.
        WebDriverWait(self.driver, 30).until(
            EC.presence_of_element_located((By.CLASS_NAME, "v1Nh3"))
        )
        soup = BeautifulSoup(self.driver.page_source, 'html.parser')
        posts = []
        for post in soup.find_all("div", class_="v1Nh3"):
            img = post.find('img')
            if img is None:
                continue
            posts.append({
                'content': img.get('alt', ''),
                'image_url': img.get('src', '')
            })
        return posts[:10]

    def _scrape_tiktok(self, query):
        # Placeholder: implement TikTok scraping here or plug in an API client
        return [{"content": f"Demo TikTok post about {query}"}]

async def extract_posts(profile_url, hashtags, num_posts):
    scraper = RealTimeSocialScraper()
    platform = "twitter" if "twitter" in profile_url else "instagram"
    try:
        raw_posts = await scraper.scrape(platform, hashtags[0], num_posts)
        return _format_posts(raw_posts, platform)  # _format_posts is synchronous
    except Exception as e:
        print(f"Scraping failed: {e}")
        return _fallback_data(num_posts)
    finally:
        scraper.driver.quit()  # avoid leaking headless Chrome processes

def _format_posts(raw_posts, platform):
    formatted = []
    for post in raw_posts:
        # twscrape returns Tweet objects; the Instagram/TikTok scrapers return
        # dicts, so branch explicitly instead of mixing getattr with dict.get
        if isinstance(post, dict):
            caption = post.get("content", "No caption")
            image_url = post.get("image_url", "")
            likes = post.get("likes", 0)
            comments = post.get("comments", 0)
        else:
            caption = getattr(post, "rawContent", "No caption")
            image_url = getattr(post, "image_url", "")
            likes = getattr(post, "likeCount", 0)
            comments = getattr(post, "replyCount", 0)
        formatted.append({
            "caption": caption,
            "image_url": image_url,
            "video_url": "",
            "audio_url": "",
            "tagged_audience": [],
            "date": time.strftime("%Y-%m-%d"),
            "likes": likes,
            "comments": comments
        })
    return formatted

def _fallback_data(num_posts):
    # Demo data used when live scraping fails; keys mirror _format_posts so
    # the report generator can index them without KeyErrors
    return [
        {
            "caption": "Sample post about environmental issues",
            "image_url": "https://example.com/sample.jpg",
            "video_url": "",
            "audio_url": "",
            "tagged_audience": [],
            "date": "2023-10-01",
            "likes": 100,
            "comments": 20,
        } for _ in range(num_posts)
    ]

def extract_text_from_image(image_url):
    try:
        response = requests.get(image_url, timeout=10)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        return pytesseract.image_to_string(image).strip()
    except Exception as e:
        return f"OCR Error: {e}"

def categorize_post(caption):
    categories = ["activism", "politics", "social issues", "technology", "environment", "health"]
    result = zero_shot_classifier(caption, candidate_labels=categories)
    return result["labels"][0]  # labels come back sorted by score, highest first
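
# Illustrative call (scores made up):
#   zero_shot_classifier("New solar farm opens", candidate_labels=["environment", "health"])
#   -> {"sequence": "New solar farm opens", "labels": ["environment", "health"],
#       "scores": [0.95, 0.05]}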

def analyze_sentiment(caption):
    # top_k=None returns a score for every emotion label; keep the top three
    emotions = emotion_classifier(caption, top_k=None)
    return sorted(emotions, key=lambda x: x["score"], reverse=True)[:3]

def detect_language(caption):
    try:
        return detect(caption)
    except Exception:
        return "Unknown"

def extract_hashtags(caption):
    return re.findall(r"#\w+", caption)
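
# e.g. extract_hashtags("Save the bees #climate #GoGreen") -> ["#climate", "#GoGreen"]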

def process_posts(profile_url, hashtags, num_posts):
    hashtag_list = [h.strip() for h in hashtags.split(",")]
    # asyncio.run creates and tears down an event loop for the scrape
    posts = asyncio.run(extract_posts(profile_url, hashtag_list, int(num_posts)))
    doc = docx.Document()
    doc.add_heading("Social Media Analysis Report", 0)
    for i, post in enumerate(posts):
        doc.add_heading(f"Post {i + 1}", level=1)

        # Metadata section (use .get so missing keys never crash the report)
        meta = [
            f"Date: {post.get('date', 'N/A')}",
            f"Likes: {post.get('likes', 0)}",
            f"Comments: {post.get('comments', 0)}",
            f"Media: Pictures={1 if post.get('image_url') else 0}, Videos={1 if post.get('video_url') else 0}"
        ]
        doc.add_paragraph("\n".join(meta))
        # Content analysis
        content = doc.add_paragraph()
        content.add_run("Caption Analysis:\n").bold = True
        content.add_run(f"{post['caption']}\n\n")

        # Sentiment and language (build the string first: backslashes inside
        # f-string expressions are a syntax error before Python 3.12)
        content.add_run(f"Language: {detect_language(post['caption'])}\n")
        emotions = analyze_sentiment(post['caption'])
        sentiment = ", ".join(f"{e['label']} ({e['score']:.2f})" for e in emotions)
        content.add_run(f"Sentiment: {sentiment}\n")

        # Hashtags and category
        post_hashtags = extract_hashtags(post['caption'])  # avoid shadowing the hashtags argument
        content.add_run(f"Hashtags: {', '.join(post_hashtags) if post_hashtags else 'None'}\n")
        content.add_run(f"Category: {categorize_post(post['caption'])}\n")
        # Image analysis
        if post.get('image_url'):
            img_analysis = doc.add_paragraph()
            img_analysis.add_run("Image Analysis:\n").bold = True
            img_analysis.add_run(f"Extracted Text: {extract_text_from_image(post['image_url'])[:500]}\n")
        doc.add_page_break()

    report_path = "social_media_analysis.docx"
    doc.save(report_path)
    return report_path
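
# e.g. process_posts("https://twitter.com/eco_news", "climate, environment", 3)
# writes social_media_analysis.docx and returns its path.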

iface = gr.Interface(
    fn=process_posts,
    inputs=[
        gr.Textbox(label="Profile URL", placeholder="Enter social media profile URL"),
        gr.Textbox(label="Hashtags", placeholder="Comma-separated hashtags"),
        gr.Slider(1, 50, value=5, step=1, label="Posts to Analyze")  # step=1 keeps the count a whole number
    ],
    outputs=gr.File(label="Download Report"),
    title="Social Media Intelligence Analyzer",
    description="""Real-time social media analysis with:
    - 🕵️‍♂️ Live scraping
    - 📊 Sentiment analysis
    - 🖼️ Image OCR
    - 🏷️ Hashtag tracking""",
    examples=[
        ["https://twitter.com/eco_news", "climate, environment", 3],
        ["https://instagram.com/tech_innovators", "technology, future", 2]
    ]
)

if __name__ == "__main__":
    iface.launch()