import contextlib
import json
import os
import subprocess
import time

import pysrt
import requests
from moviepy import AudioClip, AudioFileClip, CompositeAudioClip, VideoFileClip
from pydub import AudioSegment
from pydub import effects

# NOTE(review): this gateway token is a credential committed to source.
# It should be loaded from the environment or a secrets store and rotated.
API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzeXN0ZW0iOiJzYWhhYiIsImNyZWF0ZVRpbWUiOiIxNDAzMTIwNTE0MTgzMDM2MyIsInVuaXF1ZUZpZWxkcyI6eyJ1c2VybmFtZSI6IjFlZDZjN2M1LWVjNTktNGI4Yi1iYThkLTk1NTk1ZWQ0MmNhMCJ9LCJkYXRhIjp7InNlcnZpY2VJRCI6ImRmNTNhNzgwLTIxNTgtNDUyNC05MjQ3LWM2ZjBiYWQzZTc3MCIsInJhbmRvbVRleHQiOiJvYlNXciJ9LCJncm91cE5hbWUiOiIwMmYzMWRmM2IyMjczMmJkMDNmYjBlYjU2ZjE1MGEzZCJ9.QakcV3rPn7bji7ur0VPmCzHLWiOs2NXEGw9ILyhpgOw"

# (connect, read) timeouts in seconds so a stalled connection cannot hang
# the whole dubbing run indefinitely.
_REQUEST_TIMEOUT = (10, 120)


def generate_tts_audio(persian_text, output_file):
    """Synthesize ``persian_text`` via the TTS gateway and save it to disk.

    The gateway is asked for a file link (``"filePath": "true"``); the link
    is then downloaded in chunks into ``output_file``.

    Args:
        persian_text: Text to synthesize.
        output_file: Path the downloaded audio is written to.

    Returns:
        The string ``"video.mp4"`` (a truthy value, kept for backward
        compatibility) on success, or ``False`` when the gateway answers
        with an error status or a body the link cannot be extracted from.

    Raises:
        requests.RequestException: On connection errors, timeouts, or an
            error status while downloading the audio file.
        OSError: If ``output_file`` cannot be written.
    """
    api_url = "https://partai.gw.isahab.ir/TextToSpeech/v1/speech-synthesys"
    # NOTE(review): this URL looks like a DNS-over-HTTPS endpoint rather than
    # an HTTP(S) proxy; confirm it is really meant to be used as a proxy.
    proxies = {"https": "https://free.shecan.ir/dns-query"}
    headers = {
        'Content-Type': 'application/json',
        'gateway-token': API_KEY,
    }
    payload = json.dumps({
        "data": persian_text,
        "filePath": "true",
        "base64": "0",
        "checksum": "1",
        "speaker": "2",
    })

    response = requests.post(
        api_url,
        headers=headers,
        data=payload,
        proxies=proxies,
        timeout=_REQUEST_TIMEOUT,
    )
    if not response.ok:
        print(f"Failed to generate TTS audio: {response.status_code} - {response.text}")
        return False

    # The link is taken as the 12th double-quote-delimited token of the raw
    # body. NOTE(review): positional parsing depends on the exact response
    # layout; switching to response.json() needs the schema to be confirmed.
    tokens = response.text.split('"')
    if len(tokens) <= 11:
        print(f"Failed to generate TTS audio: {response.status_code} - {response.text}")
        return False
    link = "https://" + tokens[11]
    print(link)

    # The context manager releases the streamed connection even on errors.
    with requests.get(link, stream=True, timeout=_REQUEST_TIMEOUT) as download:
        download.raise_for_status()
        with open(output_file, 'wb') as file:
            for chunk in download.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)

    print("Downloaded successfully")
    # Pause after every successful download (presumably to throttle calls
    # to the gateway; confirm before shortening).
    time.sleep(10)
    return "video.mp4"


def generate_audio_segments(segments, output_dir):
    """Generate one TTS audio file per subtitle segment, retrying failures.

    Args:
        segments: Iterable of subtitle items exposing ``text``, ``start``
            and ``end`` attributes.
        output_dir: Existing directory that receives ``segment_<i>.mp3``.

    Returns:
        A list of ``("<start> --> <end>", audio_path)`` tuples, one per
        segment, in input order.

    Raises:
        RuntimeError: If a segment still fails after all attempts. The most
            recent underlying exception, if any, is chained as the cause.
    """
    max_retries = 3
    retry_delay_seconds = 30
    audio_files = []

    for index, segment in enumerate(segments):
        audio_file = os.path.join(output_dir, f"segment_{index}.mp3")
        last_error = None

        for attempt in range(1, max_retries + 1):
            try:
                succeeded = bool(generate_tts_audio(segment.text, audio_file))
            except Exception as exc:
                # Any failure (network, HTTP status, disk) counts as one
                # attempt; every attempt is counted so the loop always ends.
                last_error = exc
                succeeded = False

            if succeeded:
                audio_files.append((f"{segment.start} --> {segment.end}", audio_file))
                break
            if attempt < max_retries:
                time.sleep(retry_delay_seconds)
        else:
            # The attempt loop finished without a successful break.
            raise RuntimeError(
                f"Failed to generate audio after {max_retries} attempts for segment: {segment.text}"
            ) from last_error

    return audio_files


def srt_time_to_seconds(srt_time):
    """Convert an SRT timestamp ``HH:MM:SS,mmm`` to seconds as a float."""
    hours, minutes, seconds = srt_time.split(':')
    seconds, milliseconds = seconds.split(',')
    total_seconds = int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(milliseconds) / 1000
    return total_seconds


def render_dubbed_video(input_video, audio_files, output_video):
    """Replace the audio track of ``input_video`` with the dubbed segments.

    Each segment is overlaid on a silent track of the video's length at its
    start timestamp. A segment longer than its subtitle slot is sped up by
    at most 1.2x, so it may still run past the end of its slot.

    Args:
        input_video: Path of the source video.
        audio_files: Iterable of ``("<start> --> <end>", audio_path)``
            tuples with SRT-formatted timestamps.
        output_video: Path the rendered video is written to.
    """
    combined_audio_file = "combined_audio.mp3"
    video = VideoFileClip(input_video)
    new_audio = None
    final_video = None
    try:
        video_no_audio = video.without_audio()
        video_duration_in_ms = video_no_audio.duration * 1000
        print("Video duration (ms):")
        print(video_duration_in_ms)

        audio_canva = AudioSegment.silent(duration=video_duration_in_ms)

        for timestamp, audio_file in audio_files:
            start_str, end_str = timestamp.split(' --> ')
            start_ms = srt_time_to_seconds(start_str) * 1000
            end_ms = srt_time_to_seconds(end_str) * 1000

            available_slot = end_ms - start_ms
            if available_slot <= 0:
                print(f"Invalid timestamp for {audio_file}. Skipping.")
                continue

            audio = AudioSegment.from_file(audio_file)
            original_duration_ms = len(audio)
            if original_duration_ms > available_slot:
                # Cap the speed-up so the speech stays intelligible.
                speed_factor = min(original_duration_ms / available_slot, 1.2)
                audio = audio.speedup(speed_factor)

            # Round to whole milliseconds to drop float noise from the
            # seconds -> milliseconds conversion.
            audio_canva = audio_canva.overlay(audio, position=int(round(start_ms)))

        # MoviePy reads audio from a file, so export the mixed track first.
        audio_canva.export(combined_audio_file, format="mp3")
        new_audio = AudioFileClip(combined_audio_file)

        final_video = video_no_audio.with_audio(new_audio)
        final_video.write_videofile(output_video, codec="libx264", audio_codec="aac")
    finally:
        # Close clips even when rendering fails, then delete the temporary
        # mix (after closing, so the file is no longer held open).
        for clip in (final_video, new_audio, video):
            if clip is not None:
                clip.close()
        with contextlib.suppress(OSError):
            os.remove(combined_audio_file)


def dub(srt, input_video):
    """Dub ``input_video`` with TTS audio generated from an SRT file.

    Args:
        srt: Path of the UTF-8 encoded SRT subtitle file (already Persian).
        input_video: Path of the video to dub.

    Returns:
        The output video path, ``"video_out.mp4"``.

    Raises:
        RuntimeError: If audio for a segment cannot be generated.
    """
    # Step 1: Parse the SRT file
    output_video = "video_out.mp4"
    subtitles = pysrt.open(srt, encoding="utf-8")
    print("Parsed segments:", subtitles)

    # Step 2: Translation is skipped because the input is already Persian

    # Step 3: Generate audio for each Persian segment
    output_dir = "audio_segments"
    os.makedirs(output_dir, exist_ok=True)
    audio_files = []
    try:
        audio_files = generate_audio_segments(subtitles, output_dir)

        # Step 4: Render the dubbed video
        render_dubbed_video(input_video, audio_files, output_video)
    finally:
        # Best-effort cleanup that also runs when rendering fails.
        for _, audio_file in audio_files:
            with contextlib.suppress(FileNotFoundError):
                os.remove(audio_file)
        # The directory is removed only when empty; leftover files must not
        # turn an otherwise successful run into an error.
        with contextlib.suppress(OSError):
            os.rmdir(output_dir)

    return output_video