|
from pytubefix import YouTube |
|
from pytubefix.cli import on_progress |
|
import time |
|
import math |
|
import gradio as gr |
|
import ffmpeg |
|
from faster_whisper import WhisperModel |
|
import requests |
|
import json |
|
import arabic_reshaper |
|
from bidi.algorithm import get_display |
|
from moviepy import * |
|
import pysrt |
|
import instaloader |
|
import time |
|
import concurrent.futures |
|
import re |
|
from io import BytesIO |
|
from PIL import Image |
|
# WARNING(review): hard-coded API credential committed to source — move it to
# an environment variable or secrets store, and rotate this key.
api_key = "268976:66f4f58a2a905"
|
|
|
|
|
def extract_audio(input_video_name):
    """Extract the audio track of *input_video_name* into "audio.mp3".

    Returns the mp3 filename. The clips are closed in a ``finally`` block so
    the underlying file handles are released even if the write fails (the
    original code leaked both clips on any exception).
    """
    mp3_file = "audio.mp3"
    video_clip = VideoFileClip(input_video_name)
    audio_clip = None
    try:
        audio_clip = video_clip.audio
        audio_clip.write_audiofile(mp3_file)
    finally:
        if audio_clip is not None:
            audio_clip.close()
        video_clip.close()
    print("Audio extraction successful!")
    return mp3_file
|
|
|
def transcribe(audio, max_segment_duration=2.0):
    """Transcribe *audio* with faster-whisper and return word-level timings.

    Runs the "tiny" model on CPU with VAD filtering and word timestamps.
    Each word is printed as it is seen and collected as a dict with
    'word', 'start' and 'end' keys.

    NOTE(review): max_segment_duration is accepted but never used — kept
    for interface compatibility.
    """
    model = WhisperModel("tiny", device="cpu")
    segments, info = model.transcribe(audio, vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1500), word_timestamps=True)
    words = []
    for segment in list(segments):
        for entry in segment.words:
            print("[%.2fs -> %.2fs] %s" % (entry.start, entry.end, entry.word))
            words.append({'word': entry.word, 'start': entry.start, 'end': entry.end})
    return words
|
|
|
def create_subtitles(wordlevel_info):
    """Group word-level timings into subtitle lines.

    A line is flushed when its last word ends with punctuation or when it
    reaches five words. Each subtitle dict carries the joined text, the
    start of its first word, the end of its last word, and the raw word
    entries. Finally, every subtitle's end is stretched to the start of
    the following subtitle so consecutive lines leave no gap on screen.
    """
    terminators = {'.', '!', '?', ',', ';', ':', '—', '-', '。', '!', '?'}

    def pack(words):
        # Build one subtitle record from the buffered words.
        return {
            "word": " ".join(item["word"] for item in words),
            "start": words[0]["start"],
            "end": words[-1]["end"],
            "textcontents": words.copy(),
        }

    subtitles = []
    buffer = []
    for entry in wordlevel_info:
        buffer.append(entry)
        token = entry['word']
        if (token and token[-1] in terminators) or len(buffer) == 5:
            subtitles.append(pack(buffer))
            buffer = []

    if buffer:
        subtitles.append(pack(buffer))

    # Close inter-subtitle gaps: each line ends where the next begins.
    for prev, nxt in zip(subtitles, subtitles[1:]):
        prev["end"] = nxt["start"]

    return subtitles
|
|
|
def format_time(seconds):
    """Format a duration in seconds as an SRT timestamp HH:MM:SS,mmm.

    Fixes two defects of the original:
    - seconds were formatted with ``:01d`` instead of ``:02d``, producing
      invalid SRT timestamps like ``00:00:1,500``;
    - millisecond rounding could yield 1000 (e.g. 1.9996 s), producing
      ``,1000``; the overflow is now carried into the seconds field.
    """
    hours = math.floor(seconds / 3600)
    seconds %= 3600
    minutes = math.floor(seconds / 60)
    seconds %= 60
    milliseconds = round((seconds - math.floor(seconds)) * 1000)
    seconds = math.floor(seconds)
    # Carry rounding overflow up through seconds/minutes/hours.
    if milliseconds == 1000:
        milliseconds = 0
        seconds += 1
        if seconds == 60:
            seconds = 0
            minutes += 1
            if minutes == 60:
                minutes = 0
                hours += 1
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"
|
|
|
def generate_subtitle_file(language, segments, input_video_name):
    """Write *segments* (dicts with 'start', 'end', 'word') as an SRT file.

    The file is named ``sub-<input_video_name>.<language>.srt`` and its path
    is returned. Timestamps are rendered via :func:`format_time`.
    """
    subtitle_file = f"sub-{input_video_name}.{language}.srt"
    text = ""
    for index, segment in enumerate(segments):
        segment_start = format_time(segment['start'])
        segment_end = format_time(segment['end'])
        text += f"{index + 1} \n"
        text += f"{segment_start} --> {segment_end} \n"
        text += f"{segment['word']} \n"
        text += "\n"
    # Context manager replaces the bare open/close pair, so the handle is
    # released even if the write raises.
    with open(subtitle_file, "w", encoding='utf8') as f:
        f.write(text)
    return subtitle_file
|
|
|
def clean_text(text):
    """Strip markdown code fences and leading ``srt`` tags from LLM output.

    The chatbot typically wraps its answer in a ```` ```srt ... ``` ````
    fence; remove the fence markers and the language tag while leaving the
    subtitle body untouched.
    """
    # MULTILINE added so fences sitting on their own lines (the common case
    # when the model ends its reply with a newline) are stripped too — the
    # original only matched at the very start/end of the whole string.
    text = re.sub(r"^```|```$", '', text, flags=re.MULTILINE)
    text = re.sub(r'^srt', '', text, flags=re.MULTILINE)
    return text
|
|
|
def translate_text(api_key, text, source_language = "en", target_language = "fa"):
    """Translate *text* via the one-api Google Translate endpoint.

    Returns the translated string, or None (after printing the error) on any
    non-200 HTTP response.
    """
    url = "https://api.one-api.ir/translate/v1/google/"
    # BUG FIX: the body previously referenced the undefined names
    # source_lang/target_lang (NameError at call time); use the actual
    # parameter names.
    request_body = {"source": source_language, "target": target_language, "text": text}
    headers = {"one-api-token": api_key, "Content-Type": "application/json"}
    response = requests.post(url, headers=headers, json=request_body)
    if response.status_code == 200:
        result = response.json()
        return result['result']
    print(f"Error: {response.status_code}, {response.text}")
    return None
|
|
|
def enhance_text(api_key, text):
    """Translate an English SRT into Persian via the one-api GPT-4o endpoint.

    Sends the subtitle text in a short chat exchange, retrying up to three
    times with a 30 second pause after each failure. If every attempt fails,
    falls back to plain Google translation via :func:`translate_text`.
    """
    url = "https://api.one-api.ir/chatbot/v1/gpt4o/"
    conversation = [
        {
            "role": "user",
            "content": f"Please take the following SRT subtitle text in English and translate only the subtitle text into Persian. Ensure that all numbering and time codes remain unchanged. The output should be a new SRT file with the subtitles in Persian, preserving the original formatting and timings and exept for the subtitle dont return anything in response. the subtitle will be provided in the following message"
        },
        {"role": "assistant", "content": "okay"},
        {"role": "user", "content": text},
    ]
    headers = {
        "one-api-token": api_key,
        "Content-Type": "application/json",
    }

    for _ in range(3):
        response = requests.post(url, headers=headers, json=conversation)
        if response.status_code == 200:
            payload = response.json()
            if payload["status"] == 200:
                print("status: ", payload["status"])
                cleaned = clean_text(payload["result"][0])
                print("result: ", cleaned)
                return cleaned
            print(f"Error: status {payload['status']}, retrying in 30 seconds...")
        else:
            print(f"Error: {response.status_code}, {response.text}, retrying in 30 seconds...")
        # Pause before the next attempt (also after the final failure,
        # matching the original loop's behavior).
        time.sleep(30)

    print("Error Max attempts reached. Could not retrieve a successful response.")
    return translate_text(api_key, text)
|
|
|
def read_srt_file(file_path):
    """Return the UTF-8 contents of *file_path*, or None if it can't be read.

    Errors are reported on stdout rather than raised (best-effort read).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except FileNotFoundError:
        print(f"The file {file_path} was not found.")
    except Exception as e:
        print(f"An error occurred: {e}")
|
|
|
def write_srt(subtitle_text, output_file="edited_srt.srt"):
    """Persist *subtitle_text* to *output_file* as UTF-8."""
    with open(output_file, 'w', encoding="utf-8") as handle:
        handle.write(subtitle_text)
|
|
|
def write_google(google_translate):
    """Dump the Google-translated SRT text to "google_translate.srt".

    Returns the filename so callers can hand it on to the next stage.
    """
    target = "google_translate.srt"
    with open(target, 'w', encoding="utf-8") as handle:
        handle.write(google_translate)
    return target
|
|
|
def generate_translated_subtitle(language, segments, input_video_name):
    """Rewrite a translated SRT string into ``<input_video_name>.srt``.

    *segments* is the full SRT text; blank lines are dropped and the
    remaining lines are treated as repeating (cue number, timing, text)
    triples. The text line gets a leading RLM mark (\\u200F) so Persian
    renders right-to-left, and each cue is re-separated by a blank line.
    The *language* parameter is kept for interface compatibility.
    Returns the subtitle filename.

    Cleanups vs the original: the unused ``segment_number`` counter is
    removed, and the redundant ``(index+1)==1`` / ``(index+1)==2`` clauses
    (already implied by the modulo tests) are folded into one check.
    """
    base_name = input_video_name.split('/')[-1]
    subtitle_file = f"{base_name}.srt"
    lines = [line for line in segments.split('\n') if line != '']
    text = ""
    for index, line in enumerate(lines):
        position = index % 3  # 0: cue number, 1: timing, 2: subtitle text
        if position == 0:
            text += f"{line}\n"
        elif position == 1:
            text += line + "\n"
        else:
            text += f"\u200F{line}\n\n"
    with open(subtitle_file, "w", encoding='utf8') as f:
        f.write(text)
    return subtitle_file
|
|
|
def time_to_seconds(time_obj):
    """Convert a SubRipTime-like object (hours/minutes/seconds/milliseconds
    attributes) into a total number of seconds as a float."""
    total = time_obj.hours * 3600
    total += time_obj.minutes * 60
    total += time_obj.seconds
    return total + time_obj.milliseconds / 1000
|
|
|
def create_subtitle_clips(subtitles, videosize, fontsize, font, color, debug):
    """Build per-subtitle TextClips plus translucent backing ColorClips.

    subtitles: pysrt items exposing .start/.end/.text.
    videosize: (width, height) of the target video in pixels.
    debug: accepted for interface compatibility but unused.
    Returns (text_clips, background_clips), both positioned centred
    horizontally at 68% of the video height.

    Cleanups vs the original: the dead locals max_width/max_height are
    removed and the loop-invariant geometry is hoisted out of the loop.
    """
    subtitle_clips = []
    color_clips = []
    video_width, video_height = videosize
    box_size = (int(video_width * 0.8), int(video_height * 0.2))
    text_position = ('center', video_height * 0.68)
    for subtitle in subtitles:
        start_time = time_to_seconds(subtitle.start)
        duration = time_to_seconds(subtitle.end) - start_time
        text_clip = TextClip(font, subtitle.text, font_size=fontsize, size=box_size, text_align="right", color=color, method='caption').with_start(start_time).with_duration(duration)
        # NOTE(review): (225, 0, 0) looks like a typo for (255, 0, 0) —
        # confirm the intended backdrop colour before changing it.
        backdrop = ColorClip(size=box_size, color=(225, 0, 0)).with_opacity(0.2).with_start(start_time).with_duration(duration)
        subtitle_clips.append(text_clip.with_position(text_position))
        color_clips.append(backdrop.with_position(text_position))
    return subtitle_clips, color_clips
|
|
|
def video_edit(srt, input_video, input_audio= "audio.mp3"):
    """Burn the subtitles in *srt* onto *input_video*, muxing *input_audio*.

    Composites translucent backdrops and text clips over the video and
    writes the result to "video_subtitled.mp4", whose path is returned.
    """
    input_video_name = "video"
    video = VideoFileClip(input_video)
    audio = AudioFileClip(input_audio)
    video = video.with_audio(audio)
    print(video)
    output_video_file = input_video_name + '_subtitled' + ".mp4"
    # BUG FIX: the srt parameter was ignored and "video_subtitled.srt" was
    # hard-coded; honour the caller-supplied subtitle file instead. The
    # existing caller passes that exact filename, so behavior is unchanged
    # for the current pipeline.
    subtitles = pysrt.open(srt, encoding="utf-8")
    subtitle_clips, color_clips = create_subtitle_clips(subtitles, video.size, 24, 'arial.ttf', 'white', False)
    final_video = CompositeVideoClip([video] + color_clips + subtitle_clips)
    final_video.write_videofile(output_video_file, codec="libx264", audio_codec="aac", logger=None)
    print('final')
    return output_video_file
|
|
|
def process_video(video, clip_type):
    """Full pipeline: extract audio, transcribe, translate, burn subtitles.

    video: path of the uploaded video file.
    clip_type: value of the UI dropdown; currently unused by the pipeline.
    Returns the path of the subtitled output video.
    """
    audio_path = extract_audio(video)
    word_timings = transcribe(audio_path)
    grouped = create_subtitles(word_timings)
    english_srt = generate_subtitle_file('fa', grouped, 'video_subtitled')
    srt_text = read_srt_file(english_srt)
    translated = enhance_text(api_key, srt_text)
    # Written out for inspection/debugging; the return value is unused.
    write_google(translated)
    persian_srt = generate_translated_subtitle("fa", translated, "video_subtitled")
    return video_edit(persian_srt, video, input_audio= "audio.mp3")
|
|
|
# Gradio front-end: upload a video, pick a clip type, and run the pipeline.
with gr.Blocks() as demo:
    gr.Markdown("Start typing below and then click **Run** to see the output.")
    with gr.Column():
        video_file_input = gr.Video(label="Upload Video File")
        clip_type = gr.Dropdown(["auto edit", "default"], label="Clip Type")
        btn = gr.Button("create")
        video_file_output = gr.Video(label="result: ")
    btn.click(fn=process_video, inputs=[video_file_input, clip_type], outputs=video_file_output)
    # Removed: a large commented-out (triple-quoted) block wiring a second
    # "transcribe" button to undefined components — dead code.

demo.launch(debug=True)