from pytubefix import YouTube |
from pytubefix.cli import on_progress |
import time |
import math |
import gradio as gr |
import ffmpeg |
from faster_whisper import WhisperModel |
import requests |
import json |
import arabic_reshaper |
from bidi.algorithm import get_display |
from moviepy import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip, ImageClip |
import pysrt |
import instaloader |
import time |
import concurrent.futures |
import re |
from io import BytesIO |
from PIL import Image |
import os

# Token sent to the one-api.ir endpoints used throughout this file.
# SECURITY: a credential committed to source control should be treated as
# leaked. Set the ONE_API_TOKEN environment variable to override it; the
# literal is kept only as a backward-compatible fallback.
api_key = os.environ.get("ONE_API_TOKEN", "268976:66f4f58a2a905")
def fetch_data(url, timeout=30):
    """Issue an HTTP GET to *url* and return the decoded JSON body.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The parsed JSON payload, or ``None`` when the request fails, the
        server answers with an error status, or the body is not valid JSON.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()
    # ValueError covers JSON decoding failures on requests versions whose
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"An error occurred: {e}")
        return None
def download_file(url):
    """Download a file described by a ``"<address>#<filename>"`` string.

    The part before the first ``#`` is fetched with a streaming GET and the
    body is written to the path given by the part after it. Request errors
    are printed rather than raised; nothing is returned.

    NOTE: a second function named ``download_file`` is defined further down
    in this file, so at import time that later definition replaces this one.
    """
    try:
        reply = requests.get(url.split("#")[0], stream=True)
        reply.raise_for_status()
        target = url.split("#")[1]
        print(target)
        with open(target, 'wb') as sink:
            for piece in reply.iter_content(chunk_size=8192):
                # Empty chunks carry no data; skip them.
                if not piece:
                    continue
                sink.write(piece)
        print(f"Downloaded successfully: {target}")
    except requests.exceptions.RequestException as err:
        print(f"An error occurred: {err}")
def download_chunk(url, start, end, filename, index, timeout=30):
    """Download one byte range of *url* into ``<filename>.part<index>``.

    Args:
        url: Address of the remote file.
        start: First byte of the range (inclusive).
        end: Last byte of the range (inclusive).
        filename: Final file name; used as the prefix of the part file.
        index: Position of this part, used as the part-file suffix.
        timeout: Seconds to wait on the connection or between reads, so one
            stalled range cannot hang the caller.

    Returns:
        The path of the part file that was written.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status.
    """
    headers = {'Range': f'bytes={start}-{end}'}
    chunk_filename = f'{filename}.part{index}'
    # The context manager releases the connection even if writing fails.
    with requests.get(url, headers=headers, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(chunk_filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
    return chunk_filename
def merge_files(filename, num_parts):
    """Concatenate ``<filename>.part0`` .. ``<filename>.part<num_parts-1>``.

    The parts are appended in index order to *filename*, which is created or
    truncated. Part files are left in place.

    Args:
        filename: Destination path; also the prefix of the part files.
        num_parts: Number of part files to merge.

    Raises:
        FileNotFoundError: If one of the expected part files is missing.
    """
    import shutil

    with open(filename, 'wb') as output_file:
        for i in range(num_parts):
            part_filename = f'{filename}.part{i}'
            with open(part_filename, 'rb') as part_file:
                # Copy in bounded chunks so a large part is never held in
                # memory as a whole.
                shutil.copyfileobj(part_file, output_file)
def download_file_in_parallel(link, size, num_threads=4):
    """Download a file in several byte ranges at once and stitch it together.

    Args:
        link: ``"<address>#<filename>"``; the part before the first ``#`` is
            the download address and the part after it the local file name.
        size: Total size of the remote file in bytes.
        num_threads: Maximum number of ranges to download concurrently.

    Raises:
        ValueError: If *size* is missing or not a positive number of bytes.
        requests.exceptions.RequestException: If any range download fails.
    """
    pieces = link.split("#")
    url = pieces[0]
    filename = pieces[1]
    print(url+" filename: "+filename)
    if not size or int(size) <= 0:
        raise ValueError(f"Cannot split download: invalid file size {size!r}")
    size = int(size)
    # Never use more workers than there are bytes, otherwise the ranges
    # would be empty or inverted.
    num_threads = max(1, min(num_threads, size))
    chunk_size = size // num_threads
    ranges = [(i * chunk_size, (i + 1) * chunk_size - 1) for i in range(num_threads)]
    # The last range absorbs the remainder of the integer division.
    ranges[-1] = (ranges[-1][0], size - 1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(download_chunk, url, start, end, filename, i)
            for i, (start, end) in enumerate(ranges)
        ]
        for future in concurrent.futures.as_completed(futures):
            # result() re-raises any exception from the worker.
            future.result()
    merge_files(filename, num_threads)
    print(f'Downloaded successfully: {filename}')
def one_youtube(link, api_key):
    """Download the 360p video and the "medium" audio of a YouTube link.

    The one-api.ir service is used to resolve the video id, list the
    available formats and obtain the download links. Both files are named
    after the video title and written to the working directory.

    Args:
        link: URL of the YouTube video.
        api_key: one-api.ir token.

    Returns:
        ``(video_name, audio_name)`` on success, or ``(None, None)`` when any
        API call fails or a required format (or its size) is not available.
    """
    video_id_url = f"https://one-api.ir/youtube/?token={api_key}&action=getvideoid&link={link}"
    video_data = fetch_data(video_id_url)
    if not video_data:
        return None, None
    video_id = video_data["result"]
    filter_option = ""
    video_data_url = f"https://youtube.one-api.ir/?token={api_key}&action=fullvideo&id={video_id}&filter={filter_option}"
    video_data_2 = fetch_data(video_data_url)
    if not video_data_2:
        return None, None
    formats_list = video_data_2["result"]["formats"]
    file_name = video_data_2["result"]["title"]
    video_name = f'{file_name}.mp4'
    audio_name = f'{file_name}.mp3'
    # Start from None so a missing format is reported through the return
    # value instead of an UnboundLocalError.
    download_id = audio_id = None
    video_size = audio_size = None
    for fmt in formats_list:
        note = fmt.get("format_note")
        # No break: when several entries share a note the last one wins.
        if note == "360p":
            download_id = fmt["id"]
            video_size = fmt["filesize"]
        elif note == "medium":
            audio_id = fmt["id"]
            audio_size = fmt["filesize"]
    if not download_id or not audio_id:
        return None, None
    if not video_size or not audio_size:
        # The parallel downloader needs the size to compute byte ranges.
        print("An error occurred: file size missing for the selected format")
        return None, None
    video_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={download_id}"
    audio_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={audio_id}"
    video_link_data = fetch_data(video_link_url)
    audio_link_data = fetch_data(audio_link_url)
    if not video_link_data or not audio_link_data:
        return None, None
    video_link = video_link_data["result"]["link"]
    audio_link = audio_link_data["result"]["link"]
    # download_file_in_parallel expects "<address>#<filename>".
    vid_str = video_link + "#" + video_name
    audio_str = audio_link + "#" + audio_name
    print(video_size , audio_size)
    download_file_in_parallel(vid_str, video_size)
    download_file_in_parallel(audio_str, audio_size)
    return video_name, audio_name
def yt_download(url):
    """Download the highest-resolution stream of a YouTube video.

    Prints the title and the selected stream, downloads the stream, and
    returns ``(video_path, title)`` where ``video_path`` is ``"<title>.mp4"``.

    NOTE(review): the returned path is built from the title rather than taken
    from the download call; confirm it matches the file name that is
    actually written.
    """
    tube = YouTube(url)
    print(tube.title)
    best_stream = tube.streams.get_highest_resolution()
    print(best_stream)
    best_stream.download()
    return f"{tube.title}.mp4", tube.title
def download_image(url, save_path='downloaded_image.jpg', timeout=30):
    """Fetch an image from *url* and save it to *save_path*.

    Args:
        url: Address of the image.
        save_path: Destination file; the extension decides the saved format.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        *save_path*.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail on HTTP errors here instead of letting an error page reach the
    # image decoder, matching the other download helpers in this file.
    response.raise_for_status()
    image = Image.open(BytesIO(response.content))
    image.save(save_path)
    return save_path
def insta_oneapi(url, api_key):
    """Download an Instagram post's video and cover image via one-api.ir.

    The shortcode is taken from the second-to-last ``/``-separated part of
    *url*. The first media item of the post is saved as ``video.mp4`` and
    its cover is saved through ``download_image``.

    Args:
        url: Link to the Instagram post.
        api_key: one-api.ir token.

    Returns:
        ``("video.mp4", cover_path)`` on success, or ``(None, None)`` on any
        failure, so callers can always unpack two values.
    """
    shortcode = url.split("/")[-2]
    print(shortcode)
    url_one = "https://api.one-api.ir/instagram/v1/post/?shortcode=" + shortcode
    headers = {"one-api-token": api_key, "Content-Type": "application/json"}
    response = requests.get(url_one, headers=headers)
    print(response)
    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return None, None
    try:
        media = response.json()["result"]['media'][0]
        video_url = media["url"]
        image_url = media["cover"]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        print(f"An error occurred: unexpected API response ({e})")
        return None, None
    try:
        # NOTE(review): presumably gives the service time to make the media
        # URL available; confirm whether this delay is still required.
        time.sleep(10)
        with requests.get(video_url, stream=True) as video_response:
            video_response.raise_for_status()
            with open("video.mp4", 'wb') as file:
                for chunk in video_response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
        print(f"Downloaded successfully")
        image_file_path = download_image(image_url)
        return "video.mp4", image_file_path
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return None, None
def insta_download(permalink):
    """Download the video of an Instagram reel or post with instaloader.

    The shortcode is extracted from a ``instagram.com/reel/`` or
    ``instagram.com/p/`` permalink. The file is saved under the last path
    component of the video URL (query string removed).

    Returns:
        The saved file name, or ``None`` if anything goes wrong (the error
        is printed).
    """
    loader = instaloader.Instaloader()
    try:
        # Reel links are checked before generic post links.
        for marker in ("instagram.com/reel/", "instagram.com/p/"):
            if marker in permalink:
                shortcode = permalink.split(marker)[-1].split("/")[0]
                break
        else:
            raise ValueError("Invalid permalink format")
        post = instaloader.Post.from_shortcode(loader.context, shortcode)
        if not post.is_video:
            raise ValueError("The provided permalink is not a video.")
        video_url = post.video_url
        filename = video_url.split("/")[-1].split("?")[0]
        reply = requests.get(video_url, stream=True)
        reply.raise_for_status()
        with open(filename, 'wb') as sink:
            for piece in reply.iter_content(chunk_size=8192):
                sink.write(piece)
        print(f"Downloaded video {filename} successfully.")
        return filename
    except Exception as err:
        print(f"Failed to download video from {permalink}: {err}")
        return None
def extract_audio(input_video_name):
    """Write the audio track of a video file to ``audio.mp3``.

    Args:
        input_video_name: Path of the video file to read.

    Returns:
        The name of the MP3 file that was written (``"audio.mp3"``).

    Raises:
        ValueError: If the video has no audio track.
    """
    mp3_file = "audio.mp3"
    video_clip = VideoFileClip(input_video_name)
    audio_clip = None
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            raise ValueError(f"No audio track found in {input_video_name}")
        audio_clip.write_audiofile(mp3_file)
    finally:
        # Release the clips even when writing fails.
        if audio_clip is not None:
            audio_clip.close()
        video_clip.close()
    print("Audio extraction successful!")
    return mp3_file
def transcribe(audio):
    """Transcribe *audio* with the ``"tiny"`` Whisper model.

    Each segment is printed with its start and end time, and all segments
    are returned as a list.
    """
    whisper = WhisperModel("tiny")
    found, _ = whisper.transcribe(audio)
    # Materialise the segments so they can be both printed and returned.
    results = list(found)
    for seg in results:
        print(f"[{seg.start:.2f}s -> {seg.end:.2f}s] {seg.text}")
    return results
def format_time(seconds):
    """Format a duration in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        seconds: Non-negative offset in seconds (int or float).

    Returns:
        The timestamp string with zero-padded two-digit hours, minutes and
        seconds and three-digit milliseconds.
    """
    # Round once on the total so a fraction such as 59.9996 carries into the
    # seconds and minutes instead of producing a four-digit millisecond field.
    total_ms = max(0, int(round(seconds * 1000)))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    whole_seconds, milliseconds = divmod(remainder, 1000)
    formatted_time = f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"
    return formatted_time
def generate_subtitle_file(language, segments, input_video_name):
    """Write *segments* to ``sub-<input_video_name>.<language>.srt``.

    Every segment becomes a numbered entry (starting at 1) with a
    ``start --> end`` line produced by ``format_time`` and the segment text,
    followed by a blank line.

    Returns:
        The name of the subtitle file that was written.
    """
    subtitle_file = f"sub-{input_video_name}.{language}.srt"
    entries = []
    for number, seg in enumerate(segments, start=1):
        begins = format_time(seg.start)
        ends = format_time(seg.end)
        entries.append(
            f"{str(number)} \n"
            f"{begins} --> {ends} \n"
            f"{seg.text} \n"
            "\n"
        )
    with open(subtitle_file, "w", encoding='utf8') as out:
        out.write("".join(entries))
    return subtitle_file
def read_srt_file(file_path):
    """Return the UTF-8 text of *file_path*, or ``None`` if it cannot be read.

    A missing file and any other error are reported on stdout instead of
    being raised.
    """
    contents = None
    try:
        with open(file_path, mode='r', encoding='utf-8') as srt_handle:
            contents = srt_handle.read()
    except FileNotFoundError:
        print(f"The file {file_path} was not found.")
    except Exception as err:
        print(f"An error occurred: {err}")
    return contents
def clean_text(text):
    """Strip Markdown code-fence residue from a model answer.

    Removes a triple-backtick fence at the very start and at the very end of
    *text*, then removes ``srt`` wherever it appears at the beginning of a
    line.
    """
    fence = re.compile(r"^```|```$")
    language_tag = re.compile(r'^srt', flags=re.MULTILINE)
    without_fences = fence.sub('', text)
    return language_tag.sub('', without_fences)
def enhance_text(api_key, text, google):
    """Ask the one-api.ir GPT-4o endpoint for a literary Persian translation.

    Args:
        api_key: one-api.ir token.
        text: Source text, placed in front of the translation instructions.
        google: Accepted for compatibility with existing callers; not used.

    Returns:
        The model answer with code-fence residue removed by ``clean_text``,
        or ``None`` when the service answers with a non-200 status or an
        unexpected payload.
    """
    url = "https://api.one-api.ir/chatbot/v1/gpt4o/"
    request_body = [{
        "role": "user",
        "content": f"{text} Please ignore all previous instructions. Please respond only in the Persian language.Do not explain what you are doing. Do not self reference.you arte an expert translator and your task here is to translte from english to persian The subject of the text is philosophy and life advice quote. The translation should have a poetic and literary tone in persian , Please follow these guidelines: Preserve the exact meaning of each sentence while adding a touch of Persian literary elegance. Keep the language clear and accessible, avoiding over-complicated or embellished phrases that alter the message. Ensure proper Persian grammar and structure for a polished, fluent result. Maintain alignment with the time ranges provided for subtitlesconvert the English terms used in it into common Persian terms which make the text undrastandabel and touching for persian audience .When in doubt, prioritize clarity and fidelity to the persian language structure over poetic embellis Translate it with the above considerations in summary you have to translate the text in a way beside is faitful loyality to the original text meaning it make it undrestandebel and touching for persian language auidence "
    },]
    headers = {
        "one-api-token": api_key,
        "Content-Type": "application/json"
    }
    response = requests.post(url, headers=headers, json=request_body)
    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return None
    try:
        answer = response.json()["result"][0]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # A 200 reply without the expected "result" list is still a failure.
        print(f"Error: unexpected response format ({e})")
        return None
    last = clean_text(answer)
    print("result: ")
    print(last)
    return last
def translate_text(api_key, source_lang, target_lang, text):
    """Translate *text* through one-api.ir and return the refined result.

    The Google translate endpoint is called first. On a 200 answer, the
    original *text* and the Google result are handed to ``enhance_text`` and
    its return value is passed back. On any other status the error is
    printed and ``None`` is returned.
    """
    endpoint = "https://api.one-api.ir/translate/v1/google/"
    payload = {"source": source_lang, "target": target_lang, "text": text}
    auth_headers = {"one-api-token": api_key, "Content-Type": "application/json"}
    reply = requests.post(endpoint, headers=auth_headers, json=payload)
    # Guard clause: anything but success ends here.
    if reply.status_code != 200:
        print(f"Error: {reply.status_code}, {reply.text}")
        return None
    decoded = reply.json()
    return enhance_text(api_key, text, decoded['result'])
def write_google(google_translate):
    """Save the translated subtitle text to ``google_translate.srt`` (UTF-8).

    The file is created in the current working directory and overwritten if
    it already exists.
    """
    destination = "google_translate.srt"
    with open(destination, mode='w', encoding="utf-8") as srt_out:
        srt_out.write(google_translate)
def time_to_seconds(time_obj):
    """Convert an object with hours/minutes/seconds/milliseconds to seconds.

    Returns the total as a float, with the milliseconds as the fraction.
    """
    whole_seconds = time_obj.hours * 3600 + time_obj.minutes * 60 + time_obj.seconds
    fraction = time_obj.milliseconds / 1000
    return whole_seconds + fraction
def create_subtitle_clips(subtitles, videosize, fontsize, font, color, debug):
    """Build one positioned text clip per subtitle entry.

    Args:
        subtitles: Iterable of entries exposing ``start``, ``end`` (objects
            accepted by ``time_to_seconds``) and ``text``.
        videosize: ``(width, height)`` of the target video in pixels.
        fontsize: Font size passed to the text clip.
        font: Font passed to the text clip.
        color: Text colour.
        debug: Accepted for compatibility with existing callers; not used.

    Returns:
        A list of text clips, each timed to its subtitle and horizontally
        centred at 68% of the video height. Entries whose end is not after
        their start are skipped.
    """
    video_width, video_height = videosize
    # The layout depends only on the video size, so compute it once. The
    # caption box is 80% of the width and 20% of the height of the frame.
    box_size = (int(video_width * 0.8), int(video_height * 0.2))
    text_position = ('center', video_height * 0.68)
    subtitle_clips = []
    for subtitle in subtitles:
        start_time = time_to_seconds(subtitle.start)
        end_time = time_to_seconds(subtitle.end)
        duration = end_time - start_time
        if duration <= 0:
            # An empty or inverted time range would never be visible.
            continue
        text_clip = TextClip(
            font,
            subtitle.text,
            font_size=fontsize,
            size=box_size,
            text_align="center",
            color=color,
            method='caption',
        ).with_start(start_time).with_duration(duration)
        subtitle_clips.append(text_clip.with_position(text_position))
    return subtitle_clips
def process_video(url, type):
    """Download a video, transcribe it, translate it and burn in subtitles.

    Steps: download the source (Instagram posts get a one-second cover image
    prepended), transcribe the audio, write an SRT file, translate it to
    Persian, then render the translated subtitles onto the video.

    Args:
        url: Link to the Instagram post or YouTube video.
        type: ``"insta"`` or ``"youtube"``. The name shadows the built-in
            ``type`` but is kept because it is part of the signature.

    Returns:
        Path of the subtitled output video.

    Raises:
        ValueError: If *type* is not one of the supported sources.
        RuntimeError: If the download, the subtitle file or the translation
            could not be produced.
    """
    if type == "insta":
        downloaded = insta_oneapi(url, api_key)
        # A failed download is reported as None or as a pair of Nones.
        if not downloaded or not downloaded[0]:
            raise RuntimeError("Could not download the Instagram video.")
        input_video, image_path = downloaded
        input_video_name = input_video.replace(".mp4", "")
        video = VideoFileClip(input_video)
        try:
            # Show the cover for one second, then play the video.
            image_clip = ImageClip(image_path).with_duration(1)
            image_clip = image_clip.with_position(("center", "center")).resized(height=video.size[1])
            first_video = CompositeVideoClip([video.with_start(1), image_clip])
            input_video = input_video_name + "_cover.mp4"
            input_video_name = input_video.replace(".mp4", "")
            first_video.write_videofile(input_video, codec="libx264", audio_codec="aac", logger=None)
        finally:
            video.close()
        input_audio = extract_audio(input_video)
    elif type == "youtube":
        input_video, input_audio = one_youtube(url, api_key)
        if not input_video or not input_audio:
            raise RuntimeError("Could not download the YouTube video.")
        input_video_name = input_video.replace(".mp4", "")
    else:
        raise ValueError(f"Unsupported source type: {type!r}")
    t = time.localtime()
    current_time = time.strftime("%H:%M:%S", t)
    print("Current Time =", current_time)
    segments = transcribe(audio=input_audio)
    language = "fa"
    subtitle_file = generate_subtitle_file(language=language, segments=segments, input_video_name=input_video_name)
    source_language = "en"
    target_language = "fa"
    srt_string = read_srt_file(subtitle_file)
    if srt_string is None:
        raise RuntimeError(f"Could not read the subtitle file {subtitle_file}.")
    google_translate = translate_text(api_key, source_language, target_language, srt_string)
    if google_translate is None:
        raise RuntimeError("Subtitle translation failed.")
    write_google(google_translate)
    video = VideoFileClip(input_video)
    audio = AudioFileClip(input_audio)
    try:
        video = video.with_audio(audio)
        print(video)
        subtitles = pysrt.open("google_translate.srt", encoding="utf-8")
        output_video_file = input_video_name + '_subtitled' + ".mp4"
        subtitle_clips = create_subtitle_clips(subtitles, video.size, 32, 'arial.ttf', 'white', False)
        final_video = CompositeVideoClip([video] + subtitle_clips)
        final_video.write_videofile(output_video_file, codec="libx264", audio_codec="aac", logger=None)
    finally:
        # Release file handles held by the readers.
        audio.close()
        video.close()
    print('final')
    t = time.localtime()
    current_time = time.strftime("%H:%M:%S", t)
    print("Current Time =", current_time)
    return output_video_file
def download_file(file_path):
    """Return a Gradio update that sets a file component's value.

    NOTE: this definition shares its name with the URL downloader defined
    earlier in this file and, being the later one, replaces it at import
    time.

    Args:
        file_path: Path of the file to expose.

    Returns:
        The update produced by ``gr.update``. This is used instead of
        ``gr.File.update`` because per-component ``update`` methods were
        removed in Gradio 4.
    """
    return gr.update(value=file_path)
# Web UI: a text box for the link and a dropdown for the source type are
# passed to process_video, whose result is offered as a file.
iface = gr.Interface(
    fn=process_video,
    inputs=["text", gr.Dropdown(["insta", "youtube"])],
    outputs="file",
)
iface.launch(debug=True)