|
import pandas as pd |
|
import numpy as np |
|
from transformers import AutoTokenizer, AutoModel |
|
import torch |
|
from sklearn.ensemble import IsolationForest |
|
from sklearn.preprocessing import StandardScaler |
|
from textblob import TextBlob |
|
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer |
|
from sklearn.feature_extraction.text import CountVectorizer |
|
from sklearn.decomposition import PCA |
|
import warnings |
|
from typing import Dict, List, Tuple |
|
import logging |
|
from collections import Counter |
|
from detoxify import Detoxify |
|
import re |
|
from datetime import datetime |
|
import seaborn as sns |
|
import matplotlib.pyplot as plt |
|
from pathlib import Path |
|
import json |
|
|
|
class AdvancedYelpAnalyzer:
    """Multi-model NLP analysis of Yelp review data.

    Combines BERT [CLS] embeddings, VADER/TextBlob sentiment, Detoxify
    toxicity scoring, Isolation Forest anomaly detection and simple
    heuristics for AI-generated-text likelihood.  Works on a private copy
    of the input frame; expects (at least) the columns ``review_text``,
    ``review_stars``, ``categories`` and ``review_date``.
    """

    def __init__(self, df: pd.DataFrame):
        """Initialize the analyzer with necessary models and configurations."""
        self.df = df.copy()
        self.bert_tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
        self.bert_model = AutoModel.from_pretrained('bert-base-uncased')
        self.vader = SentimentIntensityAnalyzer()
        self.toxic_model = Detoxify('original')
        # Run BERT on GPU when available; the other models stay on CPU.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.bert_model.to(self.device)

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_bert_embeddings(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """Generate BERT [CLS] embeddings for a list of texts.

        Args:
            texts: Raw review strings.
            batch_size: Number of texts encoded per forward pass.

        Returns:
            Array of shape ``(len(texts), hidden_size)``, one row per text.
        """
        embeddings = []

        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            encoded = self.bert_tokenizer(batch_texts,
                                          padding=True,
                                          truncation=True,
                                          max_length=512,
                                          return_tensors='pt')

            with torch.no_grad():
                encoded = {k: v.to(self.device) for k, v in encoded.items()}
                outputs = self.bert_model(**encoded)
                # [CLS] token (position 0) summarizes the whole sequence.
                batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
                embeddings.append(batch_embeddings)

        # BUG FIX: np.vstack raises on an empty list; return an empty
        # (0, hidden_size) array instead when no texts were supplied.
        if not embeddings:
            return np.empty((0, self.bert_model.config.hidden_size))
        return np.vstack(embeddings)

    def analyze_sentiment(self) -> pd.DataFrame:
        """Perform comprehensive sentiment analysis using multiple tools.

        Adds BERT-embedding summary statistics, BERT-tokenized lengths,
        TextBlob polarity/subjectivity and phrase-boosted VADER scores as
        new columns on ``self.df`` and returns the augmented frame.
        """
        self.logger.info("Starting sentiment analysis...")

        self.logger.info("Calculating BERT embeddings...")
        review_texts = self.df['review_text'].fillna('').tolist()
        bert_embeddings = self.get_bert_embeddings(review_texts)

        self.logger.info("Calculating tokenized lengths...")
        self.df['review_length'] = self.df['review_text'].apply(
            lambda x: len(self.bert_tokenizer.encode(str(x)))
        )

        # Per-review summary statistics over the embedding dimensions.
        self.df['bert_embedding_mean'] = np.mean(bert_embeddings, axis=1)
        self.df['bert_embedding_std'] = np.std(bert_embeddings, axis=1)

        self.df['textblob_polarity'] = self.df['review_text'].apply(
            lambda x: TextBlob(str(x)).sentiment.polarity
        )
        self.df['textblob_subjectivity'] = self.df['review_text'].apply(
            lambda x: TextBlob(str(x)).sentiment.subjectivity
        )

        def get_enhanced_vader_scores(text):
            """VADER polarity scores boosted by domain-specific negative phrases."""
            negative_phrases = [
                'too long', 'way too long', 'waiting', 'changed our minds',
                'too many', 'took forever', 'took too long', 'waste of time',
                'not worth', 'disappointing', 'mediocre', 'needs improvement'
            ]

            base_scores = self.vader.polarity_scores(str(text))

            text_lower = str(text).lower()
            neg_count = sum(1 for phrase in negative_phrases if phrase in text_lower)

            if neg_count > 0:
                # Boost the negative score (capped at 0.7) and damp compound.
                base_scores['neg'] = max(base_scores['neg'], min(0.7, neg_count * 0.2))
                # BUG FIX: clamp the damping factor at zero so that seven or
                # more phrase hits cannot make the multiplier negative and
                # flip the sign of the compound score.
                base_scores['compound'] *= max(0.0, 1 - (neg_count * 0.15))
                # Renormalize neutral mass after boosting the negative score.
                base_scores['neu'] = max(0, 1 - base_scores['neg'] - base_scores['pos'])

            return base_scores

        vader_scores = self.df['review_text'].apply(get_enhanced_vader_scores)
        self.df['vader_compound'] = vader_scores.apply(lambda x: x['compound'])
        self.df['vader_negative'] = vader_scores.apply(lambda x: x['neg'])
        self.df['vader_positive'] = vader_scores.apply(lambda x: x['pos'])
        self.df['vader_neutral'] = vader_scores.apply(lambda x: x['neu'])

        # |compound| measures how polarized (either direction) a review is.
        self.df['sentiment_extremity'] = self.df['vader_compound'].abs()

        return self.df

    def detect_anomalies(self) -> pd.DataFrame:
        """Detect anomalous reviews using Isolation Forest with BERT features.

        Adds ``is_anomaly`` (-1 = anomaly, 1 = normal) and ``anomaly_score``
        (lower = more anomalous) columns and returns the frame.
        """
        self.logger.info("Detecting anomalies...")

        features = [
            'review_stars',
            'textblob_polarity',
            'vader_compound',
            'sentiment_extremity',
            'review_length',
            'bert_embedding_mean',
            'bert_embedding_std'
        ]

        # Lazily compute the sentiment-derived features on first use.
        missing_features = [f for f in features if f not in self.df.columns]
        if missing_features:
            self.analyze_sentiment()

        scaler = StandardScaler()
        # BUG FIX: StandardScaler/IsolationForest reject NaN input;
        # neutral-fill missing feature values before scaling.
        X = scaler.fit_transform(self.df[features].fillna(0.0))

        iso_forest = IsolationForest(
            contamination=0.1,  # assume ~10% of reviews are anomalous
            random_state=42,
            n_jobs=-1
        )

        self.df['is_anomaly'] = iso_forest.fit_predict(X)
        self.df['anomaly_score'] = iso_forest.score_samples(X)

        return self.df

    def detect_ai_generated_text(self) -> pd.DataFrame:
        """Estimate likelihood of AI-generated content.

        Heuristic only: combines low subjectivity, extreme sentiment,
        unusual length and low embedding variance.  Detoxify toxicity
        columns are attached as a by-product.
        """
        self.logger.info("Detecting AI-generated content...")

        if 'textblob_subjectivity' not in self.df.columns:
            self.analyze_sentiment()

        texts = self.df['review_text'].fillna('').tolist()
        toxic_scores = self.toxic_model.predict(texts)

        toxic_score_types = ['toxicity', 'severe_toxicity', 'obscene', 'identity_attack',
                             'insult', 'threat', 'sexual_explicit']
        for score_type in toxic_score_types:
            if score_type in toxic_scores:
                self.df[f'toxic_{score_type}'] = toxic_scores[score_type]

        # Binary flag: all four heuristics must fire simultaneously.
        self.df['ai_generated_likelihood'] = (
            (self.df['textblob_subjectivity'] < 0.3) &
            (self.df['sentiment_extremity'] > 0.8) &
            (self.df['review_length'] > self.df['review_length'].quantile(0.95)) &
            (self.df['bert_embedding_std'] < self.df['bert_embedding_std'].quantile(0.25))
        ).astype(int)

        # Continuous score from the same signals, clipped into [0, 1].
        self.df['ai_detection_score'] = (
            (self.df['textblob_subjectivity'] * -1) +
            (self.df['sentiment_extremity'] * 0.5) +
            (self.df['bert_embedding_std'] * -0.5)
        ).clip(0, 1)

        return self.df

    def analyze_business_categories(self) -> Dict:
        """Aggregate review statistics per business category.

        Returns:
            Mapping of category name -> dict with ``review_count``,
            ``avg_rating``, ``avg_sentiment`` and ``avg_subjectivity``.
            The sentiment fields are ``None`` unless
            :meth:`analyze_sentiment` has been run first.
        """
        self.logger.info("Analyzing business categories...")

        # 'categories' is a comma-separated string per business.
        categories = self.df['categories'].fillna('').str.split(', ')
        # BUG FIX: drop the '' artifacts produced by splitting empty strings,
        # which previously created a bogus category matching every review.
        all_categories = [cat for cats in categories if isinstance(cats, list)
                          for cat in cats if cat]

        category_analysis = {}
        for category in set(all_categories):
            # BUG FIX: regex=False -- category names such as "Bars (Wine)"
            # contain regex metacharacters and must be matched literally.
            category_reviews = self.df[
                self.df['categories'].str.contains(category, na=False, regex=False)
            ]

            category_analysis[category] = {
                'review_count': len(category_reviews),
                'avg_rating': category_reviews['review_stars'].mean() if not category_reviews.empty else None,
                'avg_sentiment': category_reviews['vader_compound'].mean() if 'vader_compound' in self.df.columns and not category_reviews.empty else None,
                'avg_subjectivity': category_reviews['textblob_subjectivity'].mean() if 'textblob_subjectivity' in self.df.columns and not category_reviews.empty else None
            }

        return category_analysis

    def visualize_results(self, output_dir: str):
        """Write a 2x2 summary figure to ``<output_dir>/analysis_results.png``.

        Runs :meth:`detect_anomalies` / :meth:`detect_ai_generated_text` on
        demand when their columns are missing.  Assumes ``vader_compound``
        and ``review_date`` columns exist -- confirm with callers.
        """
        plt.figure(figsize=(15, 10))

        # 1. Sentiment distribution
        plt.subplot(2, 2, 1)
        sns.histplot(data=self.df, x='vader_compound', bins=50)
        plt.title('Sentiment Distribution')

        # 2. Review volume over time
        plt.subplot(2, 2, 2)
        daily_reviews = self.df.groupby('review_date').size()
        daily_reviews.plot()
        plt.title('Review Volume Over Time')

        # 3. Anomaly-score distribution
        plt.subplot(2, 2, 3)
        if 'anomaly_score' not in self.df.columns:
            self.detect_anomalies()
        sns.histplot(data=self.df, x='anomaly_score', bins=50)
        plt.title('Anomaly Score Distribution')

        # 4. AI-generation likelihood (binary flag, hence 2 bins)
        plt.subplot(2, 2, 4)
        if 'ai_generated_likelihood' not in self.df.columns:
            self.detect_ai_generated_text()
        sns.histplot(data=self.df, x='ai_generated_likelihood', bins=2)
        plt.title('AI Generation Likelihood')

        plt.tight_layout()
        plt.savefig(f'{output_dir}/analysis_results.png')
        plt.close()

    def run_full_analysis(self, output_dir: str) -> Tuple[pd.DataFrame, Dict]:
        """Run complete analysis pipeline with detailed outputs.

        Writes ``analyzed_data.csv``, ``analysis_results.json`` and the
        summary figure into *output_dir* (created if absent).

        Returns:
            Tuple of (augmented DataFrame, summary dict).

        Raises:
            Exception: any pipeline failure is logged and re-raised.
        """
        self.logger.info("Starting full analysis pipeline...")

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        try:
            self.analyze_sentiment()
            self.detect_anomalies()
            self.detect_ai_generated_text()
            category_analysis = self.analyze_business_categories()

            self.visualize_results(str(output_dir))

            analysis_results = {
                'category_analysis': category_analysis,
                'sentiment_summary': {
                    'avg_sentiment': self.df['vader_compound'].mean(),
                    'positive_reviews': len(self.df[self.df['vader_compound'] > 0.5]),
                    'negative_reviews': len(self.df[self.df['vader_compound'] < -0.5]),
                    'neutral_reviews': len(self.df[abs(self.df['vader_compound']) <= 0.5])
                },
                'ai_detection_summary': {
                    'likely_ai_generated': len(self.df[self.df['ai_generated_likelihood'] == 1]),
                    'avg_ai_score': self.df['ai_detection_score'].mean()
                },
                'anomaly_summary': {
                    'anomalous_reviews': len(self.df[self.df['is_anomaly'] == -1]),
                    'avg_anomaly_score': self.df['anomaly_score'].mean()
                }
            }

            self.df.to_csv(output_dir / "analyzed_data.csv", index=False)
            with open(output_dir / "analysis_results.json", 'w') as f:
                # BUG FIX: pandas aggregations yield numpy scalars, which the
                # stdlib JSON encoder rejects; unwrap them via ``default``.
                json.dump(analysis_results, f, indent=4,
                          default=lambda o: o.item() if isinstance(o, np.generic) else str(o))

            return self.df, analysis_results

        except Exception as e:
            self.logger.error(f"Error during analysis: {str(e)}")
            raise
|
|
|
|
|
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    try:
        # Smoke test: run the complete pipeline over a local sample CSV
        # and write every artifact into ./output.
        sample_frame = pd.read_csv("test_data.csv")
        analyzer = AdvancedYelpAnalyzer(sample_frame)

        destination = "output"
        analyzed_df, results = analyzer.run_full_analysis(destination)

        logger.info("Analysis completed successfully!")
    except Exception as e:
        logger.error(f"Error during testing: {str(e)}")
        raise