Spaces:
Sleeping
Sleeping
import argparse
import os
import subprocess
import sys
import time

import openai
import stable_whisper
from pytube import YouTube
from tqdm import tqdm

from SRT import SRT_script
# Command-line interface. Which of --link / --video_file / --audio_file /
# --srt_file are supplied decides, further down, whether the script downloads,
# extracts audio, transcribes, or only translates an existing subtitle file.
parser = argparse.ArgumentParser()
parser.add_argument("--link", help="youtube video link here", default=None, type=str, required=False)
parser.add_argument("--video_file", help="local video path here", default=None, type=str, required=False)
parser.add_argument("--audio_file", help="local audio path here", default=None, type=str, required=False)
parser.add_argument("--srt_file", help="srt file input path here", default=None, type=str, required=False)
parser.add_argument("--download", help="download path", default='./downloads', type=str, required=False)
parser.add_argument("--output_dir", help="translate result path", default='./results', type=str, required=False)
parser.add_argument(
    "--video_name",
    help="video name, if use video link as input, the name will auto-filled by youtube video name",
    default='placeholder',
    type=str,
    required=False,
)
parser.add_argument(
    "--model_name",
    help="model name only support gpt-4 and gpt-3.5-turbo",
    type=str,
    required=False,
    default="gpt-3.5-turbo",
)
parser.add_argument("-only_srt", help="set script output to only .srt file", action='store_true')
parser.add_argument("-v", help="auto encode script with video", action='store_true')
args = parser.parse_args()

# At least one of --link, --video_file or --srt_file must be given.
# parser.error() prints the usage line plus the message to stderr and exits
# with a non-zero status, so a bad invocation is not reported as success.
# NOTE(review): --audio_file on its own is rejected by this check; confirm
# whether audio-only input is meant to be supported.
if args.link is None and args.video_file is None and args.srt_file is None:
    parser.error("need video source or srt file")
# set up
openai.api_key = os.getenv("OPENAI_API_KEY")

# Working folders. makedirs(exist_ok=True) creates missing parents and also
# (re)creates the audio/video subfolders when the download root already
# exists without them.
DOWNLOAD_PATH = args.download
os.makedirs(f'{DOWNLOAD_PATH}/audio', exist_ok=True)
os.makedirs(f'{DOWNLOAD_PATH}/video', exist_ok=True)

RESULT_PATH = args.output_dir
os.makedirs(RESULT_PATH, exist_ok=True)

# Name used for the per-video result folder and output files.
# Start from --video_name so VIDEO_NAME is bound on every path; while it is
# still the 'placeholder' default, prefer the stem of the first local input
# given (video, then audio, then srt). With only --link the placeholder is
# kept here and replaced by the downloaded file's name later on.
VIDEO_NAME = args.video_name
if args.video_name == 'placeholder':
    if args.video_file is not None:
        VIDEO_NAME = args.video_file.split('/')[-1].split('.')[0]
    elif args.audio_file is not None:
        VIDEO_NAME = args.audio_file.split('/')[-1].split('.')[0]
    elif args.srt_file is not None:
        VIDEO_NAME = args.srt_file.split('/')[-1].split('.')[0]

model_name = args.model_name
# get source audio
# Resolves video_path / audio_path for the selected input mode:
#   * --link without --video_file: download video and audio from YouTube;
#   * --video_file: use the local video and extract its audio with ffmpeg
#     unless --audio_file is supplied.
# With neither (srt-only input) both names are left unset.
if args.link is not None and args.video_file is None:
    # Download audio from YouTube
    video_link = args.link
    try:
        yt = YouTube(video_link)
        video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
        if video:
            video.download(f'{DOWNLOAD_PATH}/video')
            print('Video download completed!')
        else:
            print("Error: Video stream not found")
        audio = yt.streams.filter(only_audio=True, file_extension='mp4').first()
        if audio:
            audio.download(f'{DOWNLOAD_PATH}/audio')
            print('Audio download completed!')
        else:
            print("Error: Audio stream not found")
    except Exception as e:
        print("Connection Error")
        print(e)
        sys.exit(1)

    # Both streams are dereferenced below; stop cleanly (the error has already
    # been printed) instead of failing on `.default_filename` of None.
    if not video or not audio:
        sys.exit(1)

    video_path = f'{DOWNLOAD_PATH}/video/{video.default_filename}'
    audio_path = f'{DOWNLOAD_PATH}/audio/{audio.default_filename}'
    if VIDEO_NAME == 'placeholder':
        VIDEO_NAME = audio.default_filename.split('.')[0]
elif args.video_file is not None:
    # Read from local
    video_path = args.video_file
    if args.audio_file is None:
        os.makedirs(f'{DOWNLOAD_PATH}/audio', exist_ok=True)
        audio_path = f'{DOWNLOAD_PATH}/audio/{VIDEO_NAME}.mp3'
        # Argument list instead of a shell string: paths containing spaces or
        # shell metacharacters reach ffmpeg unchanged. The exit status is not
        # checked, as before; a failed extraction surfaces when audio_path is
        # opened below.
        subprocess.run(
            ['ffmpeg', '-i', args.video_file, '-f', 'mp3', '-ab', '192000', '-vn', audio_path]
        )

os.makedirs(f'{RESULT_PATH}/{VIDEO_NAME}', exist_ok=True)

# An explicitly supplied audio file always takes precedence over downloaded
# or extracted audio.
if args.audio_file is not None:
    audio_path = args.audio_file

# Open the chosen audio exactly once (previously --audio_file could be opened
# twice, leaking the first handle).
# NOTE(review): audio_file is not read anywhere in the visible script; it is
# presumably kept for the disabled OpenAI API transcription call - confirm
# before removing.
if args.link is not None or args.video_file is not None or args.audio_file is not None:
    audio_file = open(audio_path, "rb")
# Obtain the source-language subtitles as `srt` and record their path in
# `srt_file_en`. Order of preference:
#   1. the file passed with --srt_file;
#   2. "<RESULT_PATH>/<VIDEO_NAME>/<VIDEO_NAME>_en.srt" if it already exists;
#   3. a fresh speech-to-text run over audio_path.
srt_file_en = args.srt_file
if srt_file_en is None:
    srt_file_en = "{}/{}/{}_en.srt".format(RESULT_PATH, VIDEO_NAME, VIDEO_NAME)
    reuse_existing = os.path.exists(srt_file_en)
else:
    reuse_existing = True

if reuse_existing:
    srt = SRT_script.parse_from_srt_file(srt_file_en)
else:
    # Transcribe locally with stable-whisper's 'base' model. Automatic
    # regrouping is switched off and the segments are regrouped by the
    # explicit chain below instead.
    model = stable_whisper.load_model('base')
    transcript = model.transcribe(audio_path, regroup=False)
    (
        transcript
        .split_by_punctuation(['.', '。', '?'])
        .merge_by_gap(.15, max_words=3)
        .merge_by_punctuation([' '])
        .split_by_punctuation(['.', '。', '?'])
    )
    # Hand the regrouped segments to the project's SRT class.
    srt = SRT_script(transcript.to_dict()['segments'])

# srt preprocess
srt.form_whole_sentence()
srt.spell_check_term()
srt.correct_with_force_term()
# srt_file_en is also the write target here, including when it is the
# user-supplied --srt_file path.
srt.write_srt_file_src(srt_file_en)
script_input = srt.get_source_only()

if not args.only_srt:
    # srt2ass is imported only when .ass output is requested.
    from srt2ass import srt2ass
    assSub_en = srt2ass(srt_file_en, "default", "No", "Modest")
    print('ASS subtitle saved as: ' + assSub_en)
# Split the video script by sentences and create chunks within the token limit
def script_split(script_in, chunk_size=1000):
    """Group blank-line separated sentences into chunks of bounded length.

    Args:
        script_in: Source script whose sentences are separated by a blank
            line (``'\\n\\n'``). Trailing empty or whitespace-only pieces
            (e.g. from a trailing separator) are ignored.
        chunk_size: Soft limit, in characters, for one chunk. A sentence is
            added to the current chunk while
            ``len(chunk) + len(sentence) + 1 <= chunk_size``; a single
            sentence longer than the limit still becomes its own chunk.

    Returns:
        ``(script_arr, range_arr)``: the stripped chunk texts and, for each
        chunk, the 1-based inclusive ``(first, last)`` sentence indices it
        covers. Both lists always have the same length.
    """
    sentences = script_in.split('\n\n')
    # Drop trailing blank pieces so the last range ends on a real sentence
    # whether or not the input finishes with a separator.
    while sentences and not sentences[-1].strip():
        sentences.pop()

    script_arr = []
    range_arr = []
    chunk = ""
    start = 1
    for index, sentence in enumerate(sentences, start=1):
        # Flush only a chunk that already holds something; otherwise an
        # oversized first sentence would produce an empty chunk with the
        # impossible range (1, 0).
        if chunk and len(chunk) + len(sentence) + 1 > chunk_size:
            script_arr.append(chunk.strip())
            range_arr.append((start, index - 1))
            chunk = ""
            start = index
        chunk += sentence + '\n\n'

    if chunk:
        script_arr.append(chunk.strip())
        range_arr.append((start, len(sentences)))

    assert len(script_arr) == len(range_arr)
    return script_arr, range_arr
# Chunk the source script with the default chunk size; range_arr[i] is the
# sentence span that script_split paired with script_arr[i].
script_arr, range_arr = script_split(script_input)
def get_response(model_name):
    """Ask an OpenAI model to translate the current chunk into Chinese.

    The text to translate is not a parameter: it is read from the
    module-level name ``s``, which the translation loop rebinds before each
    call.

    Args:
        model_name: ``"gpt-3.5-turbo"`` or ``"gpt-4"`` (chat completion API)
            or ``"text-davinci-003"`` (completion API).

    Returns:
        The stripped text of the first choice, or ``None`` when
        ``model_name`` matches none of the names above.
    """
    if model_name in ("gpt-3.5-turbo", "gpt-4"):
        system_prompts = (
            "You are a helpful assistant that translates English to Chinese and have decent background in starcraft2.",
            "Your translation has to keep the orginal format and be as accurate as possible.",
            "There is no need for you to add any comments or notes.",
        )
        messages = [{"role": "system", "content": prompt} for prompt in system_prompts]
        messages.append(
            {"role": "user", "content": 'Translate the following English text to Chinese: "{}"'.format(s)}
        )
        chat = openai.ChatCompletion.create(
            model=model_name,
            messages=messages,
            temperature=0.15,
        )
        return chat['choices'][0]['message']['content'].strip()

    if model_name == "text-davinci-003":
        completion = openai.Completion.create(
            model=model_name,
            prompt=f"Please help me translate this into Chinese:\n\n{s}\n\n",
            temperature=0.1,
            max_tokens=2000,
            top_p=1.0,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        return completion['choices'][0]['text'].strip()

    # No branch for this model name.
    return None
# Translate and save
# The loop variable must stay named `s`: get_response() reads the chunk text
# from that module-level name. The range variable is deliberately not called
# `range`, so the builtin is not shadowed for the rest of the module.
# total= lets tqdm show real progress, since zip() has no length.
for s, sentence_range in tqdm(zip(script_arr, range_arr), total=len(script_arr)):
    # using chatgpt model
    print(f"now translating sentences {sentence_range}")
    # Retry until the request succeeds, pausing 30 seconds after each failure.
    while True:
        try:
            translate = get_response(model_name)
        except Exception as e:
            print("An error has occurred during translation:", e)
            print("Retrying...")
            time.sleep(30)
        else:
            break

    # get_response returns None for a model name it has no branch for; fail
    # with a clear message rather than passing None on to set_translation.
    if translate is None:
        raise ValueError(f"unsupported model name: {model_name}")

    srt.set_translation(translate, sentence_range, model_name)

srt.check_len_and_split()
srt.write_srt_file_translate(f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_zh.srt")
srt.write_srt_file_bilingual(f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_bi.srt")
# Optional outputs: an .ass version of the translated subtitles and, with -v,
# a copy of the video with the translated subtitles rendered into it.
if not args.only_srt:
    assSub_zh = srt2ass(f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_zh.srt", "default", "No", "Modest")
    print('ASS subtitle saved as: ' + assSub_zh)

if args.v:
    if args.link is None and args.video_file is None:
        # video_path is only set when a link or a local video was given.
        print("no video source was given, skipping video encoding")
    else:
        # Use the .srt when only .srt output was requested, otherwise the .ass.
        subtitle_ext = 'srt' if args.only_srt else 'ass'
        subtitle_path = f'{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_zh.{subtitle_ext}'
        # Argument list instead of a shell string, so spaces and shell
        # metacharacters in the paths are passed through untouched.
        # NOTE(review): characters that are special inside an ffmpeg filter
        # argument (such as ':' or a single quote) in subtitle_path are
        # presumably still unescaped here - confirm against the ffmpeg docs.
        subprocess.run([
            'ffmpeg',
            '-i', video_path,
            '-vf', f'subtitles={subtitle_path}',
            f'{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}.mp4',
        ])