|
from pytubefix import YouTube |
|
from pytubefix.cli import on_progress |
|
import time |
|
import math |
|
import gradio as gr |
|
import ffmpeg |
|
from faster_whisper import WhisperModel |
|
import requests |
|
import json |
|
import arabic_reshaper |
|
from bidi.algorithm import get_display |
|
from moviepy import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip, ImageClip |
|
import pysrt |
|
import instaloader |
|
import time |
|
import concurrent.futures |
|
import re |
|
from io import BytesIO |
|
from PIL import Image |
|
# SECURITY: hardcoded API credential committed to source — move this to an
# environment variable or secrets store and rotate the key.
api_key = "268976:66f4f58a2a905"
|
|
|
|
|
def extract_audio(input_video_name):
    """Extract the audio track of *input_video_name* into "audio.mp3".

    Args:
        input_video_name: path of the video file to read.

    Returns:
        The mp3 filename ("audio.mp3").

    The clips are closed in ``finally`` blocks so they are released even if
    the write fails (the original leaked both clips on error).
    """
    mp3_file = "audio.mp3"
    video_clip = VideoFileClip(input_video_name)
    try:
        audio_clip = video_clip.audio
        try:
            audio_clip.write_audiofile(mp3_file)
        finally:
            audio_clip.close()
    finally:
        video_clip.close()
    print("Audio extraction successful!")
    return mp3_file
|
|
|
def transcribe(audio, max_segment_duration=2.0):
    """Run faster-whisper (tiny model, CPU) on *audio*.

    Args:
        audio: path to the audio file to transcribe.
        max_segment_duration: accepted for interface compatibility; unused.

    Returns:
        A list of ``{'word', 'start', 'end'}`` dicts, one per recognized word.
    """
    model = WhisperModel("tiny", device="cpu")
    segments, info = model.transcribe(
        audio,
        vad_filter=True,
        vad_parameters=dict(min_silence_duration_ms=1500),
        word_timestamps=True,
    )
    # Materialize the generator before iterating, as the original did.
    segments = list(segments)

    wordlevel_info = []
    for seg in segments:
        for w in seg.words:
            print("[%.2fs -> %.2fs] %s" % (w.start, w.end, w.word))
            wordlevel_info.append({'word': w.word, 'start': w.start, 'end': w.end})
    return wordlevel_info
|
|
|
def create_subtitles(wordlevel_info):
    """Group word-level timestamps into subtitle lines.

    A line is flushed when its last word ends with punctuation or when it
    reaches five words. Afterwards each subtitle's end time is snapped to
    the start of the following subtitle, so lines stay on screen with no
    gaps between them.

    Args:
        wordlevel_info: list of ``{'word', 'start', 'end'}`` dicts.

    Returns:
        List of ``{'word', 'start', 'end', 'textcontents'}`` dicts.
    """
    punctuation_marks = {'.', '!', '?', ',', ';', ':', '—', '-', '。', '!', '?'}

    def _flush(buffer):
        # Collapse a buffered run of words into one subtitle entry.
        return {
            "word": " ".join(item["word"] for item in buffer),
            "start": buffer[0]["start"],
            "end": buffer[-1]["end"],
            "textcontents": buffer.copy(),
        }

    subtitles = []
    buffer = []
    for entry in wordlevel_info:
        buffer.append(entry)
        token = entry['word']
        hits_punct = bool(token) and token[-1] in punctuation_marks
        if hits_punct or len(buffer) == 5:
            subtitles.append(_flush(buffer))
            buffer = []

    if buffer:
        subtitles.append(_flush(buffer))

    # Close inter-subtitle gaps: each line lasts until the next one starts.
    for prev, nxt in zip(subtitles, subtitles[1:]):
        prev["end"] = nxt["start"]

    return subtitles
|
|
|
def format_time(seconds):
    """Convert a duration in seconds to an SRT timestamp "HH:MM:SS,mmm".

    Fixes two defects in the original:
    - seconds were formatted with ``%01d``, producing one-digit seconds
      (e.g. "00:00:3,500"), which is invalid SRT;
    - ``round((seconds - floor(seconds)) * 1000)`` could yield 1000,
      producing ",1000". Working in whole milliseconds carries the
      round-up into the seconds field instead.
    """
    total_ms = round(seconds * 1000)
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, milliseconds = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
|
|
|
def generate_subtitle_file(language, segments, input_video_name):
    """Write transcription segments to an SRT file.

    Args:
        language: language tag embedded in the output filename.
        segments: list of dicts with 'start', 'end' (seconds) and 'word' keys.
        input_video_name: base name used in the output filename.

    Returns:
        The subtitle filename "sub-<input_video_name>.<language>.srt".
    """
    subtitle_file = f"sub-{input_video_name}.{language}.srt"
    text = ""
    for index, segment in enumerate(segments):
        segment_start = format_time(segment['start'])
        segment_end = format_time(segment['end'])
        text += f"{str(index+1)} \n"
        text += f"{segment_start} --> {segment_end} \n"
        text += f"{segment['word']} \n"
        text += "\n"
    # Context manager instead of bare open()/close() so the handle is
    # released even if the write raises.
    with open(subtitle_file, "w", encoding='utf8') as f:
        f.write(text)
    return subtitle_file
|
|
|
def clean_text(text):
    """Strip markdown code-fence markers and a leading "srt" language tag.

    The chatbot endpoint often wraps its answer in ```srt ... ``` fences;
    this removes the fences (at string start/end) and any "srt" token at
    the start of a line, leaving just the subtitle payload.
    """
    without_fences = re.sub(r"^```|```$", '', text)
    without_tag = re.sub(r'^srt', '', without_fences, flags=re.MULTILINE)
    return without_tag
|
|
|
def translate_text(api_key, text, source_language = "en", target_language = "fa"):
    """Translate *text* via the one-api Google-translate endpoint.

    Args:
        api_key: one-api token.
        text: text to translate.
        source_language / target_language: ISO language codes.

    Returns:
        The translated string, or None when the HTTP request fails.
    """
    url = "https://api.one-api.ir/translate/v1/google/"
    # BUG FIX: the original referenced undefined names `source_lang` /
    # `target_lang` (the parameters are source_language/target_language),
    # so every call raised NameError.
    request_body = {"source": source_language, "target": target_language, "text": text}
    headers = {"one-api-token": api_key, "Content-Type": "application/json"}
    response = requests.post(url, headers=headers, json=request_body)
    if response.status_code == 200:
        result = response.json()
        return result['result']
    else:
        print(f"Error: {response.status_code}, {response.text}")
        return None
|
|
|
def enhance_text(api_key, text):
    """Translate an English SRT to Persian via the one-api GPT-4o endpoint.

    Tries up to three times, sleeping 30 s after each failed attempt.
    If all attempts fail, falls back to plain Google translation via
    ``translate_text``.
    """
    url = "https://api.one-api.ir/chatbot/v1/gpt4o/"
    request_body = [{
        "role": "user",
        "content": f"i will provide you with an english subtitle of a clip which is in srt format and i need you to translate each line in persian an return in a srt format without changing the original timing, converting the English terms used, into common Persian terms. in respose dont add any thing and keep the srt format, keep in mind the duraetion of the end of the srt should be the same as the duaration of the clip. subtitle: {text} "
    },]
    headers = {
        "one-api-token": api_key,
        "Content-Type": "application/json"
    }

    max_attempts = 3
    for _ in range(max_attempts):
        response = requests.post(url, headers=headers, json=request_body)
        if response.status_code == 200:
            result = response.json()
            if result["status"] == 200:
                print("status: ", result["status"])
                cleaned = clean_text(result["result"][0])
                print("result: ", cleaned)
                return cleaned
            print(f"Error: status {result['status']}, retrying in 30 seconds...")
        else:
            print(f"Error: {response.status_code}, {response.text}, retrying in 30 seconds...")
        # Back off before the next attempt (also sleeps after the final
        # failure, matching the original behavior).
        time.sleep(30)

    print("Error Max attempts reached. Could not retrieve a successful response.")
    return translate_text(api_key, text)
|
|
|
def read_srt_file(file_path):
    """Return the contents of *file_path* as a string.

    Returns None (implicitly) when the file is missing or unreadable,
    printing a diagnostic instead of raising.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except FileNotFoundError:
        print(f"The file {file_path} was not found.")
    except Exception as e:
        print(f"An error occurred: {e}")
|
|
|
def write_srt(subtitle_text, output_file="edited_srt.srt"):
    """Persist *subtitle_text* to *output_file* as UTF-8."""
    with open(output_file, 'w', encoding="utf-8") as handle:
        handle.write(subtitle_text)
|
|
|
def write_google(google_translate):
    """Write the translated SRT text to "google_translate.srt".

    Returns the filename it wrote to.
    """
    target = "google_translate.srt"
    with open(target, 'w', encoding="utf-8") as handle:
        handle.write(google_translate)
    return target
|
|
|
def generate_translated_subtitle(language, segments, input_video_name):
    """Rebuild an SRT file from translated subtitle text.

    Args:
        language: unused; kept for interface compatibility.
        segments: a full SRT document as a single string (index line,
            timing line, text line, repeating).
        input_video_name: path/name; only the last path component is used
            as the output base name.

    Returns:
        The written filename "<base>.srt". Every third non-empty line
        (the subtitle text) is prefixed with U+200F (RIGHT-TO-LEFT MARK)
        so Persian text renders correctly.
    """
    base_name = input_video_name.split('/')[-1]
    subtitle_file = f"{base_name}.srt"
    non_empty = [ln for ln in segments.split('\n') if ln != '']

    parts = []
    for position, line_text in enumerate(non_empty, start=1):
        slot = position % 3
        if slot == 1:
            parts.append(f"{line_text}\n")            # cue index
        elif slot == 2:
            parts.append(line_text + "\n")            # timing line
        else:
            parts.append(f"\u200F{line_text}\n\n")    # RTL-marked text

    with open(subtitle_file, "w", encoding='utf8') as f:
        f.write("".join(parts))
    return subtitle_file
|
|
|
def time_to_seconds(time_obj):
    """Convert a pysrt-style time object (hours/minutes/seconds/milliseconds)
    to a float number of seconds."""
    whole = (time_obj.hours * 60 + time_obj.minutes) * 60 + time_obj.seconds
    return whole + time_obj.milliseconds / 1000
|
|
|
def create_subtitle_clips(subtitles, videosize, fontsize, font, color, debug):
    """Build positioned TextClips plus translucent backing ColorClips.

    Args:
        subtitles: parsed pysrt items exposing .start/.end/.text.
        videosize: (width, height) of the target video in pixels.
        fontsize: text size passed to TextClip.
        font: font file path for TextClip.
        color: text color.
        debug: unused; kept for interface compatibility.

    Returns:
        (subtitle_clips, color_clips) — parallel lists, one entry per subtitle.

    BUG FIX: the original used ColorClip without importing it (NameError on
    first call); ColorClip is now imported at the top of the file.
    """
    subtitle_clips = []
    color_clips = []
    video_width, video_height = videosize
    # Loop invariants hoisted: the subtitle box size and screen position
    # depend only on the video dimensions.
    box_size = (int(video_width * 0.8), int(video_height * 0.2))
    text_position = ('center', video_height * 0.68)

    for subtitle in subtitles:
        start_time = time_to_seconds(subtitle.start)
        duration = time_to_seconds(subtitle.end) - start_time

        text_clip = (
            TextClip(font, subtitle.text, font_size=fontsize, size=box_size,
                     text_align="right", color=color, method='caption')
            .with_start(start_time)
            .with_duration(duration)
        )
        backing = (
            ColorClip(size=box_size, color=(225, 0, 0))
            .with_opacity(0.2)
            .with_start(start_time)
            .with_duration(duration)
        )

        subtitle_clips.append(text_clip.with_position(text_position))
        color_clips.append(backing.with_position(text_position))

    return subtitle_clips, color_clips
|
|
|
def video_edit(srt, input_video, input_audio= "audio.mp3"):
    """Burn the subtitles from *srt* into *input_video*.

    Args:
        srt: path of the subtitle file to render.
        input_video: path of the source video.
        input_audio: audio track to attach (defaults to the extracted mp3).

    Returns:
        The output filename "video_subtitled.mp4".
    """
    input_video_name = "video"
    video = VideoFileClip(input_video)
    audio = AudioFileClip(input_audio)
    video = video.with_audio(audio)
    print(video)
    output_video_file = input_video_name + '_subtitled' + ".mp4"

    # BUG FIX: the original ignored the `srt` argument and hardcoded
    # "video_subtitled.srt" (it only worked because the caller happened to
    # produce that exact filename). Use the caller-supplied path.
    subtitles = pysrt.open(srt, encoding="utf-8")
    subtitle_clips, color_clips = create_subtitle_clips(subtitles, video.size, 24, 'arial-unicode-ms.ttf', 'white', False)
    final_video = CompositeVideoClip([video] + color_clips + subtitle_clips)
    final_video.write_videofile(output_video_file, codec="libx264", audio_codec="aac", logger=None)

    print('final')
    return output_video_file
|
|
|
def process_video(video, clip_type):
    """End-to-end pipeline: extract audio, transcribe, translate, burn subs.

    Args:
        video: path of the uploaded video file.
        clip_type: value from the UI dropdown; currently unused.

    Returns:
        Path of the subtitled output video.
    """
    audio_file = extract_audio(video)
    words = transcribe(audio_file)
    grouped = create_subtitles(words)
    english_srt = generate_subtitle_file('fa', grouped, 'video_subtitled')
    srt_text = read_srt_file(english_srt)

    translated = enhance_text(api_key, srt_text)
    # Keep a copy of the raw translation on disk (side effect only).
    write_google(translated)

    persian_srt = generate_translated_subtitle("fa", translated, "video_subtitled")
    return video_edit(persian_srt, video, input_audio="audio.mp3")
|
|
|
# --- Gradio UI: upload a video, pick a clip type, run the full pipeline ---
with gr.Blocks() as demo:
    gr.Markdown("Start typing below and then click **Run** to see the output.")
    with gr.Column():
        video_file_input = gr.Video(label="Upload Video File")
        # NOTE(review): clip_type is passed to process_video but is unused there.
        clip_type = gr.Dropdown(["auto edit", "default"], label="Clip Type")
        btn = gr.Button("create")
        video_file_output = gr.Video(label="result: ")
    # Run the whole subtitle pipeline when the button is clicked.
    btn.click(fn=process_video, inputs=[video_file_input, clip_type], outputs=video_file_output)
    # Dead code below: an earlier wiring for a separate "transcribe" button,
    # disabled by turning it into a bare string literal. It references names
    # (out, video_path_output, audio_path_output) that no longer exist.
    """ with gr.Row():
        vid_out = gr.Video()
        srt_file = gr.File()
    btn2 = gr.Button("transcribe")
    gr.on(
        triggers=[btn2.click],
        fn=write_google,
        inputs=out,
    ).then(video_edit, [out, video_path_output, audio_path_output], outputs=[vid_out, srt_file])"""


# debug=True surfaces tracebacks in the browser while developing.
demo.launch(debug=True)