import os
import tempfile

import cv2
import gradio as gr
import librosa
import numpy as np
import pandas as pd
import torch
import whisper_timestamped as whisper
from fer import FER
from moviepy.editor import AudioFileClip, VideoFileClip
from torch.nn.functional import softmax
from transformers import AutoModelForAudioClassification, pipeline
from translate import Translator


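# Assumed pip package names for the imports above (versions are not pinned
# anywhere in this script): gradio, pandas, opencv-python, torch, librosa, fer,
# transformers, moviepy, numpy, whisper-timestamped, translate.
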
# Pretrained models: speech emotion recognition on the audio track, facial
# expression recognition (MTCNN face detector) on video frames, and text
# emotion classification on the translated transcript.
audio_model = AutoModelForAudioClassification.from_pretrained("3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True)
face_detector = FER(mtcnn=True)
classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=None)

# Normalization statistics shipped with the audio model's config.
mean = audio_model.config.mean
std = audio_model.config.std


def extract_audio_from_video(video_path):
    """Extract the video's audio track to a temporary WAV file and return its path."""
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        audio_clip.write_audiofile(temp_audio_file.name, codec="pcm_s16le")
        video_clip.close()  # release moviepy's readers
        return temp_audio_file.name


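# NOTE: the index-to-label mapping below is assumed to match the categorical head
# of the SER-Odyssey WavLM checkpoint loaded above; index 6 is deliberately dropped
# from the returned distribution, which is then renormalized over the remaining
# seven emotions.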
def process_audio_and_detect_emotions(audio_clip):
    """Run the speech emotion model on one audio segment and return a one-row DataFrame of probabilities."""
    audio_np = np.array(audio_clip)
    mask = torch.ones(1, len(audio_np))
    wavs = torch.tensor(audio_np).unsqueeze(0)

    with torch.no_grad():
        pred = audio_model(wavs, mask)
        logits = pred.logits if hasattr(pred, 'logits') else pred[0]

    labels = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
    # Keep only the seven categories listed above and renormalize.
    probabilities = softmax(logits, dim=-1).squeeze(0)[[0, 1, 2, 3, 4, 5, 7]]
    probabilities = probabilities / probabilities.sum()
    df = pd.DataFrame([probabilities.numpy()], columns=list(labels.values()))
    return df


def analyze_audio_emotions(video_path):
    """Extract the audio track and score speech emotions in one-second windows."""
    temp_audio_path = None
    try:
        temp_audio_path = extract_audio_from_video(video_path)
        raw_wav, _ = librosa.load(temp_audio_path, sr=audio_model.config.sampling_rate)
        norm_wav = (raw_wav - mean) / (std + 1e-6)

        times = []
        emotions_dfs = []
        # Slide over the waveform in non-overlapping one-second windows.
        for start_sample in range(0, len(norm_wav), audio_model.config.sampling_rate):
            audio_segment = norm_wav[start_sample:start_sample + audio_model.config.sampling_rate]
            df = process_audio_and_detect_emotions(audio_segment)
            times.append(start_sample / audio_model.config.sampling_rate)
            emotions_dfs.append(df)

        emotions_df = pd.concat(emotions_dfs, ignore_index=True)
        emotions_df.insert(0, "Time(s)", times)
        # Align column names with the labels used by the facial and text analyses.
        emotion_rename_map = {'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise', 'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'}
        emotions_df.rename(columns=emotion_rename_map, inplace=True)

        emotions_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        emotions_df.to_excel(emotions_xlsx_path, index=False)

        return "Audio emotion detection completed successfully.", emotions_df, emotions_xlsx_path

    except Exception as e:
        return f"Error during audio emotion detection: {str(e)}", None, None
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)


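# NOTE: the face pipeline below runs MTCNN detection on every frame, so runtime
# scales with video length and FPS. Standalone usage sketch (hypothetical file
# name, outside the Gradio UI):
#   status, df, xlsx_path, annotated_mp4 = detect_faces_and_emotions("sample.mp4")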
def detect_faces_and_emotions(video_path):
    """Annotate faces with emotion scores frame by frame and return per-second averages."""
    temp_video_path = None
    temp_audio_path = None
    output_video_path = None
    emotions_data = []
    try:
        temp_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        temp_video_path = temp_video.name
        temp_audio = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
        temp_audio_path = temp_audio.name
        output_xlsx = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False)
        output_xlsx_path = output_xlsx.name

        # Keep the original audio so it can be muxed back onto the annotated video.
        original_video = VideoFileClip(video_path)
        original_audio = original_video.audio
        original_audio.write_audiofile(temp_audio_path)

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise Exception("Error: Could not open video file.")

        fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_video_path, fourcc, fps, (frame_width, frame_height))

        frame_number = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame is None:
                continue

            time_seconds = round(frame_number / fps)
            result = face_detector.detect_emotions(frame)

            for face in result:
                bounding_box = face["box"]
                emotions = face["emotions"]
                # Store the scores with a timestamp, but draw only the emotion keys.
                emotions_data.append({**emotions, "Time(s)": time_seconds})
                cv2.rectangle(frame, (bounding_box[0], bounding_box[1]),
                              (bounding_box[0] + bounding_box[2], bounding_box[1] + bounding_box[3]), (0, 155, 255), 2)
                for index, (emotion_name, score) in enumerate(emotions.items()):
                    color = (211, 211, 211) if score < 0.01 else (255, 0, 0)
                    emotion_score = "{}: {:.2f}".format(emotion_name, score)
                    cv2.putText(frame, emotion_score, (bounding_box[0], bounding_box[1] + bounding_box[3] + 30 + index * 15),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)

            out.write(frame)
            frame_number += 1

        cap.release()
        out.release()

        emotions_df = pd.DataFrame(emotions_data)
        if emotions_df.empty:
            raise Exception("No faces were detected in the video.")
        emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
        # Average the scores per second and fill seconds with no detections with zeros.
        max_time = emotions_df['Time(s)'].max()
        all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
        avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
        df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left')
        df_merged.fillna(0, inplace=True)
        df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
        df_merged.to_excel(output_xlsx_path, index=False)

        # Re-attach the original audio to the annotated frames.
        processed_video = VideoFileClip(temp_video_path)
        audio = AudioFileClip(temp_audio_path)
        final_video = processed_video.set_audio(audio)
        output_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        output_video_path = output_video.name
        final_video.write_videofile(output_video_path, codec='libx264')

        return "Face and emotion detection completed successfully.", df_merged, output_xlsx_path, output_video_path

    except Exception as e:
        return f"Error during processing: {str(e)}", None, None, None
    finally:
        if temp_video_path and os.path.exists(temp_video_path):
            os.remove(temp_video_path)
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)


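# NOTE: the transcription below loads the whisper-timestamped "medium" model on CPU
# on every call, which is slow for long videos; the Translator is configured for
# Korean source speech (from_lang='ko'), and the sentiment classifier expects
# English input, hence the translation step.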
def process_video_text(video_path):
    """Transcribe the video's speech, translate it to English, and score text emotions per segment and per second."""
    temp_audio_path = None
    try:
        video_clip = VideoFileClip(video_path)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_path = temp_audio_file.name
            video_clip.audio.write_audiofile(temp_audio_path)

        # Word- and segment-level transcription with timestamps.
        audio = whisper.load_audio(temp_audio_path)
        model = whisper.load_model("medium", device="cpu")
        result = whisper.transcribe(model, audio)

        word_texts = []
        word_starts = []
        word_ends = []
        word_confidences = []

        for segment in result['segments']:
            for word in segment['words']:
                word_texts.append(word['text'])
                word_starts.append(word['start'])
                word_ends.append(word['end'])
                word_confidences.append(word['confidence'])

        segments_data = [{'text': seg['text'], 'start': seg['start'], 'end': seg['end'], 'confidence': seg['confidence']} for seg in result['segments']]
        segments_df = pd.DataFrame(segments_data)

        # Translate each segment to English before running the English-only classifier.
        translator = Translator(from_lang='ko', to_lang='en')
        segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: translator.translate(x))

        segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(lambda x: {entry['label']: entry['score'] for entry in classifier(x)[0]})

        # Expand the per-segment score dictionaries into one column per emotion.
        sentiment_df = segments_df['Sentiment_Scores'].apply(pd.Series)
        sentiment_df = pd.concat([segments_df, sentiment_df], axis=1)

        words_data = {
            'text': word_texts,
            'start': word_starts,
            'end': word_ends,
            'confidence': word_confidences
        }
        words_df = pd.DataFrame(words_data)

        # Group words by the second in which they start.
        words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))

        words_grouped = words_df.groupby('second').agg({
            'text': lambda x: ' '.join(x),
            'start': 'min',
            'end': 'max',
            'confidence': 'mean'
        }).reset_index()

        # Make sure every second of the video has a row, even if nothing was spoken.
        max_second = int(video_clip.duration)
        all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})
        words_grouped = all_seconds.merge(words_grouped, on='second', how='left')

        words_grouped['text'] = words_grouped['text'].fillna('')
        words_grouped.fillna(0, inplace=True)

        # Carry each segment's emotion scores over to the seconds it covers.
        emotion_columns = sentiment_df.columns.difference(['text', 'start', 'end', 'confidence', 'Translated_Text', 'Sentiment_Scores'])
        for col in emotion_columns:
            words_grouped[col] = np.nan

        for i, row in words_grouped.iterrows():
            matching_segment = sentiment_df[(sentiment_df['start'] <= row['start']) & (sentiment_df['end'] >= row['end'])]
            if not matching_segment.empty:
                for emotion in emotion_columns:
                    words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]

        words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)

        segments_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        words_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        sentiment_df.to_excel(segments_xlsx_path, index=False)
        words_grouped.to_excel(words_xlsx_path, index=False)

        return words_grouped, sentiment_df, words_xlsx_path, segments_xlsx_path, "Text emotion processing completed successfully!"

    except Exception as e:
        return None, None, None, None, f"Error during text emotion processing: {str(e)}"
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)


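# Gradio UI: a single video upload feeds three independent analyses (audio, face,
# text), each wired to one of the functions above via its own button.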
def gradio_app():
    interface = gr.Blocks()

    with interface:
        gr.Markdown("## I-MEQ: Emotion Monitoring System")
        video_input = gr.Video(label="Upload your video for analysis", height=600)

        with gr.Row():
            analyze_audio_button = gr.Button("Analyze Audio Emotions")
            analyze_fer_button = gr.Button("Analyze Facial Emotions")
            analyze_text_button = gr.Button("Transcribe & Analyze Textual Emotions")

        with gr.Row():
            with gr.Column():
                audio_analysis_status = gr.Textbox(label="Audio Emotion Analysis Status")
                audio_emotions_dataframe = gr.Dataframe(label="Audio Emotions DataFrame", interactive=False)
                audio_emotions_xlsx_download = gr.File(label="Download Audio Emotions XLSX")

            with gr.Column():
                fer_analysis_status = gr.Textbox(label="Facial Emotion Analysis Status")
                fer_emotions_dataframe = gr.Dataframe(label="Facial Emotions DataFrame", interactive=False)
                fer_emotions_xlsx_download = gr.File(label="Download Facial Emotions XLSX")
                processed_video_download = gr.File(label="Download Processed Video")

            with gr.Column():
                text_analysis_status = gr.Textbox(label="Text Sentiment Analysis Status")
                words_dataframe = gr.Dataframe(label="Words DataFrame", interactive=False)
                segments_dataframe = gr.Dataframe(label="Segments DataFrame", interactive=False)
                words_xlsx_download = gr.File(label="Download Words XLSX")
                segments_xlsx_download = gr.File(label="Download Segments XLSX")

        analyze_audio_button.click(
            analyze_audio_emotions,
            inputs=video_input,
            outputs=[
                audio_analysis_status,
                audio_emotions_dataframe,
                audio_emotions_xlsx_download
            ]
        )

        analyze_fer_button.click(
            detect_faces_and_emotions,
            inputs=video_input,
            outputs=[
                fer_analysis_status,
                fer_emotions_dataframe,
                fer_emotions_xlsx_download,
                processed_video_download
            ]
        )

        analyze_text_button.click(
            process_video_text,
            inputs=video_input,
            outputs=[
                words_dataframe,
                segments_dataframe,
                words_xlsx_download,
                segments_xlsx_download,
                text_analysis_status
            ]
        )

    interface.launch()


gradio_app()
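
# Gradio serves the app on a local URL by default; interface.launch(share=True)
# would additionally create a temporary public link (optional, not used here).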