"""Flask service that screens uploaded videos for unsafe content.

An uploaded video is sampled into frames, its audio track is transcribed,
and both are scored by several pretrained models (nudity detection,
profanity, hate speech, CLIP-based harm). Per-frame results and an overall
assessment are returned as JSON.
"""
import base64
import functools
import logging
import math
import os
import subprocess
import uuid

# Third-party imports keep their original relative order on purpose: some of
# these native libraries are sensitive to import order.
from flask import Flask, request, jsonify, render_template, send_from_directory
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    TFCLIPModel,
    CLIPProcessor,
    pipeline,
    BertTokenizer,
    BertForSequenceClassification,
)
import cv2
import torch
from PIL import Image
import numpy as np
from ultralytics import YOLO
import tensorflow as tf

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Create directories
for _directory in ('save', 'temp', 'unsafe_frames', 'audio', 'logs', 'text_output'):
    os.makedirs(_directory, exist_ok=True)

# Sampling interval used when the container does not report a usable frame rate.
DEFAULT_FPS = 30

# Zero-shot prompts for the CLIP harm score; index 1 is the "harmful" class.
# NOTE(review): the wording of these prompts is a product decision - tune them.
HARM_PROMPTS = [
    "a photo of safe, harmless content",
    "a photo of violent, graphic or harmful content",
]

logger.info("Loading models...")
try:
    # Load models
    nudity_model = YOLO("Models/nudenet/320n.pt")
    bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    # Kept loaded so the module-level name stays available; process_audio no
    # longer runs a forward pass through it because the output was never used.
    bert_model = BertForSequenceClassification.from_pretrained('bert-base-uncased')
    profanity_model = AutoModelForSequenceClassification.from_pretrained("unitary/toxic-bert")
    profanity_tokenizer = AutoTokenizer.from_pretrained("unitary/toxic-bert")
    hate_speech_model = AutoModelForSequenceClassification.from_pretrained(
        "Hate-speech-CNERG/dehatebert-mono-english"
    )
    hate_speech_tokenizer = AutoTokenizer.from_pretrained(
        "Hate-speech-CNERG/dehatebert-mono-english"
    )
    clip_model = TFCLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    # chunk_length_s lets the pipeline transcribe audio longer than Whisper's
    # 30-second window instead of truncating or rejecting it.
    whisper_model = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-tiny",
        chunk_length_s=30,
    )
    logger.info("All models loaded successfully")
except Exception as e:
    logger.error("Error loading models: %s", e)
    raise


def _remove_quietly(path):
    """Delete *path*, ignoring a missing file and logging other OS errors."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning("Could not remove %s: %s", path, exc)


def _save_transcript(text):
    """Write *text* to a new file under ``text_output`` and return its name."""
    text_filename = f"text_{uuid.uuid4().hex}.txt"
    text_path = os.path.join('text_output', text_filename)
    with open(text_path, 'w', encoding='utf-8') as f:
        f.write(text)
    return text_filename


def _dispose_frames(batch_frames, batch_results):
    """Archive flagged frames and delete the rest of an analysed batch.

    ``batch_results`` is the list returned by ``analyze_batch`` for
    ``batch_frames`` (same order), or ``None`` when the analysis failed, in
    which case every frame of the batch is deleted.
    """
    verdicts = batch_results if batch_results else [None] * len(batch_frames)
    for frame_data, verdict in zip(batch_frames, verdicts):
        flagged = verdict is not None and (
            verdict['nudity']['is_inappropriate'] or verdict['harm']['is_harmful']
        )
        if flagged:
            # NOTE(review): the frame is JPEG data but keeps the historical
            # ".png" name used for files in unsafe_frames.
            unique_filename = f'unsafe_{uuid.uuid4().hex}.png'
            os.replace(frame_data['frame'], os.path.join('unsafe_frames', unique_filename))
        else:
            _remove_quietly(frame_data['frame'])
        _remove_quietly(frame_data['thumbnail'])


@app.route("/")
def home():
    """Render the landing page."""
    return render_template('index.html')


@app.route("/extract_text", methods=["POST"])
def extract_text():
    """Transcribe and analyse a previously extracted audio file.

    Expects the form field ``audio_file`` naming a file inside ``audio/``.
    Responds with the transcript, the name of the saved text file, the
    confidence value and the text analysis, or a JSON error with status
    400/404/500.
    """
    try:
        audio_file = request.form.get('audio_file')
        if not audio_file:
            return jsonify({"error": "No audio file specified"}), 400

        # Only the final path component is honoured so the request cannot
        # reach files outside the audio directory.
        audio_path = os.path.join('audio', os.path.basename(audio_file))
        if not os.path.isfile(audio_path):
            return jsonify({"error": "Audio file not found"}), 404

        # Process audio and get text
        audio_result = process_audio(audio_path)
        if not audio_result['success']:
            return jsonify({"error": audio_result['error']}), 500

        # Save extracted text
        text_filename = _save_transcript(audio_result['text'])

        # Analyze text content
        text_analysis = analyze_text_content(audio_result['text'])

        return jsonify({
            "success": True,
            "text": audio_result['text'],
            "text_file": text_filename,
            "confidence": audio_result['confidence'],
            "analysis": text_analysis,
        })
    except Exception as e:
        logger.exception("Error extracting text")
        return jsonify({"error": str(e)}), 500


# The URL must declare the <filename> variable, otherwise Flask calls the view
# without its required argument and every request fails.
@app.route('/audio/<path:filename>')
def serve_audio(filename):
    """Serve a file from the ``audio`` directory."""
    return send_from_directory('audio', filename)


@app.route("/upload", methods=["POST"])
def upload_file():
    """Analyse an uploaded video and return per-frame and overall results.

    Expects a multipart upload in the ``file`` field. The video is always
    deleted afterwards; frames flagged for nudity or harm are moved to
    ``unsafe_frames`` and all other temporary images are removed.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file uploaded"}), 400

    video = request.files['file']
    if video.filename == '':
        return jsonify({"error": "No file selected"}), 400

    # Store under a generated name: the client-supplied name is untrusted
    # (path traversal) and may collide with a concurrent upload. Only the
    # extension is kept so the decoder can still recognise the container.
    extension = os.path.splitext(os.path.basename(video.filename))[1]
    video_path = os.path.join('save', f"{uuid.uuid4().hex}{extension}")

    frames = []
    try:
        video.save(video_path)
        frames = extract_frames(video_path)

        audio_filename = f"audio_{uuid.uuid4().hex}.wav"
        audio_path = os.path.join('audio', audio_filename)

        text_content = ''
        text_filename = None
        text_analysis = None

        if extract_audio(video_path, audio_path):
            audio_text = process_audio(audio_path)
            # On failure 'text' holds an error placeholder, which must not be
            # scored or returned as if it were the transcript.
            if audio_text.get('success'):
                text_content = audio_text.get('text', '')
            # Save extracted text
            if text_content:
                text_filename = _save_transcript(text_content)
                text_analysis = analyze_text_content(text_content)
        else:
            audio_filename = None

        results = []
        batch_size = 15
        for start in range(0, len(frames), batch_size):
            batch_frames = frames[start:start + batch_size]
            batch_results = analyze_batch(batch_frames, text_content)
            if batch_results:
                results.extend(batch_results)
            # Cleanup frames (also for batches whose analysis failed)
            _dispose_frames(batch_frames, batch_results)

        if results:
            total_meta_score = sum(r['meta_standards']['score'] for r in results) / len(results)
            overall_assessment = {
                "total_score": total_meta_score,
                "risk_level": "High" if total_meta_score > 35
                else "Medium" if total_meta_score > 30 else "Low",
                "recommendation": get_recommendation(total_meta_score),
            }
        else:
            overall_assessment = {
                "total_score": 0,
                "risk_level": "Low",
                "recommendation": "No issues detected",
            }

        return jsonify({
            "success": True,
            "results": results,
            "audio_path": audio_filename,
            "audio_text": text_content,
            "text_file": text_filename,
            "text_analysis": text_analysis,
            "overall_assessment": overall_assessment,
        })
    except Exception as e:
        logger.exception("Error in content analysis")
        return jsonify({"error": str(e)}), 500
    finally:
        _remove_quietly(video_path)
        # Frames already archived or deleted are simply skipped here; this
        # catches whatever an exception left behind.
        for frame_data in frames:
            _remove_quietly(frame_data['frame'])
            _remove_quietly(frame_data['thumbnail'])


def extract_frames(video_path):
    """Sample roughly one frame per second of video into ``temp/``.

    Args:
        video_path: path of the video file to read.

    Returns:
        A list of dicts with keys ``frame`` (path of the saved frame),
        ``thumbnail`` (path of a 648x648 resized copy) and ``timestamp``
        (frame index divided by the frame rate, in whole seconds).

    Raises:
        Exception: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise Exception("Error opening video file")

    # Unique per call so concurrent uploads do not overwrite each other.
    run_id = uuid.uuid4().hex
    frames = []
    try:
        reported_fps = cap.get(cv2.CAP_PROP_FPS)
        # Some containers report 0 (or a non-finite value); that would make
        # the modulo below divide by zero.
        if math.isfinite(reported_fps) and reported_fps >= 1:
            fps = int(reported_fps)
        else:
            fps = DEFAULT_FPS

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % fps == 0:
                frame_path = os.path.join('temp', f'frame_{run_id}_{frame_count}.jpg')
                thumbnail_path = os.path.join('temp', f'thumb_{run_id}_{frame_count}.jpg')
                cv2.imwrite(frame_path, frame)
                cv2.imwrite(thumbnail_path, cv2.resize(frame, (648, 648)))
                frames.append({
                    'frame': frame_path,
                    'thumbnail': thumbnail_path,
                    'timestamp': frame_count // fps,
                })
            frame_count += 1
    finally:
        cap.release()
    return frames


def extract_audio(video_path, output_path):
    """Extract the audio track as 16 kHz mono 16-bit PCM WAV using ffmpeg.

    Returns:
        ``output_path`` on success, ``None`` if ffmpeg fails or produces an
        empty or missing file.
    """
    command = [
        'ffmpeg', '-i', video_path,
        '-vn',
        '-acodec', 'pcm_s16le',
        '-ar', '16000',
        '-ac', '1',
        '-y', output_path,
    ]
    try:
        subprocess.run(
            command,
            check=True,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info("Audio extracted successfully: %s", output_path)
            return output_path
        raise Exception("Audio extraction failed - empty or missing file")
    except Exception as e:
        logger.error("Audio extraction error: %s", e)
        return None


def process_audio(audio_path):
    """Transcribe an audio file with Whisper and normalise the text.

    Returns:
        On success ``{'success': True, 'text': ..., 'confidence': ...}``;
        on failure ``{'success': False, 'text': <placeholder>, 'error': ...}``.
        ``confidence`` is whatever the pipeline result carries under that key,
        defaulting to 0.
    """
    try:
        if not os.path.exists(audio_path):
            logger.error("Audio file not found: %s", audio_path)
            return {
                'success': False,
                'text': "Audio file not found",
                'error': "File not found",
            }

        logger.info("Processing audio file: %s", audio_path)

        # First pass with Whisper
        whisper_result = whisper_model(audio_path)
        logger.info("Whisper result: %s", whisper_result)

        text = (whisper_result.get('text') or '').strip()
        if not text:
            logger.error("Whisper failed to extract text")
            return {
                'success': False,
                'text': "Whisper failed to extract text",
                'error': "No text found in Whisper output",
            }

        # Second pass: round-trip each 512-character chunk through the BERT
        # tokenizer.
        # NOTE(review): with an uncased tokenizer this presumably lower-cases
        # the transcript - confirm that this normalisation is wanted.
        processed_chunks = []
        for offset in range(0, len(text), 512):
            inputs = bert_tokenizer(
                text[offset:offset + 512],
                return_tensors="pt",
                truncation=True,
                max_length=512,
            )
            processed_chunks.append(
                bert_tokenizer.decode(inputs['input_ids'][0], skip_special_tokens=True)
            )

        return {
            'success': True,
            'text': " ".join(processed_chunks),
            'confidence': whisper_result.get('confidence', 0),
        }
    except Exception as e:
        logger.error("Audio processing error: %s", e)
        return {
            'success': False,
            'text': "Audio processing failed",
            'error': str(e),
        }


@functools.lru_cache(maxsize=8)
def _text_probabilities(text):
    """Return ``(profanity, hate_speech)`` class-1 softmax probabilities.

    Cached because the same transcript is scored once for the text report and
    again for every frame batch of an upload.

    NOTE(review): softmax + index 1 assumes each model is a single-label
    classifier whose class 1 is the positive class - verify against the
    label maps of the two checkpoints.
    """
    def positive_probability(tokenizer, model):
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            logits = model(**inputs).logits
        return float(torch.nn.functional.softmax(logits, dim=-1)[0][1])

    return (
        positive_probability(profanity_tokenizer, profanity_model),
        positive_probability(hate_speech_tokenizer, hate_speech_model),
    )


def analyze_text_content(text):
    """Score *text* for profanity and hate speech.

    Returns:
        A dict with ``profanity`` and ``hate_speech`` entries (score on a
        0-100 scale plus a boolean set when the probability exceeds 0.5), or
        ``None`` if the models fail.
    """
    try:
        profanity_probability, hate_probability = _text_probabilities(text)
        return {
            "profanity": {
                "score": profanity_probability * 100,
                "is_offensive": profanity_probability > 0.5,
            },
            "hate_speech": {
                "score": hate_probability * 100,
                "is_hateful": hate_probability > 0.5,
            },
        }
    except Exception as e:
        logger.error("Error analyzing text: %s", e)
        return None


def analyze_batch(batch_frames, text):
    """Score a batch of extracted frames together with the transcript.

    Args:
        batch_frames: frame dicts as produced by ``extract_frames``.
        text: transcript of the video's audio; may be empty.

    Returns:
        One result dict per frame, in input order, with ``nudity``,
        ``profanity``, ``hate_speech``, ``harm`` and ``meta_standards``
        sections plus the base64 ``thumbnail`` and ``timestamp``; or ``None``
        if the analysis fails.
    """
    try:
        if not batch_frames:
            return []

        images = []
        for frame_data in batch_frames:
            # The context manager closes the file handle; convert() yields a
            # 3-channel image whatever the source mode.
            with Image.open(frame_data['frame']) as source:
                images.append(source.convert('RGB').resize((128, 128)))

        # Prepare image data
        image_arrays = np.array([np.array(img) / 255.0 for img in images])
        image_tensors = torch.tensor(image_arrays).permute(0, 3, 1, 2).float()

        # Run analyses
        with torch.no_grad():
            nudity_results = nudity_model(image_tensors)

        if text:
            profanity_probability, hate_probability = _text_probabilities(text)
        else:
            profanity_probability = hate_probability = 0.0

        # Zero-shot harm classification. Comparing every image against N
        # copies of the transcript, as before, always yields 1/N after the
        # softmax and overflows CLIP's text context on long transcripts.
        clip_inputs = clip_processor(
            text=HARM_PROMPTS,
            images=images,
            return_tensors="tf",
            padding=True,
        )
        clip_outputs = clip_model(**clip_inputs)
        clip_scores = tf.nn.softmax(clip_outputs.logits_per_image, axis=-1).numpy()

        profanity_score = profanity_probability * 99
        hate_speech_score = hate_probability * 99

        results = []
        for index, frame_data in enumerate(batch_frames):
            with open(frame_data['thumbnail'], 'rb') as img_file:
                thumbnail_base64 = base64.b64encode(img_file.read()).decode('utf-8')

            boxes = nudity_results[index].boxes
            if boxes is not None and len(boxes) > 0:
                # Strongest detection in the frame, independent of box order.
                nudity_score = float(boxes.conf.max()) * 99
            else:
                nudity_score = 0.0
            harm_score = float(clip_scores[index][1]) * 99

            # NOTE(review): the weights sum to 0.9, so the maximum is ~89.
            meta_standards_score = (
                (nudity_score * 0.6)
                + (hate_speech_score * 0.1)
                + (harm_score * 0.1)
                + (profanity_score * 0.1)
            )

            results.append({
                "nudity": {
                    "score": nudity_score,
                    "is_inappropriate": nudity_score > 65,
                },
                "profanity": {
                    "score": profanity_score,
                    "is_offensive": profanity_score > 65,
                },
                "hate_speech": {
                    "score": hate_speech_score,
                    "is_hateful": hate_speech_score > 40,
                },
                "harm": {
                    "score": harm_score,
                    "is_harmful": harm_score > 40,
                },
                "meta_standards": {
                    "score": meta_standards_score,
                    "is_violating": meta_standards_score > 30,
                    "risk_level": "High" if meta_standards_score > 60
                    else "Medium" if meta_standards_score > 25 else "Low",
                    "recommendation": get_recommendation(meta_standards_score),
                },
                "thumbnail": thumbnail_base64,
                "timestamp": frame_data['timestamp'],
            })

        return results
    except Exception as e:
        logger.error("Error in batch analysis: %s", e)
        return None


def get_recommendation(score):
    """Map a meta-standards score to a human-readable recommendation."""
    if score > 70:
        return "Content likely violates Meta Community Standards. Major modifications needed."
    if score > 30:
        return "Content may need modifications to comply with Meta Community Standards."
    return "Content likely complies with Meta Community Standards."


if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # on by default while listening on all interfaces; opt in via FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)