from pytubefix import YouTube from pytubefix.cli import on_progress import time import math import gradio as gr import ffmpeg from faster_whisper import WhisperModel import requests import json import arabic_reshaper # pip install arabic-reshaper from bidi.algorithm import get_display # pip install python-bidi from moviepy import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip, ImageClip import pysrt import instaloader import time import concurrent.futures import re from io import BytesIO from PIL import Image api_key = "268976:66f4f58a2a905" def fetch_data(url): try: response = requests.get(url) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") return None def download_file(url): try: response = requests.get(url.split("#")[0], stream=True) response.raise_for_status() print(url.split("#")[1]) with open(url.split("#")[1], 'wb') as file: for chunk in response.iter_content(chunk_size=8192): if chunk: file.write(chunk) print(f"Downloaded successfully: {url.split('#')[1]}") except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") def download_chunk(url, start, end, filename, index): headers = {'Range': f'bytes={start}-{end}'} response = requests.get(url, headers=headers, stream=True) response.raise_for_status() chunk_filename = f'{filename}.part{index}' with open(chunk_filename, 'wb') as file: for chunk in response.iter_content(chunk_size=8192): if chunk: file.write(chunk) return chunk_filename def merge_files(filename, num_parts): with open(filename, 'wb') as output_file: for i in range(num_parts): part_filename = f'{filename}.part{i}' with open(part_filename, 'rb') as part_file: output_file.write(part_file.read()) # Optionally, delete the part file after merging # os.remove(part_filename) def download_file_in_parallel(link, size, num_threads=4): url = link.split("#")[0] filename = link.split("#")[1] print(url+" filename: "+filename) response = requests.head(url) #file_size = int(response.headers['Content-Length']) chunk_size = size // num_threads ranges = [(i * chunk_size, (i + 1) * chunk_size - 1) for i in range(num_threads)] ranges[-1] = (ranges[-1][0], size - 1) # Adjust the last range to the end of the file with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [ executor.submit(download_chunk, url, start, end, filename, i) for i, (start, end) in enumerate(ranges) ] for future in concurrent.futures.as_completed(futures): future.result() # Ensure all threads complete merge_files(filename, num_threads) print(f'Downloaded successfully: {filename}') def one_youtube(link, api_key): # Fetch video ID video_id_url = f"https://one-api.ir/youtube/?token={api_key}&action=getvideoid&link={link}" video_data = fetch_data(video_id_url) if not video_data: return None, None video_id = video_data["result"] # Fetch video data filter_option = "" # Replace with your filter option video_data_url = f"https://youtube.one-api.ir/?token={api_key}&action=fullvideo&id={video_id}&filter={filter_option}" video_data_2 = fetch_data(video_data_url) if not video_data_2: return None, None formats_list = video_data_2["result"]["formats"] file_name = video_data_2["result"]["title"] video_name = f'{file_name}.mp4' audio_name = f'{file_name}.mp3' for f in formats_list: if f["format_note"] == "360p": download_id = f["id"] video_size = f["filesize"] for f in formats_list: if f["format_note"] == "medium": audio_id = f["id"] audio_size = f["filesize"] if not download_id or not audio_id: return None, None # Fetch video and audio links video_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={download_id}" audio_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={audio_id}" video_link_data = fetch_data(video_link_url) audio_link_data = fetch_data(audio_link_url) if not video_link_data or not audio_link_data: return None, None video_link = video_link_data["result"]["link"] audio_link = audio_link_data["result"]["link"] vid_str=video_link+"#"+video_name audio_str=audio_link+"#"+audio_name # Download video and audio files print(video_size , audio_size) download_file_in_parallel(vid_str, video_size) download_file_in_parallel(audio_str, audio_size) return video_name, audio_name # Define your functions here def yt_download(url): yt = YouTube(url) print(yt.title) video_path = f"{yt.title}.mp4" ys = yt.streams.get_highest_resolution() print(ys) ys.download() return video_path, yt.title def download_image(url, save_path='downloaded_image.jpg'): response = requests.get(url) image = Image.open(BytesIO(response.content)) image.save(save_path) return save_path def insta_oneapi(url, api_key): shortcode = url.split("/")[-2] print(shortcode) url_one="https://api.one-api.ir/instagram/v1/post/?shortcode="+shortcode request_body = [{"shortcode": shortcode},] headers = {"one-api-token": api_key, "Content-Type": "application/json"} response = requests.get(url_one, headers=headers) print(response) if response.status_code == 200: result = response.json() try: time.sleep(10) response = requests.get(result["result"]['media'][0]["url"], stream=True) response.raise_for_status() with open("video.mp4", 'wb') as file: for chunk in response.iter_content(chunk_size=8192): if chunk: file.write(chunk) print(f"Downloaded successfully") image_url = result["result"]['media'][0]["cover"] image_file_path = download_image(image_url) return "video.mp4", image_file_path except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") else: print(f"Error: {response.status_code}, {response.text}") return None def insta_download(permalink): # Create an instance of Instaloader L = instaloader.Instaloader() try: # Extract the shortcode from the permalink if "instagram.com/reel/" in permalink: shortcode = permalink.split("instagram.com/reel/")[-1].split("/")[0] elif "instagram.com/p/" in permalink: shortcode = permalink.split("instagram.com/p/")[-1].split("/")[0] else: raise ValueError("Invalid permalink format") # Load the post using the shortcode post = instaloader.Post.from_shortcode(L.context, shortcode) # Check if the post is a video if not post.is_video: raise ValueError("The provided permalink is not a video.") # Get the video URL video_url = post.video_url # Extract the filename from the URL filename = video_url.split("/")[-1] # Remove query parameters filename = filename.split("?")[0] # Download the video using requests response = requests.get(video_url, stream=True) response.raise_for_status() # Raise an error for bad responses # Save the content to a file with open(filename, 'wb') as file: for chunk in response.iter_content(chunk_size=8192): file.write(chunk) print(f"Downloaded video {filename} successfully.") return filename except Exception as e: print(f"Failed to download video from {permalink}: {e}") def extract_audio(input_video_name): # Define the input video file and output audio file mp3_file = "audio.mp3" # Load the video clip video_clip = VideoFileClip(input_video_name) # Extract the audio from the video clip audio_clip = video_clip.audio # Write the audio to a separate file audio_clip.write_audiofile(mp3_file) # Close the video and audio clips audio_clip.close() video_clip.close() print("Audio extraction successful!") return mp3_file def transcribe(audio): model = WhisperModel("tiny") segments, info = model.transcribe(audio) segments = list(segments) for segment in segments: print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text)) return segments def format_time(seconds): hours = math.floor(seconds / 3600) seconds %= 3600 minutes = math.floor(seconds / 60) seconds %= 60 milliseconds = round((seconds - math.floor(seconds)) * 1000) seconds = math.floor(seconds) formatted_time = f"{hours:02d}:{minutes:02d}:{seconds:01d},{milliseconds:03d}" return formatted_time def generate_subtitle_file(language, segments, input_video_name): subtitle_file = f"sub-{input_video_name}.{language}.srt" text = "" for index, segment in enumerate(segments): segment_start = format_time(segment.start) segment_end = format_time(segment.end) text += f"{str(index+1)} \n" text += f"{segment_start} --> {segment_end} \n" text += f"{segment.text} \n" text += "\n" f = open(subtitle_file, "w", encoding='utf8') f.write(text) f.close() return subtitle_file def read_srt_file(file_path): try: with open(file_path, 'r', encoding='utf-8') as file: srt_content = file.read() return srt_content except FileNotFoundError: print(f"The file {file_path} was not found.") except Exception as e: print(f"An error occurred: {e}") def clean_text(text): # Remove 'srt ' from the start of each line # Remove ''' from the start and end text = re.sub(r"^```|```$", '', text) text = re.sub(r'^srt', '', text, flags=re.MULTILINE) return text def enhance_text(api_key, text, google): url = "https://api.one-api.ir/chatbot/v1/gpt4o/" # Prepare the request body request_body = [{ "role": "user", "content": f"{text} Please ignore all previous instructions. Please respond only in the Persian language.Do not explain what you are doing. Do not self reference.you arte an expert translator and your task here is to translte from english to persian The subject of the text is philosophy and life advice quote. The translation should have a poetic and literary tone in persian , Please follow these guidelines: Preserve the exact meaning of each sentence while adding a touch of Persian literary elegance. Keep the language clear and accessible, avoiding over-complicated or embellished phrases that alter the message. Ensure proper Persian grammar and structure for a polished, fluent result. Maintain alignment with the time ranges provided for subtitlesconvert the English terms used in it into common Persian terms which make the text undrastandabel and touching for persian audience .When in doubt, prioritize clarity and fidelity to the persian language structure over poetic embellis Translate it with the above considerations in summary you have to translate the text in a way beside is faitful loyality to the original text meaning it make it undrestandebel and touching for persian language auidence " },] # Add the API key to the request headers = { "one-api-token": api_key, "Content-Type": "application/json" } # Make the POST request response = requests.post(url, headers=headers, json=request_body) # Check the response status if response.status_code == 200: result = response.json() clean_text(result["result"][0]) last = clean_text(result["result"][0]) print("result: ") print(last) return last else: print(f"Error: {response.status_code}, {response.text}") return None def translate_text(api_key, source_lang, target_lang, text): url = "https://api.one-api.ir/translate/v1/google/" request_body = {"source": source_lang, "target": target_lang, "text": text} headers = {"one-api-token": api_key, "Content-Type": "application/json"} response = requests.post(url, headers=headers, json=request_body) if response.status_code == 200: result = response.json() enhanced_text = enhance_text(api_key, text, result['result']) return enhanced_text else: print(f"Error: {response.status_code}, {response.text}") return None def write_google(google_translate): google = "google_translate.srt" with open(google, 'w', encoding="utf-8") as f: f.write(google_translate) def time_to_seconds(time_obj): return time_obj.hours * 3600 + time_obj.minutes * 60 + time_obj.seconds + time_obj.milliseconds / 1000 def create_subtitle_clips(subtitles, videosize, fontsize, font, color, debug): subtitle_clips = [] for subtitle in subtitles: start_time = time_to_seconds(subtitle.start) # Add 2 seconds offset end_time = time_to_seconds(subtitle.end) duration = end_time - start_time video_width, video_height = videosize max_width = video_width * 0.8 max_height = video_height * 0.2 #reshaped_text = arabic_reshaper.reshape(subtitle.text) #bidi_text = get_display(reshaped_text) text_clip = TextClip(font, subtitle.text, font_size=fontsize, size=(int(video_width * 0.8), int(video_height * 0.2)) ,text_align="center" ,color=color, method='caption').with_start(start_time).with_duration(duration) subtitle_x_position = 'center' subtitle_y_position = video_height * 0.68 text_position = (subtitle_x_position, subtitle_y_position) subtitle_clips.append(text_clip.with_position(text_position)) return subtitle_clips def process_video(url, type): if type=="insta": input_video, image_path=insta_oneapi(url, api_key) input_video_name = input_video.replace(".mp4", "") video = VideoFileClip(input_video) image_clip = ImageClip(image_path).with_duration(1) # Set the position and size of the image (optional) image_clip = image_clip.with_position(("center", "center")).resized(height=video.size[1]) first_video = CompositeVideoClip([video.with_start(1), image_clip]) input_video = input_video_name+"_cover.mp4" input_video_name = input_video.replace(".mp4", "") first_video.write_videofile(input_video, codec="libx264", audio_codec="aac", logger=None) input_audio = extract_audio(input_video) elif type=="youtube": input_video, input_audio = one_youtube(url, api_key) input_video_name = input_video.replace(".mp4", "") # Get the current local time t = time.localtime() # Format the time as a string current_time = time.strftime("%H:%M:%S", t) print("Current Time =", current_time) segments = transcribe(audio=input_audio) language = "fa" subtitle_file = generate_subtitle_file(language=language, segments=segments, input_video_name=input_video_name) source_language = "en" target_language = "fa" srt_string = read_srt_file(subtitle_file) google_translate = translate_text(api_key, source_language, target_language, srt_string) write_google(google_translate) video = VideoFileClip(input_video) audio = AudioFileClip(input_audio) video = video.with_audio(audio) print(video) subtitles = pysrt.open("google_translate.srt", encoding="utf-8") output_video_file = input_video_name + '_subtitled' + ".mp4" subtitle_clips = create_subtitle_clips(subtitles, video.size, 32, 'arial.ttf', 'white', False) final_video = CompositeVideoClip([video] + subtitle_clips) final_video.write_videofile(output_video_file, codec="libx264", audio_codec="aac", logger=None) print('final') # Get the current local time t = time.localtime() # Format the time as a string current_time = time.strftime("%H:%M:%S", t) print("Current Time =", current_time) # Generate the URL for the file return output_video_file def download_file(file_path): return gr.File.update(file_path) iface = gr.Interface(fn=process_video, inputs=["text" ,gr.Dropdown(["insta", "youtube"])], outputs="file") iface.launch(debug=True)