# bale_clean/dub.py
import json
import os
import time

import pysrt
import requests
from moviepy import AudioFileClip, VideoFileClip
from pydub import AudioSegment

# Gateway token for the isahab TextToSpeech service.
API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzeXN0ZW0iOiJzYWhhYiIsImNyZWF0ZVRpbWUiOiIxNDAzMTIwNTE0MTgzMDM2MyIsInVuaXF1ZUZpZWxkcyI6eyJ1c2VybmFtZSI6IjFlZDZjN2M1LWVjNTktNGI4Yi1iYThkLTk1NTk1ZWQ0MmNhMCJ9LCJkYXRhIjp7InNlcnZpY2VJRCI6ImRmNTNhNzgwLTIxNTgtNDUyNC05MjQ3LWM2ZjBiYWQzZTc3MCIsInJhbmRvbVRleHQiOiJvYlNXciJ9LCJncm91cE5hbWUiOiIwMmYzMWRmM2IyMjczMmJkMDNmYjBlYjU2ZjE1MGEzZCJ9.QakcV3rPn7bji7ur0VPmCzHLWiOs2NXEGw9ILyhpgOw"

def generate_tts_audio(persian_text, output_file):
    """Synthesize Persian text with the isahab TTS API and download the result to output_file."""
    api_url = "https://partai.gw.isahab.ir/TextToSpeech/v1/speech-synthesys"
    proxies = {
        "https": "https://free.shecan.ir/dns-query"
    }
    headers = {
        'Content-Type': 'application/json',
        'gateway-token': API_KEY
    }
    payload = json.dumps({
        "data": persian_text,
        "filePath": "true",
        "base64": "0",
        "checksum": "1",
        "speaker": "2"
    })
    response = requests.post(api_url, headers=headers, data=payload, proxies=proxies)
    if not response.ok:
        print(f"Failed to generate TTS audio: {response.status_code} - {response.text}")
        return False
    # The response body contains the path of the generated file; extract it from the raw JSON text.
    link = "https://" + response.text.split('"')[11]
    print(link)
    download = requests.get(link, stream=True)
    download.raise_for_status()
    with open(output_file, 'wb') as file:
        for chunk in download.iter_content(chunk_size=8192):
            if chunk:
                file.write(chunk)
    print("Downloaded successfully")
    time.sleep(10)  # Pause between requests to avoid overloading the TTS service
    # Return the saved path so callers can treat any truthy result as success.
    return output_file

def generate_audio_segments(segments, output_dir):
    """Generate one TTS audio file per subtitle segment, retrying failed requests."""
    audio_files = []
    for index, segment in enumerate(segments):
        audio_file = os.path.join(output_dir, f"segment_{index}.mp3")
        max_retries = 3
        retries = 0
        while retries < max_retries:
            try:
                if generate_tts_audio(segment.text, audio_file):
                    audio_files.append((f"{segment.start} --> {segment.end}", audio_file))
                    break  # Success: move on to the next segment
                # The request failed (returned False): wait and retry
                retries += 1
                if retries < max_retries:
                    time.sleep(30)  # Wait 30 seconds before retrying
            except Exception as e:
                retries += 1
                if retries >= max_retries:  # Last retry attempt
                    raise RuntimeError(f"Failed to generate audio after {max_retries} attempts for segment: {segment.text}") from e
                time.sleep(30)
        else:
            # All retries failed (the loop completed without breaking)
            raise RuntimeError(f"Failed to generate audio after {max_retries} attempts for segment: {segment.text}")
    return audio_files

def srt_time_to_seconds(srt_time):
    """Convert an SRT timestamp ("HH:MM:SS,mmm") to seconds, e.g. "00:01:02,500" -> 62.5."""
    hours, minutes, seconds = srt_time.split(':')
    seconds, milliseconds = seconds.split(',')
    return int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(milliseconds) / 1000

def render_dubbed_video(input_video, audio_files, output_video):
    """Overlay the generated audio segments on a silent copy of the video and render the result."""
    # Load the input video and remove its audio
    video = VideoFileClip(input_video)
    video_no_audio = video.without_audio()
    # Total video duration in milliseconds
    video_duration_in_ms = video_no_audio.duration * 1000
    print(f"Video duration (ms): {video_duration_in_ms}")
    # Start from a silent canvas the length of the video
    audio_canva = AudioSegment.silent(duration=video_duration_in_ms)
    for timestamp, audio_file in audio_files:
        start_str, end_str = timestamp.split(' --> ')
        start_ms = srt_time_to_seconds(start_str) * 1000
        end_ms = srt_time_to_seconds(end_str) * 1000
        # Load the audio file for this segment
        audio = AudioSegment.from_file(audio_file)
        original_duration_ms = len(audio)
        available_slot = end_ms - start_ms
        if available_slot <= 0:
            print(f"Invalid timestamp for {audio_file}. Skipping.")
            continue
        elif original_duration_ms > available_slot:
            # Speed the clip up to fit its subtitle slot, capped at 1.2x to keep it intelligible
            speed_factor = min(original_duration_ms / available_slot, 1.2)
            audio = audio.speedup(speed_factor)
        # Overlay the processed segment onto the silent canvas at its start time
        audio_canva = audio_canva.overlay(audio, position=start_ms)
    # Export the combined audio to a temporary file
    combined_audio_file = "combined_audio.mp3"
    audio_canva.export(combined_audio_file, format="mp3")
    # Load the combined audio with MoviePy and attach it to the silent video
    new_audio = AudioFileClip(combined_audio_file)
    final_video = video_no_audio.with_audio(new_audio)
    # Write the output video file
    final_video.write_videofile(output_video, codec="libx264", audio_codec="aac")
    # Clean up clips and the temporary combined audio file
    video.close()
    new_audio.close()
    final_video.close()
    os.remove(combined_audio_file)

def dub(srt, input_video):
    """Dub input_video using the Persian subtitles in the given SRT file and return the output path."""
    output_video = "video_out.mp4"
    # Step 1: Parse the SRT file
    subtitles = pysrt.open(srt, encoding="utf-8")
    print("Parsed segments:", subtitles)
    # Step 2: Translation is skipped; the input subtitles are already Persian
    # Step 3: Generate audio for each Persian segment
    output_dir = "audio_segments"
    os.makedirs(output_dir, exist_ok=True)
    audio_files = generate_audio_segments(subtitles, output_dir)
    # Step 4: Render the dubbed video
    render_dubbed_video(input_video, audio_files, output_video)
    # Clean up the audio segments directory
    for _, audio_file in audio_files:
        os.remove(audio_file)
    os.rmdir(output_dir)
    return output_video
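
# Example usage sketch: "subtitles.srt" and "video.mp4" are placeholder file names,
# assuming a Persian SRT file and its source video are available next to this script.
if __name__ == "__main__":
    dubbed = dub("subtitles.srt", "video.mp4")
    print(f"Dubbed video written to {dubbed}")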