import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
import os
from openai import OpenAI
from groq import Groq
import json
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import NoTranscriptFound
from moviepy.editor import VideoFileClip
from pytube import YouTube
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.http import MediaIoBaseUpload
import io
import time
from urllib.parse import urlparse, parse_qs

# Assumes the environment variable / secret is named GOOGLE_APPLICATION_CREDENTIALS_JSON
# credentials_json_string = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
# credentials_dict = json.loads(credentials_json_string)
# SCOPES = ['https://www.googleapis.com/auth/drive']
# credentials = service_account.Credentials.from_service_account_info(
#     credentials_dict, scopes=SCOPES)
# service = build('drive', 'v3', credentials=credentials)
# # List the first 10 files on Google Drive
# results = service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
# items = results.get('files', [])
# if not items:
#     print('No files found.')
# else:
#     print("===== First 10 files on Google Drive =====")
#     print('Files:')
#     for item in items:
#         print(u'{0} ({1})'.format(item['name'], item['id']))

OUTPUT_PATH = 'videos'
TRANSCRIPTS = []
CURRENT_INDEX = 0
VIDEO_ID = ""

PASSWORD = os.getenv("PASSWORD")
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
OPEN_AI_CLIENT = OpenAI(api_key=OPEN_AI_KEY)
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_CLIENT = Groq(api_key=GROQ_API_KEY)
DRIVE_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
GCS_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")


# Verify the password
def verify_password(password):
    if password == PASSWORD:
        return True
    else:
        raise gr.Error("Incorrect password")


# ==== GCS ====
def init_gcs_client(service_account_key_string):
    """Create a GCS client from a service-account key JSON string."""
    credentials_json_string = service_account_key_string
    credentials_dict = json.loads(credentials_json_string)
    credentials = service_account.Credentials.from_service_account_info(credentials_dict)
    gcs_client = storage.Client(credentials=credentials, project=credentials_dict['project_id'])
    return gcs_client


def gcs_create_bucket_folder_if_not_exists(gcs_client, bucket_name, folder_name):
    """Check whether a folder (prefix) with the given name exists; if not, create a marker blob to simulate one."""
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(folder_name)
    if not blob.exists():
        blob.upload_from_string('', content_type='application/x-www-form-urlencoded;charset=UTF-8')
        print(f"GCS Folder '{folder_name}' created.")
    else:
        print(f"GCS Folder '{folder_name}' already exists.")


def gcs_check_folder_exists(gcs_client, bucket_name, folder_name):
    """Check whether the given folder (prefix) exists in the GCS bucket."""
    bucket = gcs_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=folder_name))
    return len(blobs) > 0


def gcs_check_file_exists(gcs_client, bucket_name, file_name):
    """
    Check whether the given file exists in the GCS bucket.
    file_name format: {folder_name}/{file_name}
    """
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    return blob.exists()


def upload_file_to_gcs(gcs_client, bucket_name, destination_blob_name, file_path):
    """Upload a local file to the given GCS bucket."""
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(file_path)
    print(f"File {file_path} uploaded to {destination_blob_name} in GCS.")
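# A minimal usage sketch of the GCS helpers above. The bucket name
# 'video_ai_assistant' matches the one used later in this module; the folder,
# blob, and local file names are illustrative assumptions only:
#
#   gcs_client = init_gcs_client(GCS_KEY)
#   if not gcs_check_folder_exists(gcs_client, 'video_ai_assistant', 'demo_video_id'):
#       gcs_create_bucket_folder_if_not_exists(gcs_client, 'video_ai_assistant', 'demo_video_id')
#   upload_file_to_gcs(gcs_client, 'video_ai_assistant', 'demo_video_id/frame.jpg', 'frame.jpg')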
def upload_file_to_gcs_with_json_string(gcs_client, bucket_name, destination_blob_name, json_string):
    """Upload a string to the given GCS bucket."""
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_string(json_string)
    print(f"JSON string uploaded to {destination_blob_name} in GCS.")


def download_blob_to_string(gcs_client, bucket_name, source_blob_name):
    """Download the contents of a GCS blob as a string."""
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    return blob.download_as_text()


def make_blob_public(gcs_client, bucket_name, blob_name):
    """Make the given GCS object publicly readable."""
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.make_public()
    print(f"Blob {blob_name} is now publicly accessible at {blob.public_url}")


def get_blob_public_url(gcs_client, bucket_name, blob_name):
    """Return the public URL of the given GCS object."""
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    return blob.public_url


def upload_img_and_get_public_url(gcs_client, bucket_name, file_name, file_path):
    """Upload an image to GCS and return its public URL."""
    # Upload the image
    upload_file_to_gcs(gcs_client, bucket_name, file_name, file_path)
    # Make the uploaded image public
    make_blob_public(gcs_client, bucket_name, file_name)
    # Get the image's public URL
    public_url = get_blob_public_url(gcs_client, bucket_name, file_name)
    print(f"Public URL for the uploaded image: {public_url}")
    return public_url


def copy_all_files_from_drive_to_gcs(drive_service, gcs_client, drive_folder_id, bucket_name, gcs_folder_name):
    # Get all files from the folder
    query = f"'{drive_folder_id}' in parents and trashed = false"
    response = drive_service.files().list(q=query).execute()
    files = response.get('files', [])
    for file in files:
        # Copy each file to GCS
        file_id = file['id']
        file_name = file['name']
        gcs_destination_path = f"{gcs_folder_name}/{file_name}"
        copy_file_from_drive_to_gcs(drive_service, gcs_client, file_id, bucket_name, gcs_destination_path)


def copy_file_from_drive_to_gcs(drive_service, gcs_client, file_id, bucket_name, gcs_destination_path):
    # Download the file content from Drive
    request = drive_service.files().get_media(fileId=file_id)
    fh = io.BytesIO()
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
    fh.seek(0)
    file_content = fh.getvalue()
    # Upload the file content to GCS
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(gcs_destination_path)
    blob.upload_from_string(file_content)
    print(f"File {file_id} copied to GCS at {gcs_destination_path}.")


# ==== Drive ==== initialization
def init_drive_service():
    credentials_json_string = DRIVE_KEY
    credentials_dict = json.loads(credentials_json_string)
    SCOPES = ['https://www.googleapis.com/auth/drive']
    credentials = service_account.Credentials.from_service_account_info(
        credentials_dict, scopes=SCOPES)
    service = build('drive', 'v3', credentials=credentials)
    return service


def create_folder_if_not_exists(service, folder_name, parent_id):
    print("Checking whether a folder with the given name exists; creating it if not")
    query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_id}' in parents and trashed=false"
    response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute()
    folders = response.get('files', [])
    if not folders:
        # The folder does not exist; create a new one
        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder',
            'parents': [parent_id]
        }
        folder = service.files().create(body=file_metadata, fields='id').execute()
        return folder.get('id')
    else:
        # The folder already exists
        return folders[0]['id']
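# A minimal usage sketch of the Drive helpers above, assuming
# GOOGLE_APPLICATION_CREDENTIALS_JSON holds a service-account key; the child
# folder name is an illustrative assumption, and the parent folder ID is the
# one hard-coded elsewhere in this module:
#
#   service = init_drive_service()
#   folder_id = create_folder_if_not_exists(service, 'demo_video_id', '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL')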
# Check whether a file exists on Google Drive
def check_file_exists(service, folder_name, file_name):
    query = f"name = '{file_name}' and '{folder_name}' in parents and trashed = false"
    response = service.files().list(q=query).execute()
    files = response.get('files', [])
    return len(files) > 0, files[0]['id'] if files else None


def upload_content_directly(service, file_name, folder_id, content):
    """
    Upload content directly to a new file on Google Drive.
    """
    if not file_name:
        raise ValueError("file_name must not be empty")
    if not folder_id:
        raise ValueError("folder_id must not be empty")
    if content is None:  # Allow uploading an empty string, but not None
        raise ValueError("content must not be None")

    file_metadata = {'name': file_name, 'parents': [folder_id]}
    # Wrap the text content in an in-memory file object via io.BytesIO
    try:
        with io.BytesIO(content.encode('utf-8')) as fh:
            media = MediaIoBaseUpload(fh, mimetype='text/plain', resumable=True)
            print("==content==")
            print(content)
            print("==content==")
            print("==media==")
            print(media)
            print("==media==")
            # Perform the upload
            file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
            return file.get('id')
    except Exception as e:
        print(f"Error while uploading the file: {e}")
        raise  # Re-raise so the caller can handle or ignore it as needed


def upload_file_directly(service, file_name, folder_id, file_path):
    # Upload a .json file to Google Drive
    file_metadata = {'name': file_name, 'parents': [folder_id]}
    media = MediaFileUpload(file_path, mimetype='application/json')
    file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    # return file.get('id')  # Return the file ID
    return True


def upload_img_directly(service, file_name, folder_id, file_path):
    file_metadata = {'name': file_name, 'parents': [folder_id]}
    media = MediaFileUpload(file_path, mimetype='image/jpeg')
    file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    return file.get('id')  # Return the file ID


def download_file_as_string(service, file_id):
    """
    Download a file from Google Drive and return it as a string.
    """
    request = service.files().get_media(fileId=file_id)
    fh = io.BytesIO()
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
    fh.seek(0)
    content = fh.read().decode('utf-8')
    return content


def set_public_permission(service, file_id):
    service.permissions().create(
        fileId=file_id,
        body={"type": "anyone", "role": "reader"},
        fields='id',
    ).execute()


def update_file_on_drive(service, file_id, file_content):
    """
    Update the content of a file on Google Drive.

    Args:
        service: Google Drive API service instance.
        file_id: ID of the file to update.
        file_content: New file content, as a string.
    """
    # Convert the new file content to a byte stream
    fh = io.BytesIO(file_content.encode('utf-8'))
    media = MediaIoBaseUpload(fh, mimetype='application/json', resumable=True)
    # Update the file
    updated_file = service.files().update(
        fileId=file_id,
        media_body=media
    ).execute()
    print(f"File updated, file ID: {updated_file['id']}")


# ---- Main Functions ----
def process_file(password, file):
    verify_password(password)

    # Read the file; for .docx there is no DataFrame, so use the extracted text directly
    if file.name.endswith('.csv'):
        df = pd.read_csv(file)
        df_string = df_to_text(df)
    elif file.name.endswith('.xlsx'):
        df = pd.read_excel(file)
        df_string = df_to_text(df)
    elif file.name.endswith('.docx'):
        df_string = docx_to_text(file)
    else:
        raise ValueError("Unsupported file type")

    # Yilan: replace the @XX@ marker with |
    df_string = df_string.replace("@XX@", "|")

    # Generate questions and a summary based on the uploaded content
    questions = generate_questions(df_string)
    summary = generate_summarise(df_string)

    # Return the button texts and the DataFrame string
    return questions[0] if len(questions) > 0 else "", \
        questions[1] if len(questions) > 1 else "", \
        questions[2] if len(questions) > 2 else "", \
        summary, \
        df_string


def df_to_text(df):
    # Convert the DataFrame to plain text
    return df.to_string()


def docx_to_text(file):
    # Convert a Word document to plain text
    doc = Document(file)
    return "\n".join([para.text for para in doc.paragraphs])
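# A minimal usage sketch of process_file, assuming a local .docx file and the
# PASSWORD environment variable; generate_questions and generate_summarise are
# defined elsewhere in this module:
#
#   with open('lesson.docx', 'rb') as f:
#       q1, q2, q3, summary, df_string = process_file(os.getenv("PASSWORD"), f)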
def format_seconds_to_time(seconds):
    """Format a number of seconds as HH:MM:SS."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"


def extract_youtube_id(url):
    parsed_url = urlparse(url)
    if "youtube.com" in parsed_url.netloc:
        # For standard links, the video ID is in the 'v' query parameter
        query_params = parse_qs(parsed_url.query)
        return query_params.get("v")[0] if "v" in query_params else None
    elif "youtu.be" in parsed_url.netloc:
        # For short links, the video ID is part of the path
        return parsed_url.path.lstrip('/')
    else:
        return None


def get_transcript(video_id):
    languages = ['zh-TW', 'zh-Hant', 'zh', 'en']  # priority order
    for language in languages:
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language])
            return transcript  # Transcript found; return it immediately
        except NoTranscriptFound:
            continue  # No transcript in this language; try the next one
    return None  # All attempts failed; return None


def process_transcript_and_screenshots(video_id):
    print("====process_transcript_and_screenshots====")

    # Drive
    service = init_drive_service()
    parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
    folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)

    # Transcript file name
    file_name = f'{video_id}_transcript.json'
    # Check whether the transcript already exists
    exists, file_id = check_file_exists(service, folder_id, file_name)
    if not exists:
        # Fetch the transcript from YouTube and upload it
        transcript = get_transcript(video_id)
        if transcript:
            print("Transcript fetched successfully")
        else:
            print("No transcript found")
        transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
        file_id = upload_content_directly(service, file_name, folder_id, transcript_text)
        print("Transcript uploaded to Google Drive")
    else:
        # The transcript already exists; download its content
        print("Transcript already exists on Google Drive")
        transcript_text = download_file_as_string(service, file_id)
        transcript = json.loads(transcript_text)

    # For each transcript entry, check for and upload a screenshot
    for entry in transcript:
        if 'img_file_id' not in entry:
            screenshot_path = screenshot_youtube_video(video_id, entry['start'])
            img_file_id = upload_img_directly(service, f"{video_id}_{entry['start']}.jpg", folder_id, screenshot_path)
            set_public_permission(service, img_file_id)
            entry['img_file_id'] = img_file_id
            print(f"Screenshot uploaded to Google Drive: {img_file_id}")

    # Update the transcript file
    updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
    update_file_on_drive(service, file_id, updated_transcript_text)
    print("Transcript updated, including screenshot links")

    # Initialize the GCS client
    gcs_client = init_gcs_client(GCS_KEY)
    bucket_name = 'video_ai_assistant'
    # Check whether the folder exists
    is_gcs_exists = gcs_check_folder_exists(gcs_client, bucket_name, video_id)
    if not is_gcs_exists:
        gcs_create_bucket_folder_if_not_exists(gcs_client, bucket_name, video_id)
        copy_all_files_from_drive_to_gcs(service, gcs_client, folder_id, bucket_name, video_id)
        print("Drive files copied to GCS")
    else:
        print(f"GCS folder {video_id} already exists")

    return transcript
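# Quick illustrative examples for the helpers above (the video ID is an
# arbitrary placeholder):
#
#   extract_youtube_id("https://www.youtube.com/watch?v=abc123XYZ_-")  # -> "abc123XYZ_-"
#   extract_youtube_id("https://youtu.be/abc123XYZ_-")                 # -> "abc123XYZ_-"
#   format_seconds_to_time(3725)                                       # -> "01:02:05"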
def process_transcript_and_screenshots_on_gcs(video_id):
    print("====process_transcript_and_screenshots_on_gcs====")

    # GCS
    gcs_client = init_gcs_client(GCS_KEY)
    bucket_name = 'video_ai_assistant'
    # Check whether the folder exists
    # is_gcs_exists = gcs_check_folder_exists(gcs_client, bucket_name, video_id)
    # if not is_gcs_exists:
    #     gcs_create_bucket_folder_if_not_exists(gcs_client, bucket_name, video_id)
    #     print(f"GCS folder {video_id} created")
    # else:
    #     print(f"GCS folder {video_id} already exists")

    # Transcript file name
    transcript_file_name = f'{video_id}_transcript.json'
    transcript_blob_name = f"{video_id}/{transcript_file_name}"
    # Check whether the transcript already exists
    is_transcript_exists = gcs_check_file_exists(gcs_client, bucket_name, transcript_blob_name)
    if not is_transcript_exists:
        # Fetch the transcript from YouTube and upload it
        transcript = get_transcript(video_id)
        if transcript:
            print("Transcript fetched successfully")
        else:
            print("No transcript found")
        transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
        upload_file_to_gcs_with_json_string(gcs_client, bucket_name, transcript_blob_name, transcript_text)
    else:
        # The transcript already exists; download its content
        print("Transcript already exists in GCS")
        transcript_text = download_blob_to_string(gcs_client, bucket_name, transcript_blob_name)
        transcript = json.loads(transcript_text)

    # print("=== Checking the other derived files ===")
    # source = "gcs"
    # get_questions(video_id, transcript_text, source)
    # get_video_id_summary(video_id, transcript_text, source)
    # get_mind_map(video_id, transcript_text, source)
    # print("=== Checking the other derived files: end ===")

    # Process screenshots
    for entry in transcript:
        if 'img_file_id' not in entry:
            screenshot_path = screenshot_youtube_video(video_id, entry['start'])
            screenshot_blob_name = f"{video_id}/{video_id}_{entry['start']}.jpg"
            img_file_id = upload_img_and_get_public_url(gcs_client, bucket_name, screenshot_blob_name, screenshot_path)
            entry['img_file_id'] = img_file_id
            print(f"Screenshot uploaded to GCS: {img_file_id}")

    # Update the transcript file
    print("=== Updating the transcript file ===")
    print(transcript)
    print("=== Updating the transcript file ===")
    updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
    upload_file_to_gcs_with_json_string(gcs_client, bucket_name, transcript_blob_name, updated_transcript_text)
    print("Transcript updated, including screenshot links")
    updated_transcript_json = json.loads(updated_transcript_text)

    return updated_transcript_json
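# process_youtube_link below calls format_simple_transcript_to_html, whose
# definition does not appear in this part of the module; if it is not defined
# elsewhere, a minimal sketch reconstructed from the call site (an assumption,
# mirroring format_transcript_to_html below) might look like this:
#
#   def format_simple_transcript_to_html(formatted_simple_transcript):
#       html_content = ""
#       for entry in formatted_simple_transcript:
#           html_content += f"<p>{entry['start_time']} - {entry['end_time']}: {entry['text']}</p>"
#       return html_content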
def process_youtube_link(password, link):
    verify_password(password)

    # Fetch the transcript via the YouTube transcript API
    video_id = extract_youtube_id(link)
    global VIDEO_ID
    VIDEO_ID = video_id
    download_youtube_video(video_id, output_path=OUTPUT_PATH)
    try:
        # transcript = process_transcript_and_screenshots(video_id)
        transcript = process_transcript_and_screenshots_on_gcs(video_id)
    except Exception as e:
        error_msg = f"{video_id} transcript error: {str(e)}"
        print("===process_youtube_link error===")
        print(error_msg)
        raise gr.Error(error_msg)

    formatted_transcript = []
    formatted_simple_transcript = []
    screenshot_paths = []
    for entry in transcript:
        start_time = format_seconds_to_time(entry['start'])
        end_time = format_seconds_to_time(entry['start'] + entry['duration'])
        embed_url = get_embedded_youtube_link(video_id, entry['start'])
        img_file_id = entry['img_file_id']
        # img_file_id = ""  # Temporarily disable the Google Drive image
        # screenshot_path = f"https://lh3.googleusercontent.com/d/{img_file_id}=s4000"
        screenshot_path = img_file_id
        line = {
            "start_time": start_time,
            "end_time": end_time,
            "text": entry['text'],
            "embed_url": embed_url,
            "screenshot_path": screenshot_path
        }
        formatted_transcript.append(line)
        # formatted_simple_transcript only needs start_time, end_time, text
        simple_line = {
            "start_time": start_time,
            "end_time": end_time,
            "text": entry['text']
        }
        formatted_simple_transcript.append(simple_line)
        screenshot_paths.append(screenshot_path)

    global TRANSCRIPTS
    TRANSCRIPTS = formatted_transcript

    # Generate the other required outputs from the transcript
    source = "gcs"
    questions = get_questions(video_id, formatted_simple_transcript, source)
    formatted_transcript_json = json.dumps(formatted_transcript, ensure_ascii=False, indent=2)
    summary_json = get_video_id_summary(video_id, formatted_simple_transcript, source)
    summary = summary_json["summary"]
    html_content = format_transcript_to_html(formatted_transcript)
    simple_html_content = format_simple_transcript_to_html(formatted_simple_transcript)
    first_image = formatted_transcript[0]['screenshot_path']
    # first_image = "https://www.nameslook.com/names/dfsadf-nameslook.png"
    first_text = formatted_transcript[0]['text']
    mind_map_json = get_mind_map(video_id, formatted_simple_transcript, source)
    mind_map = mind_map_json["mind_map"]
    mind_map_html = get_mind_map_html(mind_map)

    # Make sure the returned outputs match what the UI components expect
    return video_id, \
        questions[0] if len(questions) > 0 else "", \
        questions[1] if len(questions) > 1 else "", \
        questions[2] if len(questions) > 2 else "", \
        formatted_transcript_json, \
        summary, \
        mind_map, \
        mind_map_html, \
        html_content, \
        simple_html_content, \
        first_image, \
        first_text


def format_transcript_to_html(formatted_transcript):
    # NOTE: the original HTML template literals were garbled in the source; only
    # the {entry['text']} field survives, so a plain <p> wrapper is used here as
    # an assumed placeholder for the lost markup.
    html_content = ""
    for entry in formatted_transcript:
        html_content += f"<p>{entry['text']}</p>"
    return html_content


def get_embedded_youtube_link(video_id, start_time):
    int_start_time = int(start_time)
    embed_url = f"https://www.youtube.com/embed/{video_id}?start={int_start_time}&autoplay=1"
    return embed_url


def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
    # Construct the full YouTube URL
    youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    # Download the video
    yt = YouTube(youtube_url)
    video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    video_stream.download(output_path=output_path, filename=youtube_id + ".mp4")
    print(f"Video downloaded successfully: {output_path}/{youtube_id}.mp4")


def screenshot_youtube_video(youtube_id, snapshot_sec):
    video_path = f'{OUTPUT_PATH}/{youtube_id}.mp4'
    file_name = f"{youtube_id}_{snapshot_sec}.jpg"
    with VideoFileClip(video_path) as video:
        screenshot_path = f'{OUTPUT_PATH}/{file_name}'
        video.save_frame(screenshot_path, snapshot_sec)
    return screenshot_path


def process_web_link(link):
    # Fetch and parse the web page content
    response = requests.get(link)
    soup = BeautifulSoup(response.content, 'html.parser')
    return soup.get_text()


def get_mind_map(video_id, df_string, source):
    if source == "gcs":
        print("===get_mind_map on gcs===")
        gcs_client = init_gcs_client(GCS_KEY)
        bucket_name = 'video_ai_assistant'
        file_name = f'{video_id}_mind_map.json'
        blob_name = f"{video_id}/{file_name}"
        # Check whether the file exists
        is_file_exists = gcs_check_file_exists(gcs_client, bucket_name, blob_name)
        if not is_file_exists:
            mind_map = generate_mind_map(df_string)
            mind_map_json = {"mind_map": str(mind_map)}
            mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
            upload_file_to_gcs_with_json_string(gcs_client, bucket_name, blob_name, mind_map_text)
            print("mind_map uploaded to GCS")
        else:
            # The mind map already exists; download its content
            print("mind_map already exists in GCS")
            mind_map_text = download_blob_to_string(gcs_client, bucket_name, blob_name)
            mind_map_json = json.loads(mind_map_text)
    elif source == "drive":
        print("===get_mind_map on drive===")
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_mind_map.json'
        # Check whether the file exists
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            mind_map = generate_mind_map(df_string)
            mind_map_json = {"mind_map": str(mind_map)}
            mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
            upload_content_directly(service, file_name, folder_id, mind_map_text)
            print("mind_map uploaded to Google Drive")
        else:
            # The mind map already exists; download its content
            print("mind_map already exists on Google Drive")
            mind_map_text = download_file_as_string(service, file_id)
            mind_map_json = json.loads(mind_map_text)

    return mind_map_json


def generate_mind_map(df_string):
    # Use OpenAI to generate a mind map based on the uploaded data
    sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW"
    user_content = f"""
        請根據 {df_string} 文本建立 markdown 心智圖
        注意:不需要前後文敘述,直接給出 markdown 文本即可
        這對我很重要
    """
    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_content}
    ]
    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens": 4000,
    }
    response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
    mind_map = response.choices[0].message.content.strip()
    print("=====mind_map=====")
    print(mind_map)
    print("=====mind_map=====")
    return mind_map
def get_mind_map_html(mind_map):
    mind_map_markdown = mind_map.replace("```markdown", "").replace("```", "")
    mind_map_html = f"""