# MARIEL_PROJECT / app.py
# Repository page residue kept for reference: "angeloqq's picture",
# commit message "TEST", commit 806c931.
from flask import Flask, request, jsonify, render_template, send_from_directory
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
TFCLIPModel,
CLIPProcessor,
pipeline,
BertTokenizer,
BertForSequenceClassification
)
import cv2
import os
import subprocess
import torch
from PIL import Image
import numpy as np
import base64
import uuid
from ultralytics import YOLO
import tensorflow as tf
import logging
# Send application diagnostics through the standard logging module at INFO.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Make sure every working directory exists before any handler touches it.
for _workdir in ('save', 'temp', 'unsafe_frames', 'audio', 'logs', 'text_output'):
    os.makedirs(_workdir, exist_ok=True)
# Load every model once at import time. A failure is logged and re-raised so
# the module never finishes importing with a missing model.
print("Loading models...")
try:
    # Checkpoint identifiers shared by each model/tokenizer pair.
    _bert_checkpoint = 'bert-base-uncased'
    _toxicity_checkpoint = "unitary/toxic-bert"
    _hate_checkpoint = "Hate-speech-CNERG/dehatebert-mono-english"
    _clip_checkpoint = "openai/clip-vit-base-patch32"

    # Image detector, loaded from a weights file on disk.
    nudity_model = YOLO("Models/nudenet/320n.pt")

    # Text models.
    bert_tokenizer = BertTokenizer.from_pretrained(_bert_checkpoint)
    bert_model = BertForSequenceClassification.from_pretrained(_bert_checkpoint)
    profanity_model = AutoModelForSequenceClassification.from_pretrained(_toxicity_checkpoint)
    profanity_tokenizer = AutoTokenizer.from_pretrained(_toxicity_checkpoint)
    hate_speech_model = AutoModelForSequenceClassification.from_pretrained(_hate_checkpoint)
    hate_speech_tokenizer = AutoTokenizer.from_pretrained(_hate_checkpoint)

    # Image/text model (TensorFlow weights) and its processor.
    clip_model = TFCLIPModel.from_pretrained(_clip_checkpoint)
    clip_processor = CLIPProcessor.from_pretrained(_clip_checkpoint)

    # Speech-to-text pipeline.
    whisper_model = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")

    print("All models loaded successfully")
except Exception as load_error:
    logger.error(f"Error loading models: {str(load_error)}")
    raise
@app.route("/")
def home():
    """Render and return the ``index.html`` template for the root URL."""
    landing_page = render_template('index.html')
    return landing_page
@app.route("/extract_text", methods=["POST"])
def extract_text():
    """Transcribe a previously extracted audio file and analyze the transcript.

    Expects the form field ``audio_file`` to hold the bare name of a file in
    the ``audio`` directory. The transcript is written to ``text_output`` and
    returned together with the text analysis.

    Returns:
        A JSON response. On success it carries ``success``, ``text``,
        ``text_file``, ``confidence`` and ``analysis``. On failure it carries
        ``error`` with status 400 (missing or invalid name), 404 (file not
        found) or 500 (processing error).
    """
    try:
        audio_file = request.form.get('audio_file')
        if not audio_file:
            return jsonify({"error": "No audio file specified"}), 400

        # The name is client-controlled: accept only a bare file name so the
        # request cannot reach outside the 'audio' directory (e.g. '../x').
        normalized_name = audio_file.replace('\\', '/')
        if os.path.basename(normalized_name) != audio_file or audio_file in ('.', '..'):
            return jsonify({"error": "Invalid audio file name"}), 400

        audio_path = os.path.join('audio', audio_file)
        # isfile (not exists) so a directory is reported as "not found".
        if not os.path.isfile(audio_path):
            return jsonify({"error": "Audio file not found"}), 404

        # Process audio and get text
        audio_result = process_audio(audio_path)
        if not audio_result['success']:
            return jsonify({"error": audio_result['error']}), 500

        # Save extracted text under a unique name
        text_filename = f"text_{uuid.uuid4().hex}.txt"
        text_path = os.path.join('text_output', text_filename)
        with open(text_path, 'w', encoding='utf-8') as f:
            f.write(audio_result['text'])

        # Analyze text content (may be None if the analysis itself failed)
        text_analysis = analyze_text_content(audio_result['text'])

        return jsonify({
            "success": True,
            "text": audio_result['text'],
            "text_file": text_filename,
            "confidence": audio_result['confidence'],
            "analysis": text_analysis
        })
    except Exception as e:
        logger.error(f"Error extracting text: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.route('/audio/<path:filename>')
def serve_audio(filename):
    """Return the file named *filename* from the ``audio`` directory."""
    audio_response = send_from_directory('audio', filename)
    return audio_response
def _transcribe_video_audio(video_path, audio_path):
    """Extract, transcribe and analyze the audio track of an uploaded video.

    Returns a ``(text_content, text_filename, text_analysis)`` tuple. When the
    audio cannot be extracted or transcribed, or the transcript is empty, the
    tuple is ``('', None, None)``.
    """
    if not extract_audio(video_path, audio_path):
        return '', None, None

    audio_text = process_audio(audio_path)
    # On failure process_audio stores an error message under 'text'; that
    # message must not be saved or scored as if it were the transcript.
    if not audio_text.get('success'):
        return '', None, None

    text_content = audio_text.get('text', '')
    if not text_content:
        return '', None, None

    # Save extracted text under a unique name
    text_filename = f"text_{uuid.uuid4().hex}.txt"
    text_path = os.path.join('text_output', text_filename)
    with open(text_path, 'w', encoding='utf-8') as f:
        f.write(text_content)
    return text_content, text_filename, analyze_text_content(text_content)


def _remove_temp_file(path):
    """Delete *path*, logging instead of raising when it cannot be removed."""
    try:
        os.remove(path)
    except OSError as e:
        logger.warning(f"Could not remove temporary file {path}: {str(e)}")


def _cleanup_batch_frames(batch_frames, batch_results):
    """Archive frames flagged by the analysis and delete the other temp images.

    *batch_results* is the list returned by ``analyze_batch`` for
    *batch_frames* (same order), or ``None`` when the analysis failed; in that
    case nothing is flagged and every temporary file is deleted.
    """
    verdicts = batch_results or []
    for index, frame_data in enumerate(batch_frames):
        verdict = verdicts[index] if index < len(verdicts) else {}
        # The flags live in the analysis result, not in the frame record.
        is_flagged = (
            verdict.get('nudity', {}).get('is_inappropriate', False)
            or verdict.get('harm', {}).get('is_harmful', False)
        )
        if is_flagged:
            # NOTE(review): the archived name ends in .png although the frame
            # path it is renamed from ends in .jpg - confirm what readers of
            # 'unsafe_frames' expect before changing it.
            unique_filename = f'unsafe_{uuid.uuid4().hex}.png'
            unsafe_frame_path = os.path.join('unsafe_frames', unique_filename)
            try:
                os.rename(frame_data['frame'], unsafe_frame_path)
            except OSError as e:
                logger.warning(f"Could not archive frame {frame_data['frame']}: {str(e)}")
        else:
            _remove_temp_file(frame_data['frame'])
        _remove_temp_file(frame_data['thumbnail'])


def _build_overall_assessment(results):
    """Average the per-frame meta-standards scores into one overall verdict."""
    if not results:
        return {
            "total_score": 0,
            "risk_level": "Low",
            "recommendation": "No issues detected"
        }
    total_meta_score = sum(r['meta_standards']['score'] for r in results) / len(results)
    return {
        "total_score": total_meta_score,
        "risk_level": "High" if total_meta_score > 35 else "Medium" if total_meta_score > 30 else "Low",
        "recommendation": get_recommendation(total_meta_score)
    }


@app.route("/upload", methods=["POST"])
def upload_file():
    """Accept a video upload and run the frame, audio and text analysis on it.

    The uploaded file (multipart field ``file``) is stored temporarily in
    ``save``, sampled into frames, transcribed, scored batch by batch and then
    deleted.

    Returns:
        A JSON response. On success it carries ``success``, ``results``
        (per-frame analysis), ``audio_path``, ``audio_text``, ``text_file``,
        ``text_analysis`` and ``overall_assessment``. On failure it carries
        ``error`` with status 400 (no usable file) or 500 (processing error).
    """
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file uploaded"}), 400

        video = request.files['file']
        if not video.filename:
            return jsonify({"error": "No file selected"}), 400

        # The file name is client-controlled: keep only its last path
        # component and prefix a unique id so an upload can neither escape
        # the 'save' directory nor overwrite another upload of the same name.
        safe_name = os.path.basename(video.filename.replace('\\', '/'))
        if not safe_name or safe_name in ('.', '..'):
            return jsonify({"error": "Invalid file name"}), 400
        video_path = os.path.join('save', f"{uuid.uuid4().hex}_{safe_name}")
        video.save(video_path)

        try:
            frames = extract_frames(video_path)
            results = []

            audio_filename = f"audio_{uuid.uuid4().hex}.wav"
            audio_path = os.path.join('audio', audio_filename)
            text_content, text_filename, text_analysis = _transcribe_video_audio(
                video_path, audio_path
            )

            batch_size = 15
            for start in range(0, len(frames), batch_size):
                batch_frames = frames[start:start + batch_size]
                result = analyze_batch(batch_frames, text_content)
                if result is not None:
                    results.extend(result)
                # Clean up even when the batch analysis failed, otherwise the
                # temporary frame files would accumulate.
                _cleanup_batch_frames(batch_frames, result)

            return jsonify({
                "success": True,
                "results": results,
                "audio_path": audio_filename,
                "audio_text": text_content,
                "text_file": text_filename,
                "text_analysis": text_analysis,
                "overall_assessment": _build_overall_assessment(results)
            })
        except Exception as e:
            logger.error(f"Error in content analysis: {str(e)}")
            return jsonify({"error": str(e)}), 500
        finally:
            # The uploaded video is only needed during the analysis.
            if os.path.exists(video_path):
                os.remove(video_path)
    except Exception as e:
        logger.error(f"Error in upload: {str(e)}")
        return jsonify({"error": str(e)}), 500
def extract_frames(video_path):
    """Sample roughly one frame per second of video and write it to ``temp``.

    For every sampled frame the full image and a 648x648 thumbnail are saved
    as JPEG files whose names carry a per-call unique id, so concurrent calls
    do not overwrite each other's files.

    Args:
        video_path: Path of the video file to read.

    Returns:
        A list of dicts with the keys ``frame`` (image path), ``thumbnail``
        (thumbnail path) and ``timestamp`` (frame index divided by the frame
        rate, as an integer).

    Raises:
        Exception: If the video file cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise Exception("Error opening video file")

    frames = []
    frame_count = 0
    run_id = uuid.uuid4().hex
    try:
        # The reported frame rate can be 0, NaN or below 1; any of those would
        # make the modulo / division below fail, so fall back to a fixed rate.
        try:
            fps = int(cap.get(cv2.CAP_PROP_FPS))
        except (TypeError, ValueError, OverflowError):
            fps = 0
        if fps < 1:
            # NOTE(review): 30 is an assumed fallback; timestamps are only
            # approximate when the real frame rate is unknown.
            fps = 30

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Keep the first frame of every run of `fps` frames.
            if frame_count % fps == 0:
                frame_path = os.path.join('temp', f'frame_{run_id}_{frame_count}.jpg')
                thumbnail_path = os.path.join('temp', f'thumb_{run_id}_{frame_count}.jpg')

                cv2.imwrite(frame_path, frame)
                thumbnail = cv2.resize(frame, (648, 648))
                cv2.imwrite(thumbnail_path, thumbnail)

                frames.append({
                    'frame': frame_path,
                    'thumbnail': thumbnail_path,
                    'timestamp': frame_count // fps
                })
            frame_count += 1
    finally:
        # Release the capture even if reading or writing a frame raised.
        cap.release()
    return frames
def extract_audio(video_path, output_path):
    """Extract the audio track of a video into a 16 kHz mono PCM WAV file.

    Runs ``ffmpeg`` as a subprocess. Every failure is logged and reported
    through the return value; nothing is raised.

    Args:
        video_path: Path of the source video.
        output_path: Path of the WAV file to write (overwritten if present).

    Returns:
        *output_path* when a non-empty file was produced, otherwise ``None``.
    """
    command = [
        'ffmpeg',
        '-i', video_path,
        '-vn',
        '-acodec', 'pcm_s16le',
        '-ar', '16000',
        '-ac', '1',
        '-y',
        output_path
    ]
    try:
        subprocess.run(
            command,
            check=True,
            # ffmpeg must not wait on the server's stdin.
            stdin=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE
        )

        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info(f"Audio extracted successfully: {output_path}")
            return output_path
        raise Exception("Audio extraction failed - empty or missing file")
    except subprocess.CalledProcessError as e:
        # The exception text only carries the exit status; the reason is in
        # ffmpeg's stderr, so log its tail as well.
        stderr_tail = e.stderr.decode('utf-8', errors='replace')[-500:] if e.stderr else ''
        logger.error(f"Audio extraction error: {str(e)}")
        if stderr_tail:
            logger.error(f"ffmpeg stderr: {stderr_tail}")
        return None
    except Exception as e:
        logger.error(f"Audio extraction error: {str(e)}")
        return None
def _split_on_word_boundaries(text, max_chars=512):
    """Split *text* into chunks of at most *max_chars* characters, never cutting
    a word unless the word alone is longer than *max_chars*."""
    chunks = []
    current = ""
    for word in text.split():
        # A word that cannot fit in any chunk has to be hard-split.
        while len(word) > max_chars:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(word[:max_chars])
            word = word[max_chars:]
        candidate = f"{current} {word}" if current else word
        if len(candidate) <= max_chars:
            current = candidate
        else:
            chunks.append(current)
            current = word
    if current:
        chunks.append(current)
    return chunks


def process_audio(audio_path):
    """Transcribe an audio file and normalize the transcript.

    The file is transcribed with the Whisper pipeline, then the text is passed
    chunk by chunk through the BERT tokenizer (encode followed by decode) and
    re-joined with single spaces.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        A dict that always has ``success`` and ``text``. On success it also
        has ``confidence`` (the pipeline's ``confidence`` value, or 0 when the
        result has none). On failure ``text`` holds a short message and
        ``error`` the reason. Nothing is raised.
    """
    try:
        if not os.path.exists(audio_path):
            logger.error(f"Audio file not found: {audio_path}")
            return {
                'success': False,
                'text': "Audio file not found",
                'error': "File not found"
            }

        logger.info(f"Processing audio file: {audio_path}")

        # First pass with Whisper. chunk_length_s makes the pipeline process
        # the recording in 30-second windows.
        # NOTE(review): without it, audio longer than one window is expected
        # to be truncated or rejected depending on the transformers version -
        # confirm against the installed version.
        whisper_result = whisper_model(audio_path, chunk_length_s=30)
        logger.info(f"Whisper result: {whisper_result}")

        if not whisper_result.get('text'):
            logger.error("Whisper failed to extract text")
            return {
                'success': False,
                'text': "Whisper failed to extract text",
                'error': "No text found in Whisper output"
            }

        text = whisper_result['text']

        # Second pass: tokenizer round trip. Chunks are cut at whitespace so a
        # word is not split in two and re-joined with a space in its middle.
        # No model forward pass is run: only the tokenizer affects the text
        # that is returned.
        processed_chunks = []
        for chunk in _split_on_word_boundaries(text, 512):
            inputs = bert_tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512)
            processed_chunk = bert_tokenizer.decode(
                inputs['input_ids'][0],
                skip_special_tokens=True
            )
            processed_chunks.append(processed_chunk)

        final_text = " ".join(processed_chunks)

        return {
            'success': True,
            'text': final_text,
            'confidence': whisper_result.get('confidence', 0)
        }
    except Exception as e:
        logger.error(f"Audio processing error: {str(e)}")
        return {
            'success': False,
            'text': "Audio processing failed",
            'error': str(e)
        }
def analyze_text_content(text):
    """Score *text* for profanity/toxicity and hate speech.

    Both tokenizers are called with ``truncation=True``, so text beyond the
    tokenizer's limit is not scored.

    Args:
        text: The text to analyze.

    Returns:
        A dict with ``profanity`` (``score`` 0-100, ``is_offensive``) and
        ``hate_speech`` (``score`` 0-100, ``is_hateful``), or ``None`` when the
        analysis fails (the error is logged).
    """
    try:
        # Analyze profanity
        profanity_inputs = profanity_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            profanity_outputs = profanity_model(**profanity_inputs)

        # unitary/toxic-bert is a multi-label classifier: each logit is an
        # independent label, so the probability is a per-label sigmoid rather
        # than a softmax across labels. The 'toxic' label is looked up by name
        # so the code does not depend on the label order.
        # NOTE(review): falls back to index 0 when the config has no 'toxic'
        # label - confirm against the model card.
        id2label = getattr(profanity_model.config, 'id2label', None) or {}
        toxic_index = next(
            (int(index) for index, label in id2label.items() if str(label).lower() == 'toxic'),
            0
        )
        profanity_probability = float(torch.sigmoid(profanity_outputs.logits)[0][toxic_index])

        # Analyze hate speech (index 1 is read as the "hate" class)
        hate_speech_inputs = hate_speech_tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            hate_speech_outputs = hate_speech_model(**hate_speech_inputs)
        hate_speech_scores = torch.nn.functional.softmax(hate_speech_outputs.logits, dim=-1)
        hate_speech_probability = float(hate_speech_scores[0][1])

        return {
            "profanity": {
                "score": profanity_probability * 100,
                "is_offensive": profanity_probability > 0.5
            },
            "hate_speech": {
                "score": hate_speech_probability * 100,
                "is_hateful": hate_speech_probability > 0.5
            }
        }
    except Exception as e:
        logger.error(f"Error analyzing text: {str(e)}")
        return None
def analyze_batch(batch_frames, text):
    """Score a batch of extracted frames, combined with the transcript scores.

    Args:
        batch_frames: Frame records with the keys ``frame`` (image path),
            ``thumbnail`` (thumbnail path) and ``timestamp``.
        text: Transcript of the video's audio; may be empty.

    Returns:
        One dict per frame, in input order, with the sections ``nudity``,
        ``profanity``, ``hate_speech``, ``harm`` and ``meta_standards`` plus
        ``thumbnail`` (base64-encoded file content) and ``timestamp``.
        Returns ``[]`` for an empty batch and ``None`` when the analysis
        fails (the error is logged).
    """
    try:
        if not batch_frames:
            return []

        images = []
        timestamps = []
        for frame_data in batch_frames:
            # Close the file handle right away and force three channels so the
            # array layout below is always (height, width, 3).
            with Image.open(frame_data['frame']) as source_image:
                images.append(source_image.convert('RGB').resize((128, 128)))
            timestamps.append(frame_data['timestamp'])

        # Prepare image data: scale to [0, 1] and move channels first.
        image_arrays = np.array([np.array(img) / 255.0 for img in images])
        image_tensors = torch.tensor(image_arrays).permute(0, 3, 1, 2).float()

        # Run analyses
        with torch.no_grad():
            nudity_results = nudity_model(image_tensors)
        nudity_predictions = [result.boxes for result in nudity_results]

        # The transcript is the same for every frame, so score it once through
        # the shared text analysis. A failed text analysis counts as zero
        # instead of discarding the image results.
        text_analysis = analyze_text_content(text) if text else None
        if text_analysis:
            profanity_probability = text_analysis['profanity']['score'] / 100
            hate_speech_probability = text_analysis['hate_speech']['score'] / 100
        else:
            profanity_probability = 0.0
            hate_speech_probability = 0.0

        # Zero-shot harm check: CLIP compares each image with two fixed, short
        # prompts and the softmax over those two prompts gives the share of
        # the "harmful" one. Comparing against N copies of the same text makes
        # every softmax entry 1/N, and a long transcript can exceed the length
        # CLIP's text encoder accepts.
        # NOTE(review): the prompt wording is a tuning choice - adjust it to
        # the moderation policy.
        harm_prompts = [
            "a safe, harmless image",
            "a violent, graphic or harmful image"
        ]
        clip_inputs = clip_processor(text=harm_prompts, images=images, return_tensors="tf", padding=True)
        clip_outputs = clip_model(**clip_inputs)
        clip_scores = tf.nn.softmax(clip_outputs.logits_per_image, axis=-1).numpy()

        profanity_score = profanity_probability * 99
        hate_speech_score = hate_speech_probability * 99

        results = []
        for i, frame_data in enumerate(batch_frames):
            with open(frame_data['thumbnail'], 'rb') as img_file:
                thumbnail_base64 = base64.b64encode(img_file.read()).decode('utf-8')

            # NOTE(review): the highest box confidence is used whatever class
            # was detected - confirm every class of this model means nudity.
            boxes = nudity_predictions[i]
            nudity_score = float(boxes.conf.max()) * 99 if len(boxes) > 0 else 0.0
            harm_score = float(clip_scores[i][1]) * 99

            # NOTE(review): the weights sum to 0.9, and 'is_violating' (> 30)
            # uses a different cut-off than the "Medium" risk level (> 25).
            meta_standards_score = (
                (nudity_score * 0.6) +
                (hate_speech_score * 0.1) +
                (harm_score * 0.1) +
                (profanity_score * 0.1)
            )

            results.append({
                "nudity": {
                    "score": nudity_score,
                    "is_inappropriate": nudity_score > 65
                },
                "profanity": {
                    "score": profanity_score,
                    "is_offensive": profanity_score > 65
                },
                "hate_speech": {
                    "score": hate_speech_score,
                    "is_hateful": hate_speech_score > 40
                },
                "harm": {
                    "score": harm_score,
                    "is_harmful": harm_score > 40
                },
                "meta_standards": {
                    "score": meta_standards_score,
                    "is_violating": meta_standards_score > 30,
                    "risk_level": "High" if meta_standards_score > 60 else "Medium" if meta_standards_score > 25 else "Low",
                    "recommendation": get_recommendation(meta_standards_score)
                },
                "thumbnail": thumbnail_base64,
                "timestamp": timestamps[i]
            })

        return results
    except Exception as e:
        logger.error(f"Error in batch analysis: {str(e)}")
        return None
def get_recommendation(score):
    """Map a numeric score to a recommendation sentence.

    Scores above 70 get the "likely violates" message, scores above 30 the
    "may need modifications" message, and everything else the "likely
    complies" message.
    """
    # Tiers are checked from the highest threshold down; first match wins.
    tiers = (
        (70, "Content likely violates Meta Community Standards. Major modifications needed."),
        (30, "Content may need modifications to comply with Meta Community Standards."),
    )
    for threshold, advice in tiers:
        if score > threshold:
            return advice
    return "Content likely complies with Meta Community Standards."
if __name__ == "__main__":
    # Flask's debug mode enables an interactive debugger that can execute
    # arbitrary code. The server listens on all interfaces, so debug mode is
    # opt-in through the FLASK_DEBUG environment variable instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)