from flask import Flask, request, jsonify, render_template, send_from_directory
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    TFCLIPModel,
    CLIPProcessor,
    pipeline,
    BertTokenizer,
    BertForSequenceClassification
)
import cv2
import os
import subprocess
import torch
from PIL import Image
import numpy as np
import base64
import uuid
from ultralytics import YOLO
import tensorflow as tf
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Create directories
os.makedirs('save', exist_ok=True)
os.makedirs('temp', exist_ok=True)
os.makedirs('unsafe_frames', exist_ok=True)
os.makedirs('audio', exist_ok=True)
os.makedirs('logs', exist_ok=True)
os.makedirs('text_output', exist_ok=True)

print("Loading models...")
try:
    # Load models
    nudity_model = YOLO("Models/nudenet/320n.pt")
    bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    bert_model = BertForSequenceClassification.from_pretrained('bert-base-uncased')
    profanity_model = AutoModelForSequenceClassification.from_pretrained("unitary/toxic-bert")
    profanity_tokenizer = AutoTokenizer.from_pretrained("unitary/toxic-bert")
    hate_speech_model = AutoModelForSequenceClassification.from_pretrained("Hate-speech-CNERG/dehatebert-mono-english")
    hate_speech_tokenizer = AutoTokenizer.from_pretrained("Hate-speech-CNERG/dehatebert-mono-english")
    clip_model = TFCLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    whisper_model = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")
    print("All models loaded successfully")
except Exception as e:
    logger.error(f"Error loading models: {str(e)}")
    raise
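
# Assumed route for the index-page handler below; the decorator is not shown in
# this source, so the path is inferred from the handler's behavior.
@app.route('/')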
def home():
    return render_template('index.html')
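
# Assumed route and method, inferred from the form field this handler reads.
@app.route('/extract_text', methods=['POST'])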
def extract_text():
    try:
        audio_file = request.form.get('audio_file')
        if not audio_file:
            return jsonify({"error": "No audio file specified"}), 400

        audio_path = os.path.join('audio', audio_file)
        if not os.path.exists(audio_path):
            return jsonify({"error": "Audio file not found"}), 404

        # Process audio and get text
        audio_result = process_audio(audio_path)
        if not audio_result['success']:
            return jsonify({"error": audio_result['error']}), 500

        # Save extracted text
        text_filename = f"text_{uuid.uuid4().hex}.txt"
        text_path = os.path.join('text_output', text_filename)
        with open(text_path, 'w', encoding='utf-8') as f:
            f.write(audio_result['text'])

        # Analyze text content
        text_analysis = analyze_text_content(audio_result['text'])

        return jsonify({
            "success": True,
            "text": audio_result['text'],
            "text_file": text_filename,
            "confidence": audio_result['confidence'],
            "analysis": text_analysis
        })
    except Exception as e:
        logger.error(f"Error extracting text: {str(e)}")
        return jsonify({"error": str(e)}), 500
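
# Assumed route: serves previously extracted audio files from the 'audio' folder.
@app.route('/audio/<filename>')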
def serve_audio(filename):
    return send_from_directory('audio', filename)
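
# Assumed route and method, inferred from the handler's use of request.files.
@app.route('/upload', methods=['POST'])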
def upload_file():
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file uploaded"}), 400

        video = request.files['file']
        if video.filename == '':
            return jsonify({"error": "No file selected"}), 400

        video_path = os.path.join('save', video.filename)
        video.save(video_path)

        try:
            frames = extract_frames(video_path)
            results = []

            audio_filename = f"audio_{uuid.uuid4().hex}.wav"
            audio_path = os.path.join('audio', audio_filename)
            audio_result = extract_audio(video_path, audio_path)

            if audio_result:
                audio_text = process_audio(audio_path)
                text_content = audio_text.get('text', '')

                # Save extracted text
                if text_content:
                    text_filename = f"text_{uuid.uuid4().hex}.txt"
                    text_path = os.path.join('text_output', text_filename)
                    with open(text_path, 'w', encoding='utf-8') as f:
                        f.write(text_content)
                    text_analysis = analyze_text_content(text_content)
                else:
                    text_filename = None
                    text_analysis = None
            else:
                text_content = ''
                text_filename = None
                text_analysis = None

            batch_size = 15
            for i in range(0, len(frames), batch_size):
                batch_frames = frames[i:i + batch_size]
                result = analyze_batch(batch_frames, text_content)
                if result is None:
                    continue
                results.extend(result)

                # Cleanup frames: keep frames flagged by the analysis, delete the
                # rest. The flags live in the per-frame analysis results, which
                # are aligned one-to-one with batch_frames.
                for frame_data, frame_result in zip(batch_frames, result):
                    flagged = (frame_result['nudity']['is_inappropriate']
                               or frame_result['harm']['is_harmful'])
                    if flagged:
                        unique_filename = f'unsafe_{uuid.uuid4().hex}.png'
                        unsafe_frame_path = os.path.join('unsafe_frames', unique_filename)
                        os.rename(frame_data['frame'], unsafe_frame_path)
                    else:
                        os.remove(frame_data['frame'])
                    os.remove(frame_data['thumbnail'])

            if os.path.exists(video_path):
                os.remove(video_path)

            if results:
                total_meta_score = sum(r['meta_standards']['score'] for r in results) / len(results)
                overall_assessment = {
                    "total_score": total_meta_score,
                    "risk_level": "High" if total_meta_score > 35 else "Medium" if total_meta_score > 30 else "Low",
                    "recommendation": get_recommendation(total_meta_score)
                }
            else:
                overall_assessment = {
                    "total_score": 0,
                    "risk_level": "Low",
                    "recommendation": "No issues detected"
                }

            return jsonify({
                "success": True,
                "results": results,
                "audio_path": audio_filename,
                "audio_text": text_content,
                "text_file": text_filename,
                "text_analysis": text_analysis,
                "overall_assessment": overall_assessment
            })
        except Exception as e:
            if os.path.exists(video_path):
                os.remove(video_path)
            logger.error(f"Error in content analysis: {str(e)}")
            return jsonify({"error": str(e)}), 500
    except Exception as e:
        logger.error(f"Error in upload: {str(e)}")
        return jsonify({"error": str(e)}), 500
def extract_frames(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise Exception("Error opening video file")

    frames = []
    frame_count = 0
    # Guard against videos whose FPS metadata is missing or zero, which would
    # otherwise cause a modulo-by-zero below; 30 is an assumed fallback.
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Sample roughly one frame per second of video
        if frame_count % fps == 0:
            frame_path = os.path.join('temp', f'frame_{frame_count}.jpg')
            thumbnail_path = os.path.join('temp', f'thumb_{frame_count}.jpg')
            cv2.imwrite(frame_path, frame)
            thumbnail = cv2.resize(frame, (648, 648))
            cv2.imwrite(thumbnail_path, thumbnail)
            frames.append({
                'frame': frame_path,
                'thumbnail': thumbnail_path,
                'timestamp': frame_count // fps
            })
        frame_count += 1

    cap.release()
    return frames
def extract_audio(video_path, output_path):
    try:
        # Strip the video stream and re-encode the audio as 16 kHz mono PCM,
        # the sample rate Whisper expects.
        command = [
            'ffmpeg',
            '-i', video_path,
            '-vn',
            '-acodec', 'pcm_s16le',
            '-ar', '16000',
            '-ac', '1',
            '-y',
            output_path
        ]
        result = subprocess.run(
            command,
            check=True,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE
        )
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info(f"Audio extracted successfully: {output_path}")
            return output_path
        else:
            raise Exception("Audio extraction failed - empty or missing file")
    except Exception as e:
        logger.error(f"Audio extraction error: {str(e)}")
        return None
def process_audio(audio_path):
    try:
        if not os.path.exists(audio_path):
            logger.error(f"Audio file not found: {audio_path}")
            return {
                'success': False,
                'text': "Audio file not found",
                'error': "File not found"
            }

        logger.info(f"Processing audio file: {audio_path}")

        # First pass with Whisper
        whisper_result = whisper_model(audio_path)
        logger.info(f"Whisper result: {whisper_result}")

        if not whisper_result.get('text'):
            logger.error("Whisper failed to extract text")
            return {
                'success': False,
                'text': "Whisper failed to extract text",
                'error': "No text found in Whisper output"
            }

        text = whisper_result['text']

        # Second pass with BERT: run each 512-character chunk through the
        # classifier, then decode the tokenized input back to text. The BERT
        # outputs themselves are not used; this pass effectively re-tokenizes
        # and lowercases the transcript.
        chunks = [text[i:i+512] for i in range(0, len(text), 512)]
        processed_chunks = []
        for chunk in chunks:
            inputs = bert_tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512)
            with torch.no_grad():
                outputs = bert_model(**inputs)
            processed_chunk = bert_tokenizer.decode(
                inputs['input_ids'][0],
                skip_special_tokens=True
            )
            processed_chunks.append(processed_chunk)

        final_text = " ".join(processed_chunks)

        return {
            'success': True,
            'text': final_text,
            # The ASR pipeline does not return a confidence field, so this
            # falls back to 0.
            'confidence': whisper_result.get('confidence', 0)
        }
    except Exception as e:
        logger.error(f"Audio processing error: {str(e)}")
        return {
            'success': False,
            'text': "Audio processing failed",
            'error': str(e)
        }
def analyze_text_content(text):
    try:
        # Analyze profanity
        profanity_inputs = profanity_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            profanity_outputs = profanity_model(**profanity_inputs)
            profanity_scores = torch.nn.functional.softmax(profanity_outputs.logits, dim=-1)

        # Analyze hate speech
        hate_speech_inputs = hate_speech_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            hate_speech_outputs = hate_speech_model(**hate_speech_inputs)
            hate_speech_scores = torch.nn.functional.softmax(hate_speech_outputs.logits, dim=-1)

        return {
            "profanity": {
                "score": float(profanity_scores[0][1]) * 100,
                "is_offensive": float(profanity_scores[0][1]) > 0.5
            },
            "hate_speech": {
                "score": float(hate_speech_scores[0][1]) * 100,
                "is_hateful": float(hate_speech_scores[0][1]) > 0.5
            }
        }
    except Exception as e:
        logger.error(f"Error analyzing text: {str(e)}")
        return None
def analyze_batch(batch_frames, text):
    try:
        results = []
        images = []
        timestamps = []

        for frame_data in batch_frames:
            image = Image.open(frame_data['frame'])
            image = image.resize((128, 128))
            images.append(image)
            timestamps.append(frame_data['timestamp'])

        # Prepare image data as a normalized (B, C, H, W) batch tensor
        image_arrays = np.array([np.array(img) / 255.0 for img in images])
        image_tensors = torch.tensor(image_arrays).permute(0, 3, 1, 2).float()

        # Run nudity detection on the whole batch
        with torch.no_grad():
            nudity_results = nudity_model(image_tensors)
            nudity_predictions = [result.boxes for result in nudity_results]

        # Text-based analyses on the transcript (shared across all frames)
        if text:
            profanity_inputs = profanity_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                profanity_outputs = profanity_model(**profanity_inputs)
                profanity_scores = torch.nn.functional.softmax(profanity_outputs.logits, dim=-1)

            hate_speech_inputs = hate_speech_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                hate_speech_outputs = hate_speech_model(**hate_speech_inputs)
                hate_speech_scores = torch.nn.functional.softmax(hate_speech_outputs.logits, dim=-1)
        else:
            profanity_scores = torch.zeros(1, 2)
            hate_speech_scores = torch.zeros(1, 2)

        # CLIP text/image similarity; truncation keeps long transcripts within
        # CLIP's 77-token text limit.
        clip_inputs = clip_processor(
            text=[text if text else ""] * len(images),
            images=images,
            return_tensors="tf",
            padding=True,
            truncation=True
        )
        clip_outputs = clip_model(**clip_inputs)
        clip_scores = tf.nn.softmax(clip_outputs.logits_per_image, axis=-1).numpy()

        for i in range(len(batch_frames)):
            with open(batch_frames[i]['thumbnail'], 'rb') as img_file:
                thumbnail_base64 = base64.b64encode(img_file.read()).decode('utf-8')

            nudity_score = float(nudity_predictions[i].conf[0]) * 99 if len(nudity_predictions[i]) > 0 else 0.0
            profanity_score = float(profanity_scores[0][1]) * 99
            hate_speech_score = float(hate_speech_scores[0][1]) * 99
            harm_score = float(clip_scores[i][1]) * 99 if clip_scores[i].size > 1 else 0.0

            # Weighted aggregate reported as the "Meta standards" score
            meta_standards_score = (
                (nudity_score * 0.6) +
                (hate_speech_score * 0.1) +
                (harm_score * 0.1) +
                (profanity_score * 0.1)
            )

            results.append({
                "nudity": {
                    "score": nudity_score,
                    "is_inappropriate": nudity_score > 65
                },
                "profanity": {
                    "score": profanity_score,
                    "is_offensive": profanity_score > 65
                },
                "hate_speech": {
                    "score": hate_speech_score,
                    "is_hateful": hate_speech_score > 40
                },
                "harm": {
                    "score": harm_score,
                    "is_harmful": harm_score > 40
                },
                "meta_standards": {
                    "score": meta_standards_score,
                    "is_violating": meta_standards_score > 30,
                    "risk_level": "High" if meta_standards_score > 60 else "Medium" if meta_standards_score > 25 else "Low",
                    "recommendation": get_recommendation(meta_standards_score)
                },
                "thumbnail": thumbnail_base64,
                "timestamp": timestamps[i]
            })

        return results
    except Exception as e:
        logger.error(f"Error in batch analysis: {str(e)}")
        return None
def get_recommendation(score):
    if score > 70:
        return "Content likely violates Meta Community Standards. Major modifications needed."
    elif score > 30:
        return "Content may need modifications to comply with Meta Community Standards."
    else:
        return "Content likely complies with Meta Community Standards."

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)
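
# Example requests (a sketch, assuming the server runs locally on port 5000 and
# the assumed routes above; file names are placeholders):
#   curl -F "file=@sample.mp4" http://localhost:5000/upload
#   curl -X POST -d "audio_file=audio_<id>.wav" http://localhost:5000/extract_text
#   curl http://localhost:5000/audio/audio_<id>.wav -o audio.wav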