import gradio as gr import pandas as pd import requests from bs4 import BeautifulSoup from docx import Document import os from openai import OpenAI from groq import Groq from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api._errors import NoTranscriptFound import yt_dlp from moviepy.editor import VideoFileClip from pytube import YouTube import os import io import time import json from urllib.parse import urlparse, parse_qs from google.cloud import storage from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from googleapiclient.http import MediaIoBaseDownload from googleapiclient.http import MediaIoBaseUpload from educational_material import EducationalMaterial from storage_service import GoogleCloudStorage is_env_local = os.getenv("IS_ENV_LOCAL", "false") == "true" print(f"is_env_local: {is_env_local}") if is_env_local: with open("local_config.json") as f: config = json.load(f) PASSWORD = config["PASSWORD"] GCS_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"]) DRIVE_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"]) OPEN_AI_KEY = config["OPEN_AI_KEY"] GROQ_API_KEY = config["GROQ_API_KEY"] JUTOR_CHAT_KEY = config["JUTOR_CHAT_KEY"] OUTPUT_PATH = config["OUTPUT_PATH"] else: PASSWORD = os.getenv("PASSWORD") GCS_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON") DRIVE_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON") OPEN_AI_KEY = os.getenv("OPEN_AI_KEY") GROQ_API_KEY = os.getenv("GROQ_API_KEY") JUTOR_CHAT_KEY = os.getenv("JUTOR_CHAT_KEY") OUTPUT_PATH = 'videos' TRANSCRIPTS = [] CURRENT_INDEX = 0 VIDEO_ID = "" OPEN_AI_CLIENT = OpenAI(api_key=OPEN_AI_KEY) GROQ_CLIENT = Groq(api_key=GROQ_API_KEY) GCS_SERVICE = GoogleCloudStorage(GCS_KEY) GCS_CLIENT = GCS_SERVICE.client # 驗證 password def verify_password(password): if password == PASSWORD: return True else: raise gr.Error("密碼錯誤") # ====gcs==== def gcs_check_file_exists(gcs_client, bucket_name, file_name): """ 检查 GCS 存储桶中是否存在指定的文件 file_name 格式:{folder_name}/{file_name} """ bucket = gcs_client.bucket(bucket_name) blob = bucket.blob(file_name) return blob.exists() def upload_file_to_gcs(gcs_client, bucket_name, destination_blob_name, file_path): """上传文件到指定的 GCS 存储桶""" bucket = gcs_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_filename(file_path) print(f"File {file_path} uploaded to {destination_blob_name} in GCS.") def upload_file_to_gcs_with_json_string(gcs_client, bucket_name, destination_blob_name, json_string): """上传字符串到指定的 GCS 存储桶""" bucket = gcs_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_string(json_string) print(f"JSON string uploaded to {destination_blob_name} in GCS.") def download_blob_to_string(gcs_client, bucket_name, source_blob_name): """从 GCS 下载文件内容到字符串""" bucket = gcs_client.bucket(bucket_name) blob = bucket.blob(source_blob_name) return blob.download_as_text() def make_blob_public(gcs_client, bucket_name, blob_name): """将指定的 GCS 对象设置为公共可读""" bucket = gcs_client.bucket(bucket_name) blob = bucket.blob(blob_name) blob.make_public() print(f"Blob {blob_name} is now publicly accessible at {blob.public_url}") def get_blob_public_url(gcs_client, bucket_name, blob_name): """获取指定 GCS 对象的公开 URL""" bucket = gcs_client.bucket(bucket_name) blob = bucket.blob(blob_name) return blob.public_url def upload_img_and_get_public_url(gcs_client, bucket_name, file_name, file_path): """上传图片到 GCS 并获取其公开 URL""" # 上传图片 
upload_file_to_gcs(gcs_client, bucket_name, file_name, file_path) # 将上传的图片设置为公开 make_blob_public(gcs_client, bucket_name, file_name) # 获取图片的公开 URL public_url = get_blob_public_url(gcs_client, bucket_name, file_name) print(f"Public URL for the uploaded image: {public_url}") return public_url def copy_all_files_from_drive_to_gcs(drive_service, gcs_client, drive_folder_id, bucket_name, gcs_folder_name): # Get all files from the folder query = f"'{drive_folder_id}' in parents and trashed = false" response = drive_service.files().list(q=query).execute() files = response.get('files', []) for file in files: # Copy each file to GCS file_id = file['id'] file_name = file['name'] gcs_destination_path = f"{gcs_folder_name}/{file_name}" copy_file_from_drive_to_gcs(drive_service, gcs_client, file_id, bucket_name, gcs_destination_path) def copy_file_from_drive_to_gcs(drive_service, gcs_client, file_id, bucket_name, gcs_destination_path): # Download file content from Drive request = drive_service.files().get_media(fileId=file_id) fh = io.BytesIO() downloader = MediaIoBaseDownload(fh, request) done = False while not done: status, done = downloader.next_chunk() fh.seek(0) file_content = fh.getvalue() # Upload file content to GCS bucket = gcs_client.bucket(bucket_name) blob = bucket.blob(gcs_destination_path) blob.upload_from_string(file_content) print(f"File {file_id} copied to GCS at {gcs_destination_path}.") # # ====drive====初始化 def init_drive_service(): credentials_json_string = DRIVE_KEY credentials_dict = json.loads(credentials_json_string) SCOPES = ['https://www.googleapis.com/auth/drive'] credentials = service_account.Credentials.from_service_account_info( credentials_dict, scopes=SCOPES) service = build('drive', 'v3', credentials=credentials) return service def create_folder_if_not_exists(service, folder_name, parent_id): print("检查是否存在特定名称的文件夹,如果不存在则创建") query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_id}' in parents and trashed=false" response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute() folders = response.get('files', []) if not folders: # 文件夹不存在,创建新文件夹 file_metadata = { 'name': folder_name, 'mimeType': 'application/vnd.google-apps.folder', 'parents': [parent_id] } folder = service.files().create(body=file_metadata, fields='id').execute() return folder.get('id') else: # 文件夹已存在 return folders[0]['id'] # 检查Google Drive上是否存在文件 def check_file_exists(service, folder_name, file_name): query = f"name = '{file_name}' and '{folder_name}' in parents and trashed = false" response = service.files().list(q=query).execute() files = response.get('files', []) return len(files) > 0, files[0]['id'] if files else None def upload_content_directly(service, file_name, folder_id, content): """ 直接将内容上传到Google Drive中的新文件。 """ if not file_name: raise ValueError("文件名不能为空") if not folder_id: raise ValueError("文件夹ID不能为空") if content is None: # 允许空字符串上传,但不允许None raise ValueError("内容不能为空") file_metadata = {'name': file_name, 'parents': [folder_id]} # 使用io.BytesIO为文本内容创建一个内存中的文件对象 try: with io.BytesIO(content.encode('utf-8')) as fh: media = MediaIoBaseUpload(fh, mimetype='text/plain', resumable=True) print("==content==") print(content) print("==content==") print("==media==") print(media) print("==media==") # 执行上传 file = service.files().create(body=file_metadata, media_body=media, fields='id').execute() return file.get('id') except Exception as e: print(f"上传文件时发生错误: {e}") raise # 重新抛出异常,调用者可以根据需要处理或忽略 def upload_file_directly(service, file_name, 
folder_id, file_path): # 上傳 .json to Google Drive file_metadata = {'name': file_name, 'parents': [folder_id]} media = MediaFileUpload(file_path, mimetype='application/json') file = service.files().create(body=file_metadata, media_body=media, fields='id').execute() # return file.get('id') # 返回文件ID return True def upload_img_directly(service, file_name, folder_id, file_path): file_metadata = {'name': file_name, 'parents': [folder_id]} media = MediaFileUpload(file_path, mimetype='image/jpeg') file = service.files().create(body=file_metadata, media_body=media, fields='id').execute() return file.get('id') # 返回文件ID def download_file_as_string(service, file_id): """ 从Google Drive下载文件并将其作为字符串返回。 """ request = service.files().get_media(fileId=file_id) fh = io.BytesIO() downloader = MediaIoBaseDownload(fh, request) done = False while done is False: status, done = downloader.next_chunk() fh.seek(0) content = fh.read().decode('utf-8') return content def set_public_permission(service, file_id): service.permissions().create( fileId=file_id, body={"type": "anyone", "role": "reader"}, fields='id', ).execute() def update_file_on_drive(service, file_id, file_content): """ 更新Google Drive上的文件内容。 参数: - service: Google Drive API服务实例。 - file_id: 要更新的文件的ID。 - file_content: 新的文件内容,字符串格式。 """ # 将新的文件内容转换为字节流 fh = io.BytesIO(file_content.encode('utf-8')) media = MediaIoBaseUpload(fh, mimetype='application/json', resumable=True) # 更新文件 updated_file = service.files().update( fileId=file_id, media_body=media ).execute() print(f"文件已更新,文件ID: {updated_file['id']}") # ---- Text file ---- def process_file(password, file): verify_password(password) # 读取文件 if file.name.endswith('.csv'): df = pd.read_csv(file) text = df_to_text(df) elif file.name.endswith('.xlsx'): df = pd.read_excel(file) text = df_to_text(df) elif file.name.endswith('.docx'): text = docx_to_text(file) else: raise ValueError("Unsupported file type") df_string = df.to_string() # 宜蘭:移除@XX@符号 to | df_string = df_string.replace("@XX@", "|") # 根据上传的文件内容生成问题 questions = generate_questions(df_string) summary = generate_summarise(df_string) # 返回按钮文本和 DataFrame 字符串 return questions[0] if len(questions) > 0 else "", \ questions[1] if len(questions) > 1 else "", \ questions[2] if len(questions) > 2 else "", \ summary, \ df_string def df_to_text(df): # 将 DataFrame 转换为纯文本 return df.to_string() def docx_to_text(file): # 将 Word 文档转换为纯文本 doc = Document(file) return "\n".join([para.text for para in doc.paragraphs]) # ---- YouTube link ---- def format_seconds_to_time(seconds): """将秒数格式化为 时:分:秒 的形式""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02}:{minutes:02}:{seconds:02}" def extract_youtube_id(url): parsed_url = urlparse(url) if "youtube.com" in parsed_url.netloc: # 对于标准链接,视频ID在查询参数'v'中 query_params = parse_qs(parsed_url.query) return query_params.get("v")[0] if "v" in query_params else None elif "youtu.be" in parsed_url.netloc: # 对于短链接,视频ID是路径的一部分 return parsed_url.path.lstrip('/') else: return None def get_transcript(video_id): languages = ['zh-TW', 'zh-Hant', 'zh', 'en'] # 優先順序列表 for language in languages: try: transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language]) return transcript # 成功獲取字幕,直接返回結果 except NoTranscriptFound: continue # 當前語言的字幕沒有找到,繼續嘗試下一個語言 return None # 所有嘗試都失敗,返回None def generate_transcription(video_id): youtube_url = f'https://www.youtube.com/watch?v={video_id}' codec_name = "mp3" outtmpl = f"{OUTPUT_PATH}/{video_id}.%(ext)s" ydl_opts = { 'format': 
'bestaudio/best', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': codec_name, 'preferredquality': '192' }], 'outtmpl': outtmpl, } print("===download video mp3===") with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([youtube_url]) audio_path = f"{OUTPUT_PATH}/{video_id}.{codec_name}" print("===transcription by open ai===") with open(audio_path, "rb") as audio_file: srt_content = OPEN_AI_CLIENT.audio.transcriptions.create( model="whisper-1", file=audio_file, response_format="verbose_json", timestamp_granularities=["segment"], prompt="如果逐字稿有中文,請使用繁體中文 zh-TW", ) # get segments segments = srt_content.segments transcription = [ { "text": item["text"], "start": int(item["start"]), "duration": int(item["end"] - item["start"]) } for item in segments ] return transcription def process_transcript_and_screenshots(video_id): print("====process_transcript_and_screenshots====") # Drive service = init_drive_service() parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL' folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id) # 逐字稿文件名 file_name = f'{video_id}_transcript.json' # 检查逐字稿是否存在 exists, file_id = check_file_exists(service, folder_id, file_name) if not exists: # 从YouTube获取逐字稿并上传 transcript = get_transcript(video_id) if transcript: print("成功獲取字幕") else: print("沒有找到字幕") transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2) file_id = upload_content_directly(service, file_name, folder_id, transcript_text) print("逐字稿已上传到Google Drive") else: # 逐字稿已存在,下载逐字稿内容 print("逐字稿已存在于Google Drive中") transcript_text = download_file_as_string(service, file_id) transcript = json.loads(transcript_text) # 处理逐字稿中的每个条目,检查并上传截图 for entry in transcript: if 'img_file_id' not in entry: screenshot_path = screenshot_youtube_video(video_id, entry['start']) img_file_id = upload_img_directly(service, f"{video_id}_{entry['start']}.jpg", folder_id, screenshot_path) set_public_permission(service, img_file_id) entry['img_file_id'] = img_file_id print(f"截图已上传到Google Drive: {img_file_id}") # 更新逐字稿文件 updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2) update_file_on_drive(service, file_id, updated_transcript_text) print("逐字稿已更新,包括截图链接") return transcript def process_transcript_and_screenshots_on_gcs(video_id): print("====process_transcript_and_screenshots_on_gcs====") # GCS gcs_client = GCS_CLIENT bucket_name = 'video_ai_assistant' # 逐字稿文件名 transcript_file_name = f'{video_id}_transcript.json' transcript_blob_name = f"{video_id}/{transcript_file_name}" # 检查逐字稿是否存在 is_transcript_exists = GCS_SERVICE.check_file_exists(bucket_name, transcript_blob_name) if not is_transcript_exists: # 从YouTube获取逐字稿并上传 try: transcript = get_transcript(video_id) except: # call open ai whisper print("===call open ai whisper===") transcript = generate_transcription(video_id) if transcript: print("成功獲取字幕") else: print("沒有找到字幕") transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2) upload_file_to_gcs_with_json_string(gcs_client, bucket_name, transcript_blob_name, transcript_text) else: # 逐字稿已存在,下载逐字稿内容 print("逐字稿已存在于GCS中") transcript_text = download_blob_to_string(gcs_client, bucket_name, transcript_blob_name) transcript = json.loads(transcript_text) # print("===確認其他衍生文件===") # source = "gcs" # get_questions(video_id, transcript_text, source) # get_video_id_summary(video_id, transcript_text, source) # get_mind_map(video_id, transcript_text, source) # print("===確認其他衍生文件 end ===") # 處理截圖 for entry in transcript: if 'img_file_id' not in entry: # 檢查 OUTPUT_PATH 是否存在 
video_id.mp4 video_path = f'{OUTPUT_PATH}/{video_id}.mp4' if not os.path.exists(video_path): # try 5 times 如果都失敗就 raise for i in range(5): try: download_youtube_video(video_id) break except Exception as e: if i == 4: raise gr.Error(f"下载视频失败: {str(e)}") time.sleep(5) # 截图 screenshot_path = screenshot_youtube_video(video_id, entry['start']) screenshot_blob_name = f"{video_id}/{video_id}_{entry['start']}.jpg" img_file_id = upload_img_and_get_public_url(gcs_client, bucket_name, screenshot_blob_name, screenshot_path) entry['img_file_id'] = img_file_id print(f"截图已上传到GCS: {img_file_id}") # 更新逐字稿文件 print("===更新逐字稿文件===") print(transcript) print("===更新逐字稿文件===") updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2) upload_file_to_gcs_with_json_string(gcs_client, bucket_name, transcript_blob_name, updated_transcript_text) print("逐字稿已更新,包括截图链接") updated_transcript_json = json.loads(updated_transcript_text) return updated_transcript_json def process_youtube_link(password, link): verify_password(password) # 使用 YouTube API 获取逐字稿 # 假设您已经获取了 YouTube 视频的逐字稿并存储在变量 `transcript` 中 video_id = extract_youtube_id(link) global VIDEO_ID VIDEO_ID = video_id try: # transcript = process_transcript_and_screenshots(video_id) transcript = process_transcript_and_screenshots_on_gcs(video_id) except Exception as e: error_msg = f" {video_id} 逐字稿錯誤: {str(e)}" print("===process_youtube_link error===") print(error_msg) raise gr.Error(error_msg) formatted_transcript = [] formatted_simple_transcript =[] screenshot_paths = [] for entry in transcript: start_time = format_seconds_to_time(entry['start']) end_time = format_seconds_to_time(entry['start'] + entry['duration']) embed_url = get_embedded_youtube_link(video_id, entry['start']) img_file_id = entry['img_file_id'] # img_file_id ="" # 先取消 Google Drive 的图片 # screenshot_path = f"https://lh3.googleusercontent.com/d/{img_file_id}=s4000" screenshot_path = img_file_id line = { "start_time": start_time, "end_time": end_time, "text": entry['text'], "embed_url": embed_url, "screenshot_path": screenshot_path } formatted_transcript.append(line) # formatted_simple_transcript 只要 start_time, end_time, text simple_line = { "start_time": start_time, "end_time": end_time, "text": entry['text'] } formatted_simple_transcript.append(simple_line) screenshot_paths.append(screenshot_path) global TRANSCRIPTS TRANSCRIPTS = formatted_transcript # 基于逐字稿生成其他所需的输出 source = "gcs" questions = get_questions(video_id, formatted_simple_transcript, source) formatted_transcript_json = json.dumps(formatted_transcript, ensure_ascii=False, indent=2) summary_json = get_video_id_summary(video_id, formatted_simple_transcript, source) summary = summary_json["summary"] html_content = format_transcript_to_html(formatted_transcript) simple_html_content = format_simple_transcript_to_html(formatted_simple_transcript) first_image = formatted_transcript[0]['screenshot_path'] # first_image = "https://www.nameslook.com/names/dfsadf-nameslook.png" first_text = formatted_transcript[0]['text'] mind_map_json = get_mind_map(video_id, formatted_simple_transcript, source) mind_map = mind_map_json["mind_map"] mind_map_html = get_mind_map_html(mind_map) reading_passage_json = get_reading_passage(video_id, formatted_simple_transcript, source) reading_passage = reading_passage_json["reading_passage"] # 确保返回与 UI 组件预期匹配的输出 return video_id, \ questions[0] if len(questions) > 0 else "", \ questions[1] if len(questions) > 1 else "", \ questions[2] if len(questions) > 2 else "", \ formatted_transcript_json, \ summary, \ 
mind_map, \
           mind_map_html, \
           html_content, \
           simple_html_content, \
           first_image, \
           first_text, \
           reading_passage

def format_transcript_to_html(formatted_transcript):
    # NOTE: the original HTML tags in these strings were lost during conversion; the
    # markup below (heading for the time range, paragraph for the text, image tag for
    # the screenshot) is a minimal reconstruction, not the original markup.
    html_content = ""
    for entry in formatted_transcript:
        html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
        html_content += f"<p>{entry['text']}</p>"
        html_content += f"<img src='{entry['screenshot_path']}' width='500px' />"
    return html_content

def format_simple_transcript_to_html(formatted_transcript):
    # Same note as above: tags reconstructed; time range plus text only.
    html_content = ""
    for entry in formatted_transcript:
        html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
        html_content += f"<p>{entry['text']}</p>"
    return html_content

def get_embedded_youtube_link(video_id, start_time):
    int_start_time = int(start_time)
    embed_url = f"https://www.youtube.com/embed/{video_id}?start={int_start_time}&autoplay=1"
    return embed_url

def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
    # Construct the full YouTube URL
    youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    # Download the video
    yt = YouTube(youtube_url)
    video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    video_stream.download(output_path=output_path, filename=youtube_id+".mp4")
    print(f"Video downloaded successfully: {output_path}/{youtube_id}.mp4")

def screenshot_youtube_video(youtube_id, snapshot_sec):
    video_path = f'{OUTPUT_PATH}/{youtube_id}.mp4'
    file_name = f"{youtube_id}_{snapshot_sec}.jpg"
    with VideoFileClip(video_path) as video:
        screenshot_path = f'{OUTPUT_PATH}/{file_name}'
        video.save_frame(screenshot_path, snapshot_sec)
    return screenshot_path

# ---- Web ----
def process_web_link(link):
    # Fetch and parse the web page content
    response = requests.get(link)
    soup = BeautifulSoup(response.content, 'html.parser')
    return soup.get_text()

# ---- LLM Generator ----
def get_reading_passage(video_id, df_string, source):
    if source == "gcs":
        print("===get_reading_passage on gcs===")
        gcs_client = GCS_CLIENT
        bucket_name = 'video_ai_assistant'
        file_name = f'{video_id}_reading_passage.json'
        blob_name = f"{video_id}/{file_name}"
        # Check whether the reading passage already exists
        is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
        if not is_file_exists:
            reading_passage = generate_reading_passage(df_string)
            reading_passage_json = {"reading_passage": str(reading_passage)}
            reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
            upload_file_to_gcs_with_json_string(gcs_client, bucket_name, blob_name, reading_passage_text)
            print("reading_passage已上传到GCS")
        else:
            # The reading passage already exists; download its content
            print("reading_passage已存在于GCS中")
            reading_passage_text = download_blob_to_string(gcs_client, bucket_name, blob_name)
            reading_passage_json = json.loads(reading_passage_text)
    elif source == "drive":
        print("===get_reading_passage on drive===")
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_reading_passage.json'
        # Check whether the reading passage already exists
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            reading_passage = generate_reading_passage(df_string)
            reading_passage_json = {"reading_passage": str(reading_passage)}
            reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
            upload_content_directly(service, file_name, folder_id, reading_passage_text)
            print("reading_passage已上傳到Google Drive")
        else:
            # The reading passage already exists; download and parse its content
            print("reading_passage已存在于Google Drive中")
            reading_passage_text = download_file_as_string(service, file_id)
            reading_passage_json = json.loads(reading_passage_text)
    return reading_passage_json

def generate_reading_passage(df_string):
    # Use OpenAI to generate a reading passage based on the uploaded data
    sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW"
    user_content = f"""
    請根據 {df_string}
    文本自行判斷資料的種類
    幫我組合成 Reading Passage
    並潤稿讓文句通順
    請一定要使用繁體中文 zh-TW,並用台灣人的口語
    產生的結果不要前後文解釋,只需要專注提供 Reading Passage
    """
    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_content}
    ]
    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens":
4000, } response = OPEN_AI_CLIENT.chat.completions.create(**request_payload) reading_passage = response.choices[0].message.content.strip() print("=====reading_passage=====") print(reading_passage) print("=====reading_passage=====") return reading_passage def get_mind_map(video_id, df_string, source): if source == "gcs": print("===get_mind_map on gcs===") gcs_client = GCS_CLIENT bucket_name = 'video_ai_assistant' file_name = f'{video_id}_mind_map.json' blob_name = f"{video_id}/{file_name}" # 检查檔案是否存在 is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name) if not is_file_exists: mind_map = generate_mind_map(df_string) mind_map_json = {"mind_map": str(mind_map)} mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2) upload_file_to_gcs_with_json_string(gcs_client, bucket_name, blob_name, mind_map_text) print("mind_map已上傳到GCS") else: # mindmap已存在,下载内容 print("mind_map已存在于GCS中") mind_map_text = download_blob_to_string(gcs_client, bucket_name, blob_name) mind_map_json = json.loads(mind_map_text) elif source == "drive": print("===get_mind_map on drive===") service = init_drive_service() parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL' folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id) file_name = f'{video_id}_mind_map.json' # 检查檔案是否存在 exists, file_id = check_file_exists(service, folder_id, file_name) if not exists: mind_map = generate_mind_map(df_string) mind_map_json = {"mind_map": str(mind_map)} mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2) upload_content_directly(service, file_name, folder_id, mind_map_text) print("mind_map已上傳到Google Drive") else: # mindmap已存在,下载内容 print("mind_map已存在于Google Drive中") mind_map_text = download_file_as_string(service, file_id) mind_map_json = json.loads(mind_map_text) return mind_map_json def generate_mind_map(df_string): # 使用 OpenAI 生成基于上传数据的问题 sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW" user_content = f""" 請根據 {df_string} 文本建立 markdown 心智圖 注意:不需要前後文敘述,直接給出 markdown 文本即可 這對我很重要 """ messages = [ {"role": "system", "content": sys_content}, {"role": "user", "content": user_content} ] request_payload = { "model": "gpt-4-1106-preview", "messages": messages, "max_tokens": 4000, } response = OPEN_AI_CLIENT.chat.completions.create(**request_payload) mind_map = response.choices[0].message.content.strip() print("=====mind_map=====") print(mind_map) print("=====mind_map=====") return mind_map def get_mind_map_html(mind_map): mind_map_markdown = mind_map.replace("```markdown", "").replace("```", "") mind_map_html = f"""
    <!-- NOTE: the original HTML wrapper was lost during conversion; this is a minimal
         reconstruction that renders the markdown via markmap (assumed markup). -->
    <div class="markmap">
        <script type="text/template">
            {mind_map_markdown}
        </script>
    </div>
""" return mind_map_html def get_video_id_summary(video_id, df_string, source): if source == "gcs": print("===get_video_id_summary on gcs===") gcs_client = GCS_CLIENT bucket_name = 'video_ai_assistant' file_name = f'{video_id}_summary.json' summary_file_blob_name = f"{video_id}/{file_name}" # 检查 summary_file 是否存在 is_summary_file_exists = GCS_SERVICE.check_file_exists(bucket_name, summary_file_blob_name) if not is_summary_file_exists: summary = generate_summarise(df_string) summary_json = {"summary": str(summary)} summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2) upload_file_to_gcs_with_json_string(gcs_client, bucket_name, summary_file_blob_name, summary_text) print("summary已上传到GCS") else: # summary已存在,下载内容 print("summary已存在于GCS中") summary_text = download_blob_to_string(gcs_client, bucket_name, summary_file_blob_name) summary_json = json.loads(summary_text) elif source == "drive": print("===get_video_id_summary===") service = init_drive_service() parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL' folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id) file_name = f'{video_id}_summary.json' # 检查逐字稿是否存在 exists, file_id = check_file_exists(service, folder_id, file_name) if not exists: summary = generate_summarise(df_string) summary_json = {"summary": str(summary)} summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2) try: upload_content_directly(service, file_name, folder_id, summary_text) print("summary已上傳到Google Drive") except Exception as e: error_msg = f" {video_id} 摘要錯誤: {str(e)}" print("===get_video_id_summary error===") print(error_msg) print("===get_video_id_summary error===") else: # 逐字稿已存在,下载逐字稿内容 print("summary已存在Google Drive中") summary_text = download_file_as_string(service, file_id) summary_json = json.loads(summary_text) return summary_json def generate_summarise(df_string): # 使用 OpenAI 生成基于上传数据的问题 sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW" user_content = f""" 請根據 {df_string},判斷這份文本 如果是資料類型,請提估欄位敘述、資料樣態與資料分析,告訴學生這張表的意義,以及可能的結論與對應方式 如果是影片類型,請提估影片內容,告訴學生這部影片的意義, 整體摘要在一百字以內 小範圍切出不同段落的相對應時間軸的重點摘要,最多不超過五段 注意不要遺漏任何一段時間軸的內容 格式為 【start - end】: 摘要 以及可能的結論與結尾延伸小問題提供學生作反思 整體格式為: 🗂️ 1. 內容類型:? 📚 2. 整體摘要 🔖 3. 重點概念 🔑 4. 關鍵時刻 💡 5. 為什麼我們要學這個? ❓ 6. 延伸小問題 """ # 🗂️ 1. 內容類型:? # 📚 2. 整體摘要 # 🔖 3. 條列式重點 # 🔑 4. 關鍵時刻(段落摘要) # 💡 5. 結論反思(為什麼我們要學這個?) # ❓ 6. 
延伸小問題 messages = [ {"role": "system", "content": sys_content}, {"role": "user", "content": user_content} ] request_payload = { "model": "gpt-4-turbo-preview", "messages": messages, "max_tokens": 4000, } response = OPEN_AI_CLIENT.chat.completions.create(**request_payload) df_summarise = response.choices[0].message.content.strip() print("=====df_summarise=====") print(df_summarise) print("=====df_summarise=====") return df_summarise def generate_questions(df_string): # 使用 OpenAI 生成基于上传数据的问题 sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,並用既有資料為本質猜測用戶可能會問的問題,使用 zh-TW" user_content = f"請根據 {df_string} 生成三個問題,並用 JSON 格式返回 questions:[q1的敘述text, q2的敘述text, q3的敘述text]" messages = [ {"role": "system", "content": sys_content}, {"role": "user", "content": user_content} ] response_format = { "type": "json_object" } print("=====messages=====") print(messages) print("=====messages=====") request_payload = { "model": "gpt-4-1106-preview", "messages": messages, "max_tokens": 4000, "response_format": response_format } response = OPEN_AI_CLIENT.chat.completions.create(**request_payload) questions = json.loads(response.choices[0].message.content)["questions"] print("=====json_response=====") print(questions) print("=====json_response=====") return questions def get_questions(video_id, df_string, source="gcs"): if source == "gcs": # 去 gcs 確認是有有 video_id_questions.json print("===get_questions on gcs===") gcs_client = GCS_CLIENT bucket_name = 'video_ai_assistant' file_name = f'{video_id}_questions.json' blob_name = f"{video_id}/{file_name}" # 检查檔案是否存在 is_questions_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name) if not is_questions_exists: questions = generate_questions(df_string) questions_text = json.dumps(questions, ensure_ascii=False, indent=2) upload_file_to_gcs_with_json_string(gcs_client, bucket_name, blob_name, questions_text) print("questions已上傳到GCS") else: # 逐字稿已存在,下载逐字稿内容 print("questions已存在于GCS中") questions_text = download_blob_to_string(gcs_client, bucket_name, blob_name) questions = json.loads(questions_text) elif source == "drive": # 去 g drive 確認是有有 video_id_questions.json print("===get_questions===") service = init_drive_service() parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL' folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id) file_name = f'{video_id}_questions.json' # 检查檔案是否存在 exists, file_id = check_file_exists(service, folder_id, file_name) if not exists: questions = generate_questions(df_string) questions_text = json.dumps(questions, ensure_ascii=False, indent=2) upload_content_directly(service, file_name, folder_id, questions_text) print("questions已上傳到Google Drive") else: # 逐字稿已存在,下载逐字稿内容 print("questions已存在于Google Drive中") questions_text = download_file_as_string(service, file_id) questions = json.loads(questions_text) q1 = questions[0] if len(questions) > 0 else "" q2 = questions[1] if len(questions) > 1 else "" q3 = questions[2] if len(questions) > 2 else "" print("=====get_questions=====") print(f"q1: {q1}") print(f"q2: {q2}") print(f"q3: {q3}") print("=====get_questions=====") return q1, q2, q3 def change_questions(password, df_string): verify_password(password) questions = generate_questions(df_string) q1 = questions[0] if len(questions) > 0 else "" q2 = questions[1] if len(questions) > 1 else "" q3 = questions[2] if len(questions) > 2 else "" print("=====get_questions=====") print(f"q1: {q1}") print(f"q2: {q2}") print(f"q3: {q3}") print("=====get_questions=====") return q1, q2, q3 # AI 生成教學素材 def 
on_generate_ai_content(password, df_string, topic, grade, level, specific_feature, content_type): verify_password(password) material = EducationalMaterial(df_string, topic, grade, level, specific_feature, content_type) prompt = material.generate_content_prompt() user_content = material.build_user_content() messages = material.build_messages(user_content) ai_model_name = "gpt-4-1106-preview" request_payload = { "model": ai_model_name, "messages": messages, "max_tokens": 4000 # 举例,实际上您可能需要更详细的配置 } ai_content = material.send_ai_request(OPEN_AI_CLIENT, request_payload) return ai_content, prompt, prompt def generate_exam_fine_tune_result(password, exam_result_prompt , df_string_output, exam_result, exam_result_fine_tune_prompt): verify_password(password) material = EducationalMaterial(df_string_output, "", "", "", "", "") user_content = material.build_fine_tune_user_content(exam_result_prompt, exam_result, exam_result_fine_tune_prompt) messages = material.build_messages(user_content) ai_model_name = "gpt-4-1106-preview" request_payload = { "model": ai_model_name, "messages": messages, "max_tokens": 4000 # 举例,实际上您可能需要更详细的配置 } ai_content = material.send_ai_request(OPEN_AI_CLIENT, request_payload) return ai_content # ---- Chatbot ---- def respond(password, user_message, data, chat_history, socratic_mode=False): verify_password(password) print("=== 變數:user_message ===") print(user_message) print("=== 變數:chat_history ===") print(chat_history) data_json = json.loads(data) for entry in data_json: entry.pop('embed_url', None) # Remove 'embed_url' if it exists entry.pop('screenshot_path', None) if socratic_mode: sys_content = f""" 你是一個擅長資料分析跟影片教學的老師,user 為學生 請用 {data} 為資料文本,自行判斷資料的種類, 並進行對話,使用 台灣人的口與表達,及繁體中文zh-TW 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 請你用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示 不要直接給予答案,讓學生自己思考 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 或者你可以問學生一些問題,幫助學生更好的理解資料 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 最後,在你回答的開頭標註【蘇格拉底助教】 """ else: sys_content = f""" 你是一個擅長資料分析跟影片教學的老師,user 為學生 請用 {data} 為資料文本,自行判斷資料的種類, 並進行對話,使用 zh-TW 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生可以找到相對應的時間點 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 或者你可以問學生一些問題,幫助學生更好的理解資料 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 """ messages = [ {"role": "system", "content": sys_content} ] # if chat_history is not none, append role, content to messages # chat_history = [(user, assistant), (user, assistant), ...] 
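# Illustrative example (hypothetical messages, not taken from the app): a history of
#   [("What is this video about?", "It explains how to add fractions ...")]
# is flattened below into
#   [{"role": "user", "content": "What is this video about?"},
#    {"role": "assistant", "content": "It explains how to add fractions ..."}]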
# In the list, first one is user, then assistant if chat_history is not None: # 如果超過10則訊息,只保留最後10則訊息 if len(chat_history) > 10: chat_history = chat_history[-10:] for chat in chat_history: old_messages = [ {"role": "user", "content": chat[0]}, {"role": "assistant", "content": chat[1]} ] messages += old_messages else: pass messages.append({"role": "user", "content": user_message}) request_payload = { "model": "gpt-4-1106-preview", "messages": messages, "max_tokens": 4000 # 設定一個較大的值,可根據需要調整 } response = OPEN_AI_CLIENT.chat.completions.create(**request_payload) response_text = response.choices[0].message.content.strip() # 更新聊天历史 new_chat_history = (user_message, response_text) if chat_history is None: chat_history = [new_chat_history] else: chat_history.append(new_chat_history) # 返回聊天历史和空字符串清空输入框 return "", chat_history def respond_with_jutor_chat(password, user_message, data, chat_history, socratic_mode=False): verify_password(password) data_json = json.loads(data) for entry in data_json: entry.pop('embed_url', None) # Remove 'embed_url' if it exists entry.pop('screenshot_path', None) if socratic_mode: sys_content = f""" 你是一個擅長資料分析跟影片教學的老師,user 為學生 請用 {data} 為資料文本,自行判斷資料的種類, 並進行對話,使用 台灣人的口與表達,及繁體中文zh-TW 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 請你用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示 不要直接給予答案,讓學生自己思考 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 或者你可以問學生一些問題,幫助學生更好的理解資料 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 最後,在你回答的開頭標註【蘇格拉底助教】 """ else: sys_content = f""" 你是一個擅長資料分析跟影片教學的老師,user 為學生 請用 {data} 為資料文本,自行判斷資料的種類, 並進行對話,使用 zh-TW 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生可以找到相對應的時間點 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 或者你可以問學生一些問題,幫助學生更好的理解資料 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 """ messages = [ {"role": "system", "content": sys_content} ] # if chat_history is not none, append role, content to messages # chat_history = [(user, assistant), (user, assistant), ...] 
# In the list, first one is user, then assistant if chat_history is not None: # 如果超過10則訊息,只保留最後10則訊息 if len(chat_history) > 10: chat_history = chat_history[-10:] for chat in chat_history: old_messages = [ {"role": "user", "content": chat[0]}, {"role": "assistant", "content": chat[1]} ] messages += old_messages else: pass messages.append({"role": "user", "content": user_message}) api_endpoint = "https://ci-live-feat-video-ai-dot-junyiacademy.appspot.com/api/v2/jutor/hf-chat" headers = { "Content-Type": "application/json", "x-api-key": JUTOR_CHAT_KEY, } data = { "data": { "messages": messages, "max_tokens": 512, "temperature": 0.9, "model": "gpt-4-1106-preview", "stream": False, } } response = requests.post(api_endpoint, headers=headers, data=json.dumps(data)) if response.status_code == 200: # 处理响应数据 response_data = response.json() prompt = response_data['data']['choices'][0]['message']['content'].strip() # 更新聊天历史 new_chat_history = (user_message, prompt) if chat_history is None: chat_history = [new_chat_history] else: chat_history.append(new_chat_history) # 返回聊天历史和空字符串清空输入框 return "", chat_history else: # 处理错误情况 print(f"Error: {response.status_code}") return "请求失败,请稍后再试!", chat_history def chat_with_groq(password, user_message, data, chat_history, socratic_mode=False): verify_password(password) print("=== 變數:user_message ===") print(user_message) print("=== 變數:chat_history ===") print(chat_history) data_json = json.loads(data) for entry in data_json: entry.pop('embed_url', None) # Remove 'embed_url' if it exists entry.pop('screenshot_path', None) if socratic_mode: sys_content = f""" 你是一個擅長資料分析跟影片教學的老師,user 為學生 請用 {data} 為資料文本,自行判斷資料的種類, 並進行對話,使用 台灣人的口與表達,及繁體中文zh-TW 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 請你用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示 不要直接給予答案,讓學生自己思考 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 或者你可以問學生一些問題,幫助學生更好的理解資料 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 最後,在你回答的開頭標註【蘇格拉底助教】 """ else: sys_content = f""" 你是一個擅長資料分析跟影片教學的老師,user 為學生 請用 {data} 為資料文本,自行判斷資料的種類, 並進行對話,使用 zh-TW 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生可以找到相對應的時間點 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 或者你可以問學生一些問題,幫助學生更好的理解資料 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 """ messages = [ {"role": "system", "content": sys_content} ] # if chat_history is not none, append role, content to messages # chat_history = [(user, assistant), (user, assistant), ...] 
# In the list, first one is user, then assistant if chat_history is not None: # 如果超過10則訊息,只保留最後10則訊息 if len(chat_history) > 10: chat_history = chat_history[-10:] for chat in chat_history: old_messages = [ {"role": "user", "content": chat[0]}, {"role": "assistant", "content": chat[1]} ] messages += old_messages else: pass messages.append({"role": "user", "content": user_message}) request_payload = { "model": "mixtral-8x7b-32768", "messages": messages, "max_tokens": 4000 # 設定一個較大的值,可根據需要調整 } response = GROQ_CLIENT.chat.completions.create(**request_payload) response_text = response.choices[0].message.content.strip() # 更新聊天历史 new_chat_history = (user_message, response_text) if chat_history is None: chat_history = [new_chat_history] else: chat_history.append(new_chat_history) # 返回聊天历史和空字符串清空输入框 return "", chat_history def chat_with_youtube_transcript(password, youtube_id, thread_id, trascript, user_message, chat_history, socratic_mode=False): verify_password(password) # 先計算 user_message 是否超過 500 個字 if len(user_message) > 1500: error_msg = "你的訊息太長了,請縮短訊息長度至五百字以內" raise gr.Error(error_msg) try: assistant_id = "asst_kmvZLNkDUYaNkMNtZEAYxyPq" client = OPEN_AI_CLIENT # 從 file 拿逐字稿資料 # instructions = f""" # 你是一個擅長資料分析跟影片教學的老師,user 為學生 # 請根據 assistant beta 的上傳資料 # 如果 file 內有找到 file.content["{youtube_id}"] 為資料文本,自行判斷資料的種類, # 如果沒有資料,請告訴用戶沒有逐字稿資料,但仍然可以進行對話,使用台灣人的口與表達,及繁體中文 zh-TW # 請嚴格執行,只根據 file.content["{youtube_id}"] 為資料文本,沒有就是沒有資料,不要引用其他資料 # 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 # socratic_mode = {socratic_mode} # 如果 socratic_mode = True, # - 請用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示 # - 不要直接給予答案,讓學生自己思考 # - 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案 # - 在你回答的開頭標註【蘇格拉底助教:{youtube_id} 】 # 如果 socratic_mode = False, # - 直接回答學生問題 # - 在你回答的開頭標註【一般學習精靈:{youtube_id} 】 # 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 # 或者你可以反問學生一些問題,幫助學生更好的理解資料 # 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 # 最後只要是參考逐字稿資料,請在回答的最後標註【參考資料:(分):(秒)】 # """ # 直接安排逐字稿資料 in instructions trascript_json = json.loads(trascript) # 移除 embed_url, screenshot_path for entry in trascript_json: entry.pop('embed_url', None) entry.pop('screenshot_path', None) trascript_text = json.dumps(trascript_json, ensure_ascii=False, indent=2) instructions = f""" 逐字稿資料:{trascript_text} ------------------------------------- 你是一個擅長資料分析跟影片教學的老師,user 為學生 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題 socratic_mode = {socratic_mode} 如果 socratic_mode = True, - 請用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示 - 不要直接給予答案,讓學生自己思考 - 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案 - 在你回答的開頭標註【蘇格拉底助教:{youtube_id} 】 如果 socratic_mode = False, - 直接回答學生問題 - 在你回答的開頭標註【一般學習精靈:{youtube_id} 】 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題 或者你可以反問學生一些問題,幫助學生更好的理解資料 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題 最後只要是參考逐字稿資料,請在回答的最後標註【參考資料:(分):(秒)】 """ # 创建线程 if not thread_id: thread = client.beta.threads.create() thread_id = thread.id else: thread = client.beta.threads.retrieve(thread_id) # 向线程添加用户的消息 client.beta.threads.messages.create( thread_id=thread.id, role="user", content=user_message ) # 运行助手,生成响应 run = client.beta.threads.runs.create( thread_id=thread.id, assistant_id=assistant_id, instructions=instructions, ) # 等待助手响应,设定最大等待时间为 30 秒 run_status = poll_run_status(run.id, thread.id, timeout=30) # 获取助手的响应消息 if run_status == "completed": messages = client.beta.threads.messages.list(thread_id=thread.id) # [MessageContentText(text=Text(annotations=[], value='您好!有什麼我可以幫助您的嗎?如果有任何問題或需要指導,請隨時告訴我!'), type='text')] response_text = messages.data[0].content[0].text.value else: response_text = "學習精靈有點累,請稍後再試!" 
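        # Descriptive note: the Assistants API flow above creates or reuses a thread
        # (thread_id), appends the user message, starts a run with the per-call
        # instructions, waits on poll_run_status (30-second timeout here), and takes
        # the newest message (messages.data[0]) as the assistant reply; any
        # non-"completed" status falls back to the apology message above.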
# 更新聊天历史 new_chat_history = (user_message, response_text) if chat_history is None: chat_history = [new_chat_history] else: chat_history.append(new_chat_history) except Exception as e: print(f"Error: {e}") raise gr.Error(f"Error: {e}") # 返回聊天历史和空字符串清空输入框 return "", chat_history, thread.id def process_open_ai_audio_to_chatbot(password, audio_url): verify_password(password) if audio_url: with open(audio_url, "rb") as audio_file: file_size = os.path.getsize(audio_url) if file_size > 2000000: raise gr.Error("檔案大小超過,請不要超過 60秒") else: response = OPEN_AI_CLIENT.audio.transcriptions.create( model="whisper-1", file=audio_file, response_format="text" ) # response 拆解 dict print("=== response ===") print(response) print("=== response ===") else: response = "" return response def poll_run_status(run_id, thread_id, timeout=600, poll_interval=5): """ Polls the status of a Run and handles different statuses appropriately. :param run_id: The ID of the Run to poll. :param thread_id: The ID of the Thread associated with the Run. :param timeout: Maximum time to wait for the Run to complete, in seconds. :param poll_interval: Time to wait between each poll, in seconds. """ client = OPEN_AI_CLIENT start_time = time.time() while time.time() - start_time < timeout: run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id) if run.status in ["completed", "cancelled", "failed"]: print(f"Run completed with status: {run.status}") break elif run.status == "requires_action": print("Run requires action. Performing required action...") # Here, you would perform the required action, e.g., running functions # and then submitting the outputs. This is simplified for this example. # After performing the required action, you'd complete the action: # OPEN_AI_CLIENT.beta.threads.runs.complete_required_action(...) elif run.status == "expired": print("Run expired. Exiting...") break else: print(f"Run status is {run.status}. Waiting for updates...") time.sleep(poll_interval) else: print("Timeout reached. 
Run did not complete in the expected time.") # Once the Run is completed, handle the result accordingly if run.status == "completed": # Retrieve and handle messages or run steps as needed messages = client.beta.threads.messages.list(thread_id=thread_id) for message in messages.data: if message.role == "assistant": print(f"Assistant response: {message.content}") elif run.status in ["cancelled", "failed"]: # Handle cancellation or failure print(f"Run ended with status: {run.status}") elif run.status == "expired": # Handle expired run print("Run expired without completion.") return run.status def update_slide(direction): global TRANSCRIPTS global CURRENT_INDEX print("=== 更新投影片 ===") print(f"CURRENT_INDEX: {CURRENT_INDEX}") # print(f"TRANSCRIPTS: {TRANSCRIPTS}") CURRENT_INDEX += direction if CURRENT_INDEX < 0: CURRENT_INDEX = 0 # 防止索引小于0 elif CURRENT_INDEX >= len(TRANSCRIPTS): CURRENT_INDEX = len(TRANSCRIPTS) - 1 # 防止索引超出范围 # 获取当前条目的文本和截图 URL current_transcript = TRANSCRIPTS[CURRENT_INDEX] slide_image = current_transcript["screenshot_path"] slide_text = current_transcript["text"] return slide_image, slide_text def prev_slide(): return update_slide(-1) def next_slide(): return update_slide(1) IS_PASSWORD_SHOW = True IS_YOUTUBE_LINK_SHOW = True IS_YOUTUBE_LINK_BTN_SHOW = True def init_params(text, request: gr.Request): if request: print("Request headers dictionary:", request.headers) print("IP address:", request.client.host) print("Query parameters:", dict(request.query_params)) # url = request.url print("Request URL:", request.url) # if youtube_link in query_params if "youtube_id" in request.query_params: youtube_id = request.query_params["youtube_id"] youtube_link = f"https://www.youtube.com/watch?v={youtube_id}" print(f"youtube_link: {youtube_link}") else: youtube_link = "" print("youtube_link not in query_params") origin = request.headers.get("origin", "") if "junyiacademy" in origin: password_text = "6161" global IS_PASSWORD_SHOW global IS_YOUTUBE_LINK_SHOW global IS_YOUTUBE_LINK_BTN_SHOW IS_PASSWORD_SHOW = False IS_YOUTUBE_LINK_SHOW = False IS_YOUTUBE_LINK_BTN_SHOW = False else: password_text = "" return password_text, youtube_link HEAD = """ """ with gr.Blocks() as demo: with gr.Row(): password = gr.Textbox(label="Password", type="password", elem_id="password_input", visible=IS_PASSWORD_SHOW) file_upload = gr.File(label="Upload your CSV or Word file", visible=False) youtube_link = gr.Textbox(label="Enter YouTube Link", elem_id="youtube_link_input", visible=IS_YOUTUBE_LINK_SHOW) video_id = gr.Textbox(label="video_id", visible=False) youtube_link_btn = gr.Button("Submit_YouTube_Link", elem_id="youtube_link_btn", visible=IS_YOUTUBE_LINK_BTN_SHOW) web_link = gr.Textbox(label="Enter Web Page Link", visible=False) with gr.Tab("學生版"): with gr.Row(): with gr.Column(scale=3): with gr.Tab("文章模式"): reading_passage = gr.Textbox(label="Reading Passage", lines=40) with gr.Tab("重點"): df_summarise = gr.Textbox(container=True, show_copy_button=True, lines=40) with gr.Tab("問題"): gr.Markdown("## 常用問題") btn_1 = gr.Button() btn_2 = gr.Button() btn_3 = gr.Button() gr.Markdown("## 重新生成問題") btn_create_question = gr.Button("Create Questions") with gr.Accordion("See Details", open=False): with gr.Tab("本文"): df_string_output = gr.Textbox(lines=40, label="Data Text") with gr.Tab("逐字稿"): simple_html_content = gr.HTML(label="Simple Transcript") with gr.Tab("圖文"): transcript_html = gr.HTML(label="YouTube Transcript and Video") with gr.Tab("投影片"): slide_image = gr.Image() slide_text = gr.Textbox() with gr.Row(): 
prev_button = gr.Button("Previous") next_button = gr.Button("Next") prev_button.click(fn=prev_slide, inputs=[], outputs=[slide_image, slide_text]) next_button.click(fn=next_slide, inputs=[], outputs=[slide_image, slide_text]) with gr.Tab("markdown"): gr.Markdown("## 請複製以下 markdown 並貼到你的心智圖工具中,建議使用:https://markmap.js.org/repl") mind_map = gr.Textbox(container=True, show_copy_button=True, lines=40, elem_id="mind_map_markdown") with gr.Tab("心智圖",elem_id="mind_map_tab"): mind_map_html = gr.HTML() with gr.Column(scale=2): with gr.Tab("OPENAI"): gr.Markdown("## OPEN AI 模式") chatbot = gr.Chatbot(avatar_images=["https://junyi-avatar.s3.ap-northeast-1.amazonaws.com/live/%20%20foxcat-star-18.png?v=20231113095823614", "https://junyitopicimg.s3.amazonaws.com/s4byy--icon.jpe?v=20200513013523726"], label="OPEN AI 模式") thread_id = gr.Textbox(label="thread_id", visible=False) socratic_mode_btn = gr.Checkbox(label="蘇格拉底家教助理模式", value=True) openai_chatbot_audio_input = gr.Audio(sources=["microphone"], type="filepath") msg = gr.Textbox(label="Message") send_button = gr.Button("Send") with gr.Tab("GROQ"): gr.Markdown("## GROQ 模式") groq_chatbot = gr.Chatbot(label="groq mode chatbot") groq_msg = gr.Textbox(label="Message") groq_send_button = gr.Button("Send") with gr.Tab("JUTOR"): gr.Markdown("## JUTOR API 模式") jutor_chatbot = gr.Chatbot(label="jutor mode chatbot") jutor_msg = gr.Textbox(label="Message") jutor_send_button = gr.Button("Send") with gr.Tab("教師版"): with gr.Row(): content_topic = gr.Dropdown(label="選擇主題", choices=["數學", "自然", "國文", "英文", "社會"], value="數學") content_grade = gr.Dropdown(label="選擇年級", choices=["一年級", "二年級", "三年級", "四年級", "五年級", "六年級", "七年級", "八年級", "九年級", "十年級", "十一年級", "十二年級"], value="三年級") content_level = gr.Dropdown(label="差異化教學", choices=["基礎", "中級", "進階"], value="基礎") with gr.Row(): with gr.Column(scale=1): # with gr.Tab("認知階層評量題目"): # cognitive_level_content = gr.Textbox(label="輸入學習目標與內容") # cognitive_level_content_btn = gr.Button("生成評量題目") with gr.Tab("學習單"): worksheet_content_type_name = gr.Textbox(value="worksheet", visible=False) worksheet_algorithm = gr.Dropdown(label="選擇教學策略或理論", choices=["Bloom認知階層理論", "Polya數學解題法", "CRA教學法"], value="Bloom認知階層理論") worksheet_content_btn = gr.Button("生成學習單 📄") with gr.Accordion("prompt", open=False): worksheet_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40) with gr.Tab("課程計畫"): lesson_plan_content_type_name = gr.Textbox(value="lesson_plan", visible=False) lesson_plan_time = gr.Slider(label="選擇課程時間(分鐘)", minimum=10, maximum=120, step=5, value=40) lesson_plan_btn = gr.Button("生成課程計畫 📕") with gr.Accordion("prompt", open=False): lesson_plan_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40) with gr.Tab("出場券"): exit_ticket_content_type_name = gr.Textbox(value="exit_ticket", visible=False) exit_ticket_time = gr.Slider(label="選擇出場券時間(分鐘)", minimum=5, maximum=10, step=1, value=8) exit_ticket_btn = gr.Button("生成出場券 🎟️") with gr.Accordion("prompt", open=False): exit_ticket_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40) with gr.Tab("素養導向閱讀題組"): literacy_oriented_reading_content = gr.Textbox(label="輸入閱讀材料") literacy_oriented_reading_content_btn = gr.Button("生成閱讀理解題") # with gr.Tab("自我評估"): # self_assessment_content = gr.Textbox(label="輸入自評問卷或檢查表") # self_assessment_content_btn = gr.Button("生成自評問卷") # with gr.Tab("自我反思評量"): # self_reflection_content = gr.Textbox(label="輸入自我反思活動") # self_reflection_content_btn = gr.Button("生成自我反思活動") # with gr.Tab("後設認知"): # 
metacognition_content = gr.Textbox(label="輸入後設認知相關問題") # metacognition_content_btn = gr.Button("生成後設認知問題") with gr.Column(scale=2): # 生成對應不同模式的結果 exam_result_prompt = gr.Textbox(visible=False) exam_result = gr.Textbox(label="初次生成結果", show_copy_button=True) exam_result_fine_tune_prompt = gr.Textbox(label="根據結果,輸入你想更改的想法") exam_result_fine_tune_btn = gr.Button("微調結果") exam_result_fine_result = gr.Textbox(label="微調結果",show_copy_button=True) # 傳統模式 # send_button.click( # respond, # inputs=[msg, df_string_output, chatbot, socratic_mode_btn], # outputs=[msg, chatbot] # ) # # 连接按钮点击事件 # btn_1.click(respond, inputs=[btn_1, df_string_output, chatbot, socratic_mode_btn], outputs=[msg, chatbot]) # btn_2.click(respond, inputs=[btn_2, df_string_output, chatbot, socratic_mode_btn], outputs=[msg, chatbot]) # btn_3.click(respond, inputs=[btn_3, df_string_output, chatbot, socratic_mode_btn], outputs=[msg, chatbot]) # chat_with_youtube_transcript # send_button.click( # chat_with_youtube_transcript, # inputs=[password, video_id, thread_id, df_string_output, msg, chatbot, socratic_mode_btn], # outputs=[msg, chatbot, thread_id] # ) # chat_with_youtube_transcript # OPENAI 模式 send_button.click( chat_with_youtube_transcript, inputs=[password, video_id, thread_id, df_string_output, msg, chatbot, socratic_mode_btn], outputs=[msg, chatbot, thread_id] ) openai_chatbot_audio_input.change( process_open_ai_audio_to_chatbot, inputs=[password, openai_chatbot_audio_input], outputs=[msg] ) # GROQ 模式 groq_send_button.click( chat_with_groq, inputs=[password, groq_msg, df_string_output, groq_chatbot, socratic_mode_btn], outputs=[groq_msg, groq_chatbot] ) # JUTOR API 模式 jutor_send_button.click( respond_with_jutor_chat, inputs=[password, jutor_msg, df_string_output, jutor_chatbot, socratic_mode_btn], outputs=[jutor_msg, jutor_chatbot] ) # 连接按钮点击事件 btn_1.click( chat_with_youtube_transcript, inputs=[password, video_id, thread_id, df_string_output, btn_1, chatbot, socratic_mode_btn], outputs=[msg, chatbot, thread_id] ) btn_2.click( chat_with_youtube_transcript, inputs=[password, video_id, thread_id, df_string_output, btn_2, chatbot, socratic_mode_btn], outputs=[msg, chatbot, thread_id] ) btn_3.click( chat_with_youtube_transcript, inputs=[password, video_id, thread_id, df_string_output, btn_3, chatbot, socratic_mode_btn], outputs=[msg, chatbot, thread_id] ) btn_create_question.click(change_questions, inputs = [password, df_string_output], outputs = [btn_1, btn_2, btn_3]) # file_upload.change(process_file, inputs=file_upload, outputs=df_string_output) file_upload.change(process_file, inputs=file_upload, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output]) # 当输入 YouTube 链接时触发 youtube_link.change( process_youtube_link, inputs=[password,youtube_link], outputs=[ video_id, btn_1, btn_2, btn_3, df_string_output, df_summarise, mind_map, mind_map_html, transcript_html, simple_html_content, slide_image, slide_text, reading_passage ] ) youtube_link_btn.click( process_youtube_link, inputs=[password, youtube_link], outputs=[ video_id, btn_1, btn_2, btn_3, df_string_output, df_summarise, mind_map, mind_map_html, transcript_html, simple_html_content, slide_image, slide_text, reading_passage ] ) # 当输入网页链接时触发 # web_link.change(process_web_link, inputs=web_link, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output]) # 教師版 學習單 worksheet_content_btn.click( on_generate_ai_content, inputs=[password, df_string_output, content_topic, content_grade, content_level, worksheet_algorithm, worksheet_content_type_name], outputs=[exam_result, 
worksheet_prompt, exam_result_prompt] ) lesson_plan_btn.click( on_generate_ai_content, inputs=[password, df_string_output, content_topic, content_grade, content_level, lesson_plan_time, lesson_plan_content_type_name], outputs=[exam_result, lesson_plan_prompt, exam_result_prompt] ) exit_ticket_btn.click( on_generate_ai_content, inputs=[password, df_string_output, content_topic, content_grade, content_level, exit_ticket_time, exit_ticket_content_type_name], outputs=[exam_result, exit_ticket_prompt, exam_result_prompt] ) # 生成結果微調 exam_result_fine_tune_btn.click( generate_exam_fine_tune_result, inputs=[password, exam_result_prompt, df_string_output, exam_result, exam_result_fine_tune_prompt], outputs=[exam_result_fine_result] ) demo.load( init_params, inputs =[youtube_link], outputs = [password , youtube_link] ) demo.launch(allowed_paths=["videos"])
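# Example local_config.json for IS_ENV_LOCAL=true (hypothetical values; the keys
# mirror the config loader at the top of this file):
# {
#     "PASSWORD": "your-password",
#     "GOOGLE_APPLICATION_CREDENTIALS_JSON": { "...": "service-account JSON" },
#     "OPEN_AI_KEY": "sk-...",
#     "GROQ_API_KEY": "gsk_...",
#     "JUTOR_CHAT_KEY": "...",
#     "OUTPUT_PATH": "videos"
# }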