import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
import os
from openai import OpenAI
from groq import Groq
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import NoTranscriptFound
import yt_dlp
from moviepy.editor import VideoFileClip
from pytube import YouTube
import io
import time
import json
from urllib.parse import urlparse, parse_qs
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.http import MediaIoBaseUpload
from educational_material import EducationalMaterial
from storage_service import GoogleCloudStorage
is_env_local = os.getenv("IS_ENV_LOCAL", "false") == "true"
print(f"is_env_local: {is_env_local}")
if is_env_local:
with open("local_config.json") as f:
config = json.load(f)
PASSWORD = config["PASSWORD"]
GCS_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
DRIVE_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
OPEN_AI_KEY = config["OPEN_AI_KEY"]
GROQ_API_KEY = config["GROQ_API_KEY"]
JUTOR_CHAT_KEY = config["JUTOR_CHAT_KEY"]
OUTPUT_PATH = config["OUTPUT_PATH"]
else:
PASSWORD = os.getenv("PASSWORD")
GCS_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
DRIVE_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
JUTOR_CHAT_KEY = os.getenv("JUTOR_CHAT_KEY")
OUTPUT_PATH = 'videos'
TRANSCRIPTS = []
CURRENT_INDEX = 0
VIDEO_ID = ""
OPEN_AI_CLIENT = OpenAI(api_key=OPEN_AI_KEY)
GROQ_CLIENT = Groq(api_key=GROQ_API_KEY)
GCS_SERVICE = GoogleCloudStorage(GCS_KEY)
GCS_CLIENT = GCS_SERVICE.client
# Verify the password
def verify_password(password):
if password == PASSWORD:
return True
else:
raise gr.Error("密碼錯誤")
# ====gcs====
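# Thin helpers around the google-cloud-storage client; blob names follow the
# "{video_id}/{file_name}" convention used throughout this module.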
def gcs_check_file_exists(gcs_client, bucket_name, file_name):
"""
    Check whether the specified file exists in the GCS bucket.
    file_name format: {folder_name}/{file_name}
"""
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(file_name)
return blob.exists()
def upload_file_to_gcs(gcs_client, bucket_name, destination_blob_name, file_path):
"""上传文件到指定的 GCS 存储桶"""
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(file_path)
print(f"File {file_path} uploaded to {destination_blob_name} in GCS.")
def upload_file_to_gcs_with_json_string(gcs_client, bucket_name, destination_blob_name, json_string):
"""上传字符串到指定的 GCS 存储桶"""
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(json_string)
print(f"JSON string uploaded to {destination_blob_name} in GCS.")
def download_blob_to_string(gcs_client, bucket_name, source_blob_name):
"""从 GCS 下载文件内容到字符串"""
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
return blob.download_as_text()
def make_blob_public(gcs_client, bucket_name, blob_name):
"""将指定的 GCS 对象设置为公共可读"""
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.make_public()
print(f"Blob {blob_name} is now publicly accessible at {blob.public_url}")
def get_blob_public_url(gcs_client, bucket_name, blob_name):
"""获取指定 GCS 对象的公开 URL"""
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
return blob.public_url
def upload_img_and_get_public_url(gcs_client, bucket_name, file_name, file_path):
"""上传图片到 GCS 并获取其公开 URL"""
# 上传图片
upload_file_to_gcs(gcs_client, bucket_name, file_name, file_path)
# 将上传的图片设置为公开
make_blob_public(gcs_client, bucket_name, file_name)
# 获取图片的公开 URL
public_url = get_blob_public_url(gcs_client, bucket_name, file_name)
print(f"Public URL for the uploaded image: {public_url}")
return public_url
def copy_all_files_from_drive_to_gcs(drive_service, gcs_client, drive_folder_id, bucket_name, gcs_folder_name):
# Get all files from the folder
query = f"'{drive_folder_id}' in parents and trashed = false"
response = drive_service.files().list(q=query).execute()
files = response.get('files', [])
for file in files:
# Copy each file to GCS
file_id = file['id']
file_name = file['name']
gcs_destination_path = f"{gcs_folder_name}/{file_name}"
copy_file_from_drive_to_gcs(drive_service, gcs_client, file_id, bucket_name, gcs_destination_path)
def copy_file_from_drive_to_gcs(drive_service, gcs_client, file_id, bucket_name, gcs_destination_path):
# Download file content from Drive
request = drive_service.files().get_media(fileId=file_id)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
status, done = downloader.next_chunk()
fh.seek(0)
file_content = fh.getvalue()
# Upload file content to GCS
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(gcs_destination_path)
blob.upload_from_string(file_content)
print(f"File {file_id} copied to GCS at {gcs_destination_path}.")
# ====drive==== initialization
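# Thin helpers around the Google Drive v3 API (service-account credentials); used as an
# alternative backend to GCS for caching transcripts and generated materials.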
def init_drive_service():
credentials_json_string = DRIVE_KEY
credentials_dict = json.loads(credentials_json_string)
SCOPES = ['https://www.googleapis.com/auth/drive']
credentials = service_account.Credentials.from_service_account_info(
credentials_dict, scopes=SCOPES)
service = build('drive', 'v3', credentials=credentials)
return service
def create_folder_if_not_exists(service, folder_name, parent_id):
print("检查是否存在特定名称的文件夹,如果不存在则创建")
query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_id}' in parents and trashed=false"
response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute()
folders = response.get('files', [])
if not folders:
        # Folder does not exist; create a new one
file_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder',
'parents': [parent_id]
}
folder = service.files().create(body=file_metadata, fields='id').execute()
return folder.get('id')
else:
        # Folder already exists
return folders[0]['id']
# Check whether a file exists on Google Drive
def check_file_exists(service, folder_name, file_name):
query = f"name = '{file_name}' and '{folder_name}' in parents and trashed = false"
response = service.files().list(q=query).execute()
files = response.get('files', [])
return len(files) > 0, files[0]['id'] if files else None
def upload_content_directly(service, file_name, folder_id, content):
"""
    Upload content directly to a new file on Google Drive.
"""
if not file_name:
raise ValueError("文件名不能为空")
if not folder_id:
raise ValueError("文件夹ID不能为空")
    if content is None:  # Allow empty strings, but not None
raise ValueError("内容不能为空")
file_metadata = {'name': file_name, 'parents': [folder_id]}
    # Use io.BytesIO to create an in-memory file object for the text content
try:
with io.BytesIO(content.encode('utf-8')) as fh:
media = MediaIoBaseUpload(fh, mimetype='text/plain', resumable=True)
print("==content==")
print(content)
print("==content==")
print("==media==")
print(media)
print("==media==")
            # Perform the upload
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
return file.get('id')
except Exception as e:
print(f"上传文件时发生错误: {e}")
        raise  # Re-raise so the caller can decide how to handle it
def upload_file_directly(service, file_name, folder_id, file_path):
    # Upload a .json file to Google Drive
file_metadata = {'name': file_name, 'parents': [folder_id]}
media = MediaFileUpload(file_path, mimetype='application/json')
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    # return file.get('id')  # return the file ID
return True
def upload_img_directly(service, file_name, folder_id, file_path):
file_metadata = {'name': file_name, 'parents': [folder_id]}
media = MediaFileUpload(file_path, mimetype='image/jpeg')
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    return file.get('id')  # Return the file ID
def download_file_as_string(service, file_id):
"""
    Download a file from Google Drive and return its contents as a string.
"""
request = service.files().get_media(fileId=file_id)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
    while not done:
status, done = downloader.next_chunk()
fh.seek(0)
content = fh.read().decode('utf-8')
return content
def set_public_permission(service, file_id):
service.permissions().create(
fileId=file_id,
body={"type": "anyone", "role": "reader"},
fields='id',
).execute()
def update_file_on_drive(service, file_id, file_content):
"""
    Update the contents of a file on Google Drive.
    Parameters:
    - service: Google Drive API service instance.
    - file_id: ID of the file to update.
    - file_content: the new file content, as a string.
"""
    # Convert the new file content to a byte stream
fh = io.BytesIO(file_content.encode('utf-8'))
media = MediaIoBaseUpload(fh, mimetype='application/json', resumable=True)
    # Update the file
updated_file = service.files().update(
fileId=file_id,
media_body=media
).execute()
print(f"文件已更新,文件ID: {updated_file['id']}")
# ---- Text file ----
def process_file(password, file):
verify_password(password)
    # Read the file
if file.name.endswith('.csv'):
df = pd.read_csv(file)
text = df_to_text(df)
elif file.name.endswith('.xlsx'):
df = pd.read_excel(file)
text = df_to_text(df)
elif file.name.endswith('.docx'):
text = docx_to_text(file)
else:
raise ValueError("Unsupported file type")
    df_string = text
    # Yilan data: replace the "@XX@" marker with "|"
    df_string = df_string.replace("@XX@", "|")
    # Generate questions and a summary from the uploaded content
questions = generate_questions(df_string)
summary = generate_summarise(df_string)
    # Return the button texts and the DataFrame string
return questions[0] if len(questions) > 0 else "", \
questions[1] if len(questions) > 1 else "", \
questions[2] if len(questions) > 2 else "", \
summary, \
df_string
def df_to_text(df):
    # Convert the DataFrame to plain text
return df.to_string()
def docx_to_text(file):
    # Convert the Word document to plain text
doc = Document(file)
return "\n".join([para.text for para in doc.paragraphs])
# ---- YouTube link ----
def format_seconds_to_time(seconds):
"""将秒数格式化为 时:分:秒 的形式"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours:02}:{minutes:02}:{seconds:02}"
def extract_youtube_id(url):
parsed_url = urlparse(url)
if "youtube.com" in parsed_url.netloc:
        # For standard links, the video ID is in the 'v' query parameter
query_params = parse_qs(parsed_url.query)
return query_params.get("v")[0] if "v" in query_params else None
elif "youtu.be" in parsed_url.netloc:
        # For short links, the video ID is part of the path
return parsed_url.path.lstrip('/')
else:
return None
def get_transcript(video_id):
    languages = ['zh-TW', 'zh-Hant', 'zh', 'en']  # priority order
for language in languages:
try:
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language])
            return transcript  # Subtitles found; return them
        except NoTranscriptFound:
            continue  # No subtitles in this language; try the next one
    return None  # All attempts failed
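# Fallback transcription: when no YouTube subtitles are available, download the audio with
# yt-dlp and transcribe it with OpenAI Whisper, returning the same
# {text, start, duration} entries as get_transcript.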
def generate_transcription(video_id):
youtube_url = f'https://www.youtube.com/watch?v={video_id}'
codec_name = "mp3"
outtmpl = f"{OUTPUT_PATH}/{video_id}.%(ext)s"
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': codec_name,
'preferredquality': '192'
}],
'outtmpl': outtmpl,
}
print("===download video mp3===")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([youtube_url])
audio_path = f"{OUTPUT_PATH}/{video_id}.{codec_name}"
print("===transcription by open ai===")
with open(audio_path, "rb") as audio_file:
srt_content = OPEN_AI_CLIENT.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="verbose_json",
timestamp_granularities=["segment"],
prompt="如果逐字稿有中文,請使用繁體中文 zh-TW",
)
# get segments
segments = srt_content.segments
transcription = [
{
"text": item["text"],
"start": int(item["start"]),
"duration": int(item["end"] - item["start"])
}
for item in segments
]
return transcription
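# Google Drive pipeline: reuse the cached transcript JSON if it exists, otherwise fetch the
# subtitles and upload them; then make sure every entry has a screenshot stored on Drive.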
def process_transcript_and_screenshots(video_id):
print("====process_transcript_and_screenshots====")
# Drive
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
    # Transcript file name
file_name = f'{video_id}_transcript.json'
    # Check whether the transcript already exists
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
        # Fetch the transcript from YouTube and upload it
transcript = get_transcript(video_id)
if transcript:
print("成功獲取字幕")
else:
print("沒有找到字幕")
transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
file_id = upload_content_directly(service, file_name, folder_id, transcript_text)
print("逐字稿已上传到Google Drive")
else:
        # Transcript already exists; download its content
print("逐字稿已存在于Google Drive中")
transcript_text = download_file_as_string(service, file_id)
transcript = json.loads(transcript_text)
    # For each transcript entry, check for and upload a screenshot
for entry in transcript:
if 'img_file_id' not in entry:
screenshot_path = screenshot_youtube_video(video_id, entry['start'])
img_file_id = upload_img_directly(service, f"{video_id}_{entry['start']}.jpg", folder_id, screenshot_path)
set_public_permission(service, img_file_id)
entry['img_file_id'] = img_file_id
print(f"截图已上传到Google Drive: {img_file_id}")
    # Update the transcript file
updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
update_file_on_drive(service, file_id, updated_transcript_text)
print("逐字稿已更新,包括截图链接")
return transcript
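# GCS pipeline (the one currently used by process_youtube_link): same idea as the Drive
# version, but screenshots are uploaded as public GCS objects and transcription falls back
# to Whisper when no subtitles exist.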
def process_transcript_and_screenshots_on_gcs(video_id):
print("====process_transcript_and_screenshots_on_gcs====")
# GCS
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
    # Transcript file name
transcript_file_name = f'{video_id}_transcript.json'
transcript_blob_name = f"{video_id}/{transcript_file_name}"
    # Check whether the transcript already exists
is_transcript_exists = GCS_SERVICE.check_file_exists(bucket_name, transcript_blob_name)
if not is_transcript_exists:
        # Fetch the transcript from YouTube and upload it
        try:
            transcript = get_transcript(video_id)
        except Exception:
            transcript = None
        if transcript:
            print("成功獲取字幕")
        else:
            # No subtitles found; fall back to OpenAI Whisper
            print("===call open ai whisper===")
            transcript = generate_transcription(video_id)
transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
upload_file_to_gcs_with_json_string(gcs_client, bucket_name, transcript_blob_name, transcript_text)
else:
        # Transcript already exists; download its content
print("逐字稿已存在于GCS中")
transcript_text = download_blob_to_string(gcs_client, bucket_name, transcript_blob_name)
transcript = json.loads(transcript_text)
# print("===確認其他衍生文件===")
# source = "gcs"
# get_questions(video_id, transcript_text, source)
# get_video_id_summary(video_id, transcript_text, source)
# get_mind_map(video_id, transcript_text, source)
# print("===確認其他衍生文件 end ===")
    # Process screenshots
for entry in transcript:
if 'img_file_id' not in entry:
            # Check whether OUTPUT_PATH already contains video_id.mp4
video_path = f'{OUTPUT_PATH}/{video_id}.mp4'
if not os.path.exists(video_path):
                # Try up to 5 times; raise if every attempt fails
for i in range(5):
try:
download_youtube_video(video_id)
break
except Exception as e:
if i == 4:
raise gr.Error(f"下载视频失败: {str(e)}")
time.sleep(5)
            # Take the screenshot
screenshot_path = screenshot_youtube_video(video_id, entry['start'])
screenshot_blob_name = f"{video_id}/{video_id}_{entry['start']}.jpg"
img_file_id = upload_img_and_get_public_url(gcs_client, bucket_name, screenshot_blob_name, screenshot_path)
entry['img_file_id'] = img_file_id
print(f"截图已上传到GCS: {img_file_id}")
    # Update the transcript file
print("===更新逐字稿文件===")
print(transcript)
print("===更新逐字稿文件===")
updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
upload_file_to_gcs_with_json_string(gcs_client, bucket_name, transcript_blob_name, updated_transcript_text)
print("逐字稿已更新,包括截图链接")
updated_transcript_json = json.loads(updated_transcript_text)
return updated_transcript_json
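# Main entry point for a YouTube link: build the transcript (with screenshots), then derive
# the questions, summary, mind map, reading passage, and the HTML views shown in the UI.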
def process_youtube_link(password, link):
verify_password(password)
    # Build the transcript for this video (YouTube subtitles, with Whisper as a fallback);
    # the result is stored in `transcript` below.
video_id = extract_youtube_id(link)
global VIDEO_ID
VIDEO_ID = video_id
try:
# transcript = process_transcript_and_screenshots(video_id)
transcript = process_transcript_and_screenshots_on_gcs(video_id)
except Exception as e:
error_msg = f" {video_id} 逐字稿錯誤: {str(e)}"
print("===process_youtube_link error===")
print(error_msg)
raise gr.Error(error_msg)
formatted_transcript = []
    formatted_simple_transcript = []
screenshot_paths = []
for entry in transcript:
start_time = format_seconds_to_time(entry['start'])
end_time = format_seconds_to_time(entry['start'] + entry['duration'])
embed_url = get_embedded_youtube_link(video_id, entry['start'])
img_file_id = entry['img_file_id']
# img_file_id =""
        # Google Drive screenshot URLs are disabled for now; use the GCS public URL directly
# screenshot_path = f"https://lh3.googleusercontent.com/d/{img_file_id}=s4000"
screenshot_path = img_file_id
line = {
"start_time": start_time,
"end_time": end_time,
"text": entry['text'],
"embed_url": embed_url,
"screenshot_path": screenshot_path
}
formatted_transcript.append(line)
        # formatted_simple_transcript keeps only start_time, end_time, and text
simple_line = {
"start_time": start_time,
"end_time": end_time,
"text": entry['text']
}
formatted_simple_transcript.append(simple_line)
screenshot_paths.append(screenshot_path)
global TRANSCRIPTS
TRANSCRIPTS = formatted_transcript
    # Generate the other outputs from the transcript
source = "gcs"
questions = get_questions(video_id, formatted_simple_transcript, source)
formatted_transcript_json = json.dumps(formatted_transcript, ensure_ascii=False, indent=2)
summary_json = get_video_id_summary(video_id, formatted_simple_transcript, source)
summary = summary_json["summary"]
html_content = format_transcript_to_html(formatted_transcript)
simple_html_content = format_simple_transcript_to_html(formatted_simple_transcript)
first_image = formatted_transcript[0]['screenshot_path']
# first_image = "https://www.nameslook.com/names/dfsadf-nameslook.png"
first_text = formatted_transcript[0]['text']
mind_map_json = get_mind_map(video_id, formatted_simple_transcript, source)
mind_map = mind_map_json["mind_map"]
mind_map_html = get_mind_map_html(mind_map)
reading_passage_json = get_reading_passage(video_id, formatted_simple_transcript, source)
reading_passage = reading_passage_json["reading_passage"]
    # Make sure the return values match what the UI components expect
return video_id, \
questions[0] if len(questions) > 0 else "", \
questions[1] if len(questions) > 1 else "", \
questions[2] if len(questions) > 2 else "", \
formatted_transcript_json, \
summary, \
mind_map, \
mind_map_html, \
html_content, \
simple_html_content, \
first_image, \
first_text, \
reading_passage
def format_transcript_to_html(formatted_transcript):
    # Minimal markup for each entry (the original template was lost, so the tags here are
    # assumed): a time-range heading, the text, and the screenshot.
    html_content = ""
    for entry in formatted_transcript:
        html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
        html_content += f"<p>{entry['text']}</p>"
        html_content += f"<img src='{entry['screenshot_path']}' style='max-width: 100%;' />"
    return html_content
def format_simple_transcript_to_html(formatted_transcript):
    # Text-only variant: a time-range heading followed by the entry text (markup assumed).
    html_content = ""
    for entry in formatted_transcript:
        html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
        html_content += f"<p>{entry['text']}</p>"
    return html_content
def get_embedded_youtube_link(video_id, start_time):
int_start_time = int(start_time)
embed_url = f"https://www.youtube.com/embed/{video_id}?start={int_start_time}&autoplay=1"
return embed_url
def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
# Construct the full YouTube URL
youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'
# Create the output directory if it doesn't exist
if not os.path.exists(output_path):
os.makedirs(output_path)
# Download the video
yt = YouTube(youtube_url)
video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
video_stream.download(output_path=output_path, filename=youtube_id+".mp4")
print(f"Video downloaded successfully: {output_path}/{youtube_id}.mp4")
def screenshot_youtube_video(youtube_id, snapshot_sec):
video_path = f'{OUTPUT_PATH}/{youtube_id}.mp4'
file_name = f"{youtube_id}_{snapshot_sec}.jpg"
with VideoFileClip(video_path) as video:
screenshot_path = f'{OUTPUT_PATH}/{file_name}'
video.save_frame(screenshot_path, snapshot_sec)
return screenshot_path
# ---- Web ----
def process_web_link(link):
    # Fetch and parse the web page content
response = requests.get(link)
soup = BeautifulSoup(response.content, 'html.parser')
return soup.get_text()
# ---- LLM Generator ----
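# Every get_* helper below follows the same cache-aside pattern: look for a previously
# generated JSON file on GCS (or Google Drive), call the LLM only on a cache miss, and
# store the result so later requests for the same video reuse it.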
def get_reading_passage(video_id, df_string, source):
if source == "gcs":
print("===get_reading_passage on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_reading_passage.json'
blob_name = f"{video_id}/{file_name}"
        # Check whether the reading passage already exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
reading_passage = generate_reading_passage(df_string)
reading_passage_json = {"reading_passage": str(reading_passage)}
reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
upload_file_to_gcs_with_json_string(gcs_client, bucket_name, blob_name, reading_passage_text)
print("reading_passage已上传到GCS")
else:
            # Reading passage already exists; download its content
print("reading_passage已存在于GCS中")
reading_passage_text = download_blob_to_string(gcs_client, bucket_name, blob_name)
reading_passage_json = json.loads(reading_passage_text)
elif source == "drive":
print("===get_reading_passage on drive===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_reading_passage.json'
        # Check whether the reading passage already exists
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
reading_passage = generate_reading_passage(df_string)
reading_passage_json = {"reading_passage": str(reading_passage)}
reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
upload_content_directly(service, file_name, folder_id, reading_passage_text)
print("reading_passage已上傳到Google Drive")
else:
            # Reading passage already exists; download its content
print("reading_passage已存在于Google Drive中")
            reading_passage_text = download_file_as_string(service, file_id)
            reading_passage_json = json.loads(reading_passage_text)
return reading_passage_json
def generate_reading_passage(df_string):
    # Use OpenAI to generate a reading passage from the uploaded data
sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW"
user_content = f"""
請根據 {df_string}
文本自行判斷資料的種類
幫我組合成 Reading Passage
並潤稿讓文句通順
請一定要使用繁體中文 zh-TW,並用台灣人的口語
產生的結果不要前後文解釋,只需要專注提供 Reading Passage
"""
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
request_payload = {
"model": "gpt-4-1106-preview",
"messages": messages,
"max_tokens": 4000,
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
reading_passage = response.choices[0].message.content.strip()
print("=====reading_passage=====")
print(reading_passage)
print("=====reading_passage=====")
return reading_passage
def get_mind_map(video_id, df_string, source):
if source == "gcs":
print("===get_mind_map on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_mind_map.json'
blob_name = f"{video_id}/{file_name}"
        # Check whether the file already exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
mind_map = generate_mind_map(df_string)
mind_map_json = {"mind_map": str(mind_map)}
mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
upload_file_to_gcs_with_json_string(gcs_client, bucket_name, blob_name, mind_map_text)
print("mind_map已上傳到GCS")
else:
            # Mind map already exists; download its content
print("mind_map已存在于GCS中")
mind_map_text = download_blob_to_string(gcs_client, bucket_name, blob_name)
mind_map_json = json.loads(mind_map_text)
elif source == "drive":
print("===get_mind_map on drive===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_mind_map.json'
        # Check whether the file already exists
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
mind_map = generate_mind_map(df_string)
mind_map_json = {"mind_map": str(mind_map)}
mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
upload_content_directly(service, file_name, folder_id, mind_map_text)
print("mind_map已上傳到Google Drive")
else:
            # Mind map already exists; download its content
print("mind_map已存在于Google Drive中")
mind_map_text = download_file_as_string(service, file_id)
mind_map_json = json.loads(mind_map_text)
return mind_map_json
def generate_mind_map(df_string):
    # Use OpenAI to generate a mind map from the uploaded data
sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW"
user_content = f"""
請根據 {df_string} 文本建立 markdown 心智圖
注意:不需要前後文敘述,直接給出 markdown 文本即可
這對我很重要
"""
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
request_payload = {
"model": "gpt-4-1106-preview",
"messages": messages,
"max_tokens": 4000,
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
mind_map = response.choices[0].message.content.strip()
print("=====mind_map=====")
print(mind_map)
print("=====mind_map=====")
return mind_map
def get_mind_map_html(mind_map):
    mind_map_markdown = mind_map.replace("```markdown", "").replace("```", "")
    # The original HTML template was lost; this assumes a markmap-autoloader embed
    # (https://markmap.js.org/) that renders the markdown as an interactive mind map.
    mind_map_html = f"""
    <div class="markmap"><script type="text/template">{mind_map_markdown}</script></div>
    <script src="https://cdn.jsdelivr.net/npm/markmap-autoloader"></script>
    """
    return mind_map_html
def get_video_id_summary(video_id, df_string, source):
if source == "gcs":
print("===get_video_id_summary on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_summary.json'
summary_file_blob_name = f"{video_id}/{file_name}"
        # Check whether the summary file already exists
is_summary_file_exists = GCS_SERVICE.check_file_exists(bucket_name, summary_file_blob_name)
if not is_summary_file_exists:
summary = generate_summarise(df_string)
summary_json = {"summary": str(summary)}
summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
upload_file_to_gcs_with_json_string(gcs_client, bucket_name, summary_file_blob_name, summary_text)
print("summary已上传到GCS")
else:
            # Summary already exists; download its content
print("summary已存在于GCS中")
summary_text = download_blob_to_string(gcs_client, bucket_name, summary_file_blob_name)
summary_json = json.loads(summary_text)
elif source == "drive":
print("===get_video_id_summary===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_summary.json'
        # Check whether the summary already exists
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
summary = generate_summarise(df_string)
summary_json = {"summary": str(summary)}
summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
try:
upload_content_directly(service, file_name, folder_id, summary_text)
print("summary已上傳到Google Drive")
except Exception as e:
error_msg = f" {video_id} 摘要錯誤: {str(e)}"
print("===get_video_id_summary error===")
print(error_msg)
print("===get_video_id_summary error===")
else:
            # Summary already exists; download its content
print("summary已存在Google Drive中")
summary_text = download_file_as_string(service, file_id)
summary_json = json.loads(summary_text)
return summary_json
def generate_summarise(df_string):
    # Use OpenAI to generate a summary of the uploaded data
sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW"
user_content = f"""
請根據 {df_string},判斷這份文本
如果是資料類型,請提估欄位敘述、資料樣態與資料分析,告訴學生這張表的意義,以及可能的結論與對應方式
如果是影片類型,請提估影片內容,告訴學生這部影片的意義,
整體摘要在一百字以內
小範圍切出不同段落的相對應時間軸的重點摘要,最多不超過五段
注意不要遺漏任何一段時間軸的內容
格式為 【start - end】: 摘要
以及可能的結論與結尾延伸小問題提供學生作反思
整體格式為:
🗂️ 1. 內容類型:?
📚 2. 整體摘要
🔖 3. 重點概念
🔑 4. 關鍵時刻
💡 5. 為什麼我們要學這個?
❓ 6. 延伸小問題
"""
# 🗂️ 1. 內容類型:?
# 📚 2. 整體摘要
# 🔖 3. 條列式重點
# 🔑 4. 關鍵時刻(段落摘要)
# 💡 5. 結論反思(為什麼我們要學這個?)
# ❓ 6. 延伸小問題
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
request_payload = {
"model": "gpt-4-turbo-preview",
"messages": messages,
"max_tokens": 4000,
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
df_summarise = response.choices[0].message.content.strip()
print("=====df_summarise=====")
print(df_summarise)
print("=====df_summarise=====")
return df_summarise
def generate_questions(df_string):
    # Use OpenAI to generate questions based on the uploaded data
sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,並用既有資料為本質猜測用戶可能會問的問題,使用 zh-TW"
user_content = f"請根據 {df_string} 生成三個問題,並用 JSON 格式返回 questions:[q1的敘述text, q2的敘述text, q3的敘述text]"
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
response_format = { "type": "json_object" }
print("=====messages=====")
print(messages)
print("=====messages=====")
request_payload = {
"model": "gpt-4-1106-preview",
"messages": messages,
"max_tokens": 4000,
"response_format": response_format
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
questions = json.loads(response.choices[0].message.content)["questions"]
print("=====json_response=====")
print(questions)
print("=====json_response=====")
return questions
def get_questions(video_id, df_string, source="gcs"):
if source == "gcs":
        # Check GCS for {video_id}_questions.json
print("===get_questions on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_questions.json'
blob_name = f"{video_id}/{file_name}"
        # Check whether the file already exists
is_questions_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_questions_exists:
questions = generate_questions(df_string)
questions_text = json.dumps(questions, ensure_ascii=False, indent=2)
upload_file_to_gcs_with_json_string(gcs_client, bucket_name, blob_name, questions_text)
print("questions已上傳到GCS")
else:
            # Questions already exist; download their content
print("questions已存在于GCS中")
questions_text = download_blob_to_string(gcs_client, bucket_name, blob_name)
questions = json.loads(questions_text)
elif source == "drive":
        # Check Google Drive for {video_id}_questions.json
print("===get_questions===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_questions.json'
        # Check whether the file already exists
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
questions = generate_questions(df_string)
questions_text = json.dumps(questions, ensure_ascii=False, indent=2)
upload_content_directly(service, file_name, folder_id, questions_text)
print("questions已上傳到Google Drive")
else:
            # Questions already exist; download their content
print("questions已存在于Google Drive中")
questions_text = download_file_as_string(service, file_id)
questions = json.loads(questions_text)
q1 = questions[0] if len(questions) > 0 else ""
q2 = questions[1] if len(questions) > 1 else ""
q3 = questions[2] if len(questions) > 2 else ""
print("=====get_questions=====")
print(f"q1: {q1}")
print(f"q2: {q2}")
print(f"q3: {q3}")
print("=====get_questions=====")
return q1, q2, q3
def change_questions(password, df_string):
verify_password(password)
questions = generate_questions(df_string)
q1 = questions[0] if len(questions) > 0 else ""
q2 = questions[1] if len(questions) > 1 else ""
q3 = questions[2] if len(questions) > 2 else ""
print("=====get_questions=====")
print(f"q1: {q1}")
print(f"q2: {q2}")
print(f"q3: {q3}")
print("=====get_questions=====")
return q1, q2, q3
# AI-generated teaching materials
def on_generate_ai_content(password, df_string, topic, grade, level, specific_feature, content_type):
verify_password(password)
material = EducationalMaterial(df_string, topic, grade, level, specific_feature, content_type)
prompt = material.generate_content_prompt()
user_content = material.build_user_content()
messages = material.build_messages(user_content)
ai_model_name = "gpt-4-1106-preview"
request_payload = {
"model": ai_model_name,
"messages": messages,
"max_tokens": 4000 # 举例,实际上您可能需要更详细的配置
}
ai_content = material.send_ai_request(OPEN_AI_CLIENT, request_payload)
return ai_content, prompt, prompt
def generate_exam_fine_tune_result(password, exam_result_prompt , df_string_output, exam_result, exam_result_fine_tune_prompt):
verify_password(password)
material = EducationalMaterial(df_string_output, "", "", "", "", "")
user_content = material.build_fine_tune_user_content(exam_result_prompt, exam_result, exam_result_fine_tune_prompt)
messages = material.build_messages(user_content)
ai_model_name = "gpt-4-1106-preview"
request_payload = {
"model": ai_model_name,
"messages": messages,
"max_tokens": 4000 # 举例,实际上您可能需要更详细的配置
}
ai_content = material.send_ai_request(OPEN_AI_CLIENT, request_payload)
return ai_content
# ---- Chatbot ----
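# Four chat backends share the same inputs (message, transcript data, history, Socratic flag):
# plain OpenAI chat completions, the Jutor proxy API, Groq (Mixtral), and the OpenAI
# Assistants API used by the default OPENAI tab.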
def respond(password, user_message, data, chat_history, socratic_mode=False):
verify_password(password)
print("=== 變數:user_message ===")
print(user_message)
print("=== 變數:chat_history ===")
print(chat_history)
data_json = json.loads(data)
for entry in data_json:
entry.pop('embed_url', None) # Remove 'embed_url' if it exists
entry.pop('screenshot_path', None)
if socratic_mode:
sys_content = f"""
你是一個擅長資料分析跟影片教學的老師,user 為學生
請用 {data} 為資料文本,自行判斷資料的種類,
並進行對話,使用 台灣人的口與表達,及繁體中文zh-TW
如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
請你用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示
不要直接給予答案,讓學生自己思考
但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案
如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
或者你可以問學生一些問題,幫助學生更好的理解資料
如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
最後,在你回答的開頭標註【蘇格拉底助教】
"""
else:
sys_content = f"""
你是一個擅長資料分析跟影片教學的老師,user 為學生
請用 {data} 為資料文本,自行判斷資料的種類,
並進行對話,使用 zh-TW
如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生可以找到相對應的時間點
如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
或者你可以問學生一些問題,幫助學生更好的理解資料
如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
"""
messages = [
{"role": "system", "content": sys_content}
]
# if chat_history is not none, append role, content to messages
# chat_history = [(user, assistant), (user, assistant), ...]
# In the list, first one is user, then assistant
if chat_history is not None:
        # If there are more than 10 messages, keep only the last 10
if len(chat_history) > 10:
chat_history = chat_history[-10:]
for chat in chat_history:
old_messages = [
{"role": "user", "content": chat[0]},
{"role": "assistant", "content": chat[1]}
]
messages += old_messages
else:
pass
messages.append({"role": "user", "content": user_message})
request_payload = {
"model": "gpt-4-1106-preview",
"messages": messages,
"max_tokens": 4000 # 設定一個較大的值,可根據需要調整
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
response_text = response.choices[0].message.content.strip()
    # Update the chat history
new_chat_history = (user_message, response_text)
if chat_history is None:
chat_history = [new_chat_history]
else:
chat_history.append(new_chat_history)
    # Return the chat history and an empty string to clear the input box
return "", chat_history
def respond_with_jutor_chat(password, user_message, data, chat_history, socratic_mode=False):
verify_password(password)
data_json = json.loads(data)
for entry in data_json:
entry.pop('embed_url', None) # Remove 'embed_url' if it exists
entry.pop('screenshot_path', None)
if socratic_mode:
sys_content = f"""
你是一個擅長資料分析跟影片教學的老師,user 為學生
請用 {data} 為資料文本,自行判斷資料的種類,
並進行對話,使用 台灣人的口與表達,及繁體中文zh-TW
如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
請你用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示
不要直接給予答案,讓學生自己思考
但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案
如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
或者你可以問學生一些問題,幫助學生更好的理解資料
如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
最後,在你回答的開頭標註【蘇格拉底助教】
"""
else:
sys_content = f"""
你是一個擅長資料分析跟影片教學的老師,user 為學生
請用 {data} 為資料文本,自行判斷資料的種類,
並進行對話,使用 zh-TW
如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生可以找到相對應的時間點
如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
或者你可以問學生一些問題,幫助學生更好的理解資料
如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
"""
messages = [
{"role": "system", "content": sys_content}
]
# if chat_history is not none, append role, content to messages
# chat_history = [(user, assistant), (user, assistant), ...]
# In the list, first one is user, then assistant
if chat_history is not None:
        # If there are more than 10 messages, keep only the last 10
if len(chat_history) > 10:
chat_history = chat_history[-10:]
for chat in chat_history:
old_messages = [
{"role": "user", "content": chat[0]},
{"role": "assistant", "content": chat[1]}
]
messages += old_messages
else:
pass
messages.append({"role": "user", "content": user_message})
api_endpoint = "https://ci-live-feat-video-ai-dot-junyiacademy.appspot.com/api/v2/jutor/hf-chat"
headers = {
"Content-Type": "application/json",
"x-api-key": JUTOR_CHAT_KEY,
}
data = {
"data": {
"messages": messages,
"max_tokens": 512,
"temperature": 0.9,
"model": "gpt-4-1106-preview",
"stream": False,
}
}
response = requests.post(api_endpoint, headers=headers, data=json.dumps(data))
if response.status_code == 200:
        # Process the response data
response_data = response.json()
prompt = response_data['data']['choices'][0]['message']['content'].strip()
        # Update the chat history
new_chat_history = (user_message, prompt)
if chat_history is None:
chat_history = [new_chat_history]
else:
chat_history.append(new_chat_history)
        # Return the chat history and an empty string to clear the input box
return "", chat_history
else:
        # Handle the error case
print(f"Error: {response.status_code}")
return "请求失败,请稍后再试!", chat_history
def chat_with_groq(password, user_message, data, chat_history, socratic_mode=False):
verify_password(password)
print("=== 變數:user_message ===")
print(user_message)
print("=== 變數:chat_history ===")
print(chat_history)
data_json = json.loads(data)
for entry in data_json:
entry.pop('embed_url', None) # Remove 'embed_url' if it exists
entry.pop('screenshot_path', None)
if socratic_mode:
sys_content = f"""
你是一個擅長資料分析跟影片教學的老師,user 為學生
請用 {data} 為資料文本,自行判斷資料的種類,
並進行對話,使用 台灣人的口與表達,及繁體中文zh-TW
如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
請你用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示
不要直接給予答案,讓學生自己思考
但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案
如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
或者你可以問學生一些問題,幫助學生更好的理解資料
如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
最後,在你回答的開頭標註【蘇格拉底助教】
"""
else:
sys_content = f"""
你是一個擅長資料分析跟影片教學的老師,user 為學生
請用 {data} 為資料文本,自行判斷資料的種類,
並進行對話,使用 zh-TW
如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生可以找到相對應的時間點
如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
或者你可以問學生一些問題,幫助學生更好的理解資料
如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
"""
messages = [
{"role": "system", "content": sys_content}
]
# if chat_history is not none, append role, content to messages
# chat_history = [(user, assistant), (user, assistant), ...]
# In the list, first one is user, then assistant
if chat_history is not None:
        # If there are more than 10 messages, keep only the last 10
if len(chat_history) > 10:
chat_history = chat_history[-10:]
for chat in chat_history:
old_messages = [
{"role": "user", "content": chat[0]},
{"role": "assistant", "content": chat[1]}
]
messages += old_messages
else:
pass
messages.append({"role": "user", "content": user_message})
request_payload = {
"model": "mixtral-8x7b-32768",
"messages": messages,
"max_tokens": 4000 # 設定一個較大的值,可根據需要調整
}
response = GROQ_CLIENT.chat.completions.create(**request_payload)
response_text = response.choices[0].message.content.strip()
    # Update the chat history
new_chat_history = (user_message, response_text)
if chat_history is None:
chat_history = [new_chat_history]
else:
chat_history.append(new_chat_history)
    # Return the chat history and an empty string to clear the input box
return "", chat_history
def chat_with_youtube_transcript(password, youtube_id, thread_id, trascript, user_message, chat_history, socratic_mode=False):
verify_password(password)
    # Reject user messages that are too long
if len(user_message) > 1500:
error_msg = "你的訊息太長了,請縮短訊息長度至五百字以內"
raise gr.Error(error_msg)
try:
assistant_id = "asst_kmvZLNkDUYaNkMNtZEAYxyPq"
client = OPEN_AI_CLIENT
        # Old approach: pull the transcript from an uploaded assistant file (kept commented out below)
# instructions = f"""
# 你是一個擅長資料分析跟影片教學的老師,user 為學生
# 請根據 assistant beta 的上傳資料
# 如果 file 內有找到 file.content["{youtube_id}"] 為資料文本,自行判斷資料的種類,
# 如果沒有資料,請告訴用戶沒有逐字稿資料,但仍然可以進行對話,使用台灣人的口與表達,及繁體中文 zh-TW
# 請嚴格執行,只根據 file.content["{youtube_id}"] 為資料文本,沒有就是沒有資料,不要引用其他資料
# 如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
# socratic_mode = {socratic_mode}
# 如果 socratic_mode = True,
# - 請用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示
# - 不要直接給予答案,讓學生自己思考
# - 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案
# - 在你回答的開頭標註【蘇格拉底助教:{youtube_id} 】
# 如果 socratic_mode = False,
# - 直接回答學生問題
# - 在你回答的開頭標註【一般學習精靈:{youtube_id} 】
# 如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
# 或者你可以反問學生一些問題,幫助學生更好的理解資料
# 如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
# 最後只要是參考逐字稿資料,請在回答的最後標註【參考資料:(分):(秒)】
# """
        # Embed the transcript data directly in the instructions
trascript_json = json.loads(trascript)
        # Remove embed_url and screenshot_path
for entry in trascript_json:
entry.pop('embed_url', None)
entry.pop('screenshot_path', None)
trascript_text = json.dumps(trascript_json, ensure_ascii=False, indent=2)
instructions = f"""
逐字稿資料:{trascript_text}
-------------------------------------
你是一個擅長資料分析跟影片教學的老師,user 為學生
如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
socratic_mode = {socratic_mode}
如果 socratic_mode = True,
- 請用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示
- 不要直接給予答案,讓學生自己思考
- 但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案
- 在你回答的開頭標註【蘇格拉底助教:{youtube_id} 】
如果 socratic_mode = False,
- 直接回答學生問題
- 在你回答的開頭標註【一般學習精靈:{youtube_id} 】
如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
或者你可以反問學生一些問題,幫助學生更好的理解資料
如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
最後只要是參考逐字稿資料,請在回答的最後標註【參考資料:(分):(秒)】
"""
        # Create a thread
if not thread_id:
thread = client.beta.threads.create()
thread_id = thread.id
else:
thread = client.beta.threads.retrieve(thread_id)
        # Add the user's message to the thread
client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content=user_message
)
        # Run the assistant to generate a response
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant_id,
instructions=instructions,
)
        # Wait for the assistant's response (up to 30 seconds)
run_status = poll_run_status(run.id, thread.id, timeout=30)
        # Retrieve the assistant's response message
if run_status == "completed":
messages = client.beta.threads.messages.list(thread_id=thread.id)
# [MessageContentText(text=Text(annotations=[], value='您好!有什麼我可以幫助您的嗎?如果有任何問題或需要指導,請隨時告訴我!'), type='text')]
response_text = messages.data[0].content[0].text.value
else:
response_text = "學習精靈有點累,請稍後再試!"
        # Update the chat history
new_chat_history = (user_message, response_text)
if chat_history is None:
chat_history = [new_chat_history]
else:
chat_history.append(new_chat_history)
except Exception as e:
print(f"Error: {e}")
raise gr.Error(f"Error: {e}")
    # Return the chat history and an empty string to clear the input box
return "", chat_history, thread.id
def process_open_ai_audio_to_chatbot(password, audio_url):
verify_password(password)
if audio_url:
with open(audio_url, "rb") as audio_file:
file_size = os.path.getsize(audio_url)
if file_size > 2000000:
raise gr.Error("檔案大小超過,請不要超過 60秒")
else:
response = OPEN_AI_CLIENT.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="text"
)
                # response is plain text here (response_format="text"), so no parsing is needed
print("=== response ===")
print(response)
print("=== response ===")
else:
response = ""
return response
def poll_run_status(run_id, thread_id, timeout=600, poll_interval=5):
"""
Polls the status of a Run and handles different statuses appropriately.
:param run_id: The ID of the Run to poll.
:param thread_id: The ID of the Thread associated with the Run.
:param timeout: Maximum time to wait for the Run to complete, in seconds.
:param poll_interval: Time to wait between each poll, in seconds.
"""
client = OPEN_AI_CLIENT
start_time = time.time()
while time.time() - start_time < timeout:
run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
if run.status in ["completed", "cancelled", "failed"]:
print(f"Run completed with status: {run.status}")
break
elif run.status == "requires_action":
print("Run requires action. Performing required action...")
# Here, you would perform the required action, e.g., running functions
# and then submitting the outputs. This is simplified for this example.
# After performing the required action, you'd complete the action:
# OPEN_AI_CLIENT.beta.threads.runs.complete_required_action(...)
elif run.status == "expired":
print("Run expired. Exiting...")
break
else:
print(f"Run status is {run.status}. Waiting for updates...")
time.sleep(poll_interval)
else:
print("Timeout reached. Run did not complete in the expected time.")
# Once the Run is completed, handle the result accordingly
if run.status == "completed":
# Retrieve and handle messages or run steps as needed
messages = client.beta.threads.messages.list(thread_id=thread_id)
for message in messages.data:
if message.role == "assistant":
print(f"Assistant response: {message.content}")
elif run.status in ["cancelled", "failed"]:
# Handle cancellation or failure
print(f"Run ended with status: {run.status}")
elif run.status == "expired":
# Handle expired run
print("Run expired without completion.")
return run.status
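# Slide viewer: navigation state lives in the module-level TRANSCRIPTS / CURRENT_INDEX
# globals populated by process_youtube_link.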
def update_slide(direction):
global TRANSCRIPTS
global CURRENT_INDEX
print("=== 更新投影片 ===")
print(f"CURRENT_INDEX: {CURRENT_INDEX}")
# print(f"TRANSCRIPTS: {TRANSCRIPTS}")
CURRENT_INDEX += direction
if CURRENT_INDEX < 0:
        CURRENT_INDEX = 0  # Prevent the index from going below 0
elif CURRENT_INDEX >= len(TRANSCRIPTS):
        CURRENT_INDEX = len(TRANSCRIPTS) - 1  # Prevent the index from going out of range
    # Get the text and screenshot URL of the current entry
current_transcript = TRANSCRIPTS[CURRENT_INDEX]
slide_image = current_transcript["screenshot_path"]
slide_text = current_transcript["text"]
return slide_image, slide_text
def prev_slide():
return update_slide(-1)
def next_slide():
return update_slide(1)
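# Visibility flags for the password and YouTube-link widgets; init_params turns them off
# when the app is embedded from a junyiacademy origin.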
IS_PASSWORD_SHOW = True
IS_YOUTUBE_LINK_SHOW = True
IS_YOUTUBE_LINK_BTN_SHOW = True
def init_params(text, request: gr.Request):
if request:
print("Request headers dictionary:", request.headers)
print("IP address:", request.client.host)
print("Query parameters:", dict(request.query_params))
# url = request.url
print("Request URL:", request.url)
# if youtube_link in query_params
if "youtube_id" in request.query_params:
youtube_id = request.query_params["youtube_id"]
youtube_link = f"https://www.youtube.com/watch?v={youtube_id}"
print(f"youtube_link: {youtube_link}")
else:
youtube_link = ""
print("youtube_link not in query_params")
origin = request.headers.get("origin", "")
if "junyiacademy" in origin:
password_text = "6161"
global IS_PASSWORD_SHOW
global IS_YOUTUBE_LINK_SHOW
global IS_YOUTUBE_LINK_BTN_SHOW
IS_PASSWORD_SHOW = False
IS_YOUTUBE_LINK_SHOW = False
IS_YOUTUBE_LINK_BTN_SHOW = False
else:
password_text = ""
return password_text, youtube_link
HEAD = """
"""
with gr.Blocks() as demo:
with gr.Row():
password = gr.Textbox(label="Password", type="password", elem_id="password_input", visible=IS_PASSWORD_SHOW)
file_upload = gr.File(label="Upload your CSV or Word file", visible=False)
youtube_link = gr.Textbox(label="Enter YouTube Link", elem_id="youtube_link_input", visible=IS_YOUTUBE_LINK_SHOW)
video_id = gr.Textbox(label="video_id", visible=False)
youtube_link_btn = gr.Button("Submit_YouTube_Link", elem_id="youtube_link_btn", visible=IS_YOUTUBE_LINK_BTN_SHOW)
web_link = gr.Textbox(label="Enter Web Page Link", visible=False)
with gr.Tab("學生版"):
with gr.Row():
with gr.Column(scale=3):
with gr.Tab("文章模式"):
reading_passage = gr.Textbox(label="Reading Passage", lines=40)
with gr.Tab("重點"):
df_summarise = gr.Textbox(container=True, show_copy_button=True, lines=40)
with gr.Tab("問題"):
gr.Markdown("## 常用問題")
btn_1 = gr.Button()
btn_2 = gr.Button()
btn_3 = gr.Button()
gr.Markdown("## 重新生成問題")
btn_create_question = gr.Button("Create Questions")
with gr.Accordion("See Details", open=False):
with gr.Tab("本文"):
df_string_output = gr.Textbox(lines=40, label="Data Text")
with gr.Tab("逐字稿"):
simple_html_content = gr.HTML(label="Simple Transcript")
with gr.Tab("圖文"):
transcript_html = gr.HTML(label="YouTube Transcript and Video")
with gr.Tab("投影片"):
slide_image = gr.Image()
slide_text = gr.Textbox()
with gr.Row():
prev_button = gr.Button("Previous")
next_button = gr.Button("Next")
prev_button.click(fn=prev_slide, inputs=[], outputs=[slide_image, slide_text])
next_button.click(fn=next_slide, inputs=[], outputs=[slide_image, slide_text])
with gr.Tab("markdown"):
gr.Markdown("## 請複製以下 markdown 並貼到你的心智圖工具中,建議使用:https://markmap.js.org/repl")
mind_map = gr.Textbox(container=True, show_copy_button=True, lines=40, elem_id="mind_map_markdown")
with gr.Tab("心智圖",elem_id="mind_map_tab"):
mind_map_html = gr.HTML()
with gr.Column(scale=2):
with gr.Tab("OPENAI"):
gr.Markdown("## OPEN AI 模式")
chatbot = gr.Chatbot(avatar_images=["https://junyi-avatar.s3.ap-northeast-1.amazonaws.com/live/%20%20foxcat-star-18.png?v=20231113095823614", "https://junyitopicimg.s3.amazonaws.com/s4byy--icon.jpe?v=20200513013523726"], label="OPEN AI 模式")
thread_id = gr.Textbox(label="thread_id", visible=False)
socratic_mode_btn = gr.Checkbox(label="蘇格拉底家教助理模式", value=True)
openai_chatbot_audio_input = gr.Audio(sources=["microphone"], type="filepath")
msg = gr.Textbox(label="Message")
send_button = gr.Button("Send")
with gr.Tab("GROQ"):
gr.Markdown("## GROQ 模式")
groq_chatbot = gr.Chatbot(label="groq mode chatbot")
groq_msg = gr.Textbox(label="Message")
groq_send_button = gr.Button("Send")
with gr.Tab("JUTOR"):
gr.Markdown("## JUTOR API 模式")
jutor_chatbot = gr.Chatbot(label="jutor mode chatbot")
jutor_msg = gr.Textbox(label="Message")
jutor_send_button = gr.Button("Send")
with gr.Tab("教師版"):
with gr.Row():
content_topic = gr.Dropdown(label="選擇主題", choices=["數學", "自然", "國文", "英文", "社會"], value="數學")
content_grade = gr.Dropdown(label="選擇年級", choices=["一年級", "二年級", "三年級", "四年級", "五年級", "六年級", "七年級", "八年級", "九年級", "十年級", "十一年級", "十二年級"], value="三年級")
content_level = gr.Dropdown(label="差異化教學", choices=["基礎", "中級", "進階"], value="基礎")
with gr.Row():
with gr.Column(scale=1):
# with gr.Tab("認知階層評量題目"):
# cognitive_level_content = gr.Textbox(label="輸入學習目標與內容")
# cognitive_level_content_btn = gr.Button("生成評量題目")
with gr.Tab("學習單"):
worksheet_content_type_name = gr.Textbox(value="worksheet", visible=False)
worksheet_algorithm = gr.Dropdown(label="選擇教學策略或理論", choices=["Bloom認知階層理論", "Polya數學解題法", "CRA教學法"], value="Bloom認知階層理論")
worksheet_content_btn = gr.Button("生成學習單 📄")
with gr.Accordion("prompt", open=False):
worksheet_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40)
with gr.Tab("課程計畫"):
lesson_plan_content_type_name = gr.Textbox(value="lesson_plan", visible=False)
lesson_plan_time = gr.Slider(label="選擇課程時間(分鐘)", minimum=10, maximum=120, step=5, value=40)
lesson_plan_btn = gr.Button("生成課程計畫 📕")
with gr.Accordion("prompt", open=False):
lesson_plan_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40)
with gr.Tab("出場券"):
exit_ticket_content_type_name = gr.Textbox(value="exit_ticket", visible=False)
exit_ticket_time = gr.Slider(label="選擇出場券時間(分鐘)", minimum=5, maximum=10, step=1, value=8)
exit_ticket_btn = gr.Button("生成出場券 🎟️")
with gr.Accordion("prompt", open=False):
exit_ticket_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40)
with gr.Tab("素養導向閱讀題組"):
literacy_oriented_reading_content = gr.Textbox(label="輸入閱讀材料")
literacy_oriented_reading_content_btn = gr.Button("生成閱讀理解題")
# with gr.Tab("自我評估"):
# self_assessment_content = gr.Textbox(label="輸入自評問卷或檢查表")
# self_assessment_content_btn = gr.Button("生成自評問卷")
# with gr.Tab("自我反思評量"):
# self_reflection_content = gr.Textbox(label="輸入自我反思活動")
# self_reflection_content_btn = gr.Button("生成自我反思活動")
# with gr.Tab("後設認知"):
# metacognition_content = gr.Textbox(label="輸入後設認知相關問題")
# metacognition_content_btn = gr.Button("生成後設認知問題")
with gr.Column(scale=2):
                # Results for the different generation modes
exam_result_prompt = gr.Textbox(visible=False)
exam_result = gr.Textbox(label="初次生成結果", show_copy_button=True)
exam_result_fine_tune_prompt = gr.Textbox(label="根據結果,輸入你想更改的想法")
exam_result_fine_tune_btn = gr.Button("微調結果")
exam_result_fine_result = gr.Textbox(label="微調結果",show_copy_button=True)
    # Legacy mode
# send_button.click(
# respond,
# inputs=[msg, df_string_output, chatbot, socratic_mode_btn],
# outputs=[msg, chatbot]
# )
    # # Wire up button click events
# btn_1.click(respond, inputs=[btn_1, df_string_output, chatbot, socratic_mode_btn], outputs=[msg, chatbot])
# btn_2.click(respond, inputs=[btn_2, df_string_output, chatbot, socratic_mode_btn], outputs=[msg, chatbot])
# btn_3.click(respond, inputs=[btn_3, df_string_output, chatbot, socratic_mode_btn], outputs=[msg, chatbot])
# chat_with_youtube_transcript
# send_button.click(
# chat_with_youtube_transcript,
# inputs=[password, video_id, thread_id, df_string_output, msg, chatbot, socratic_mode_btn],
# outputs=[msg, chatbot, thread_id]
# )
# chat_with_youtube_transcript
    # OpenAI mode
send_button.click(
chat_with_youtube_transcript,
inputs=[password, video_id, thread_id, df_string_output, msg, chatbot, socratic_mode_btn],
outputs=[msg, chatbot, thread_id]
)
openai_chatbot_audio_input.change(
process_open_ai_audio_to_chatbot,
inputs=[password, openai_chatbot_audio_input],
outputs=[msg]
)
    # GROQ mode
groq_send_button.click(
chat_with_groq,
inputs=[password, groq_msg, df_string_output, groq_chatbot, socratic_mode_btn],
outputs=[groq_msg, groq_chatbot]
)
    # JUTOR API mode
jutor_send_button.click(
respond_with_jutor_chat,
inputs=[password, jutor_msg, df_string_output, jutor_chatbot, socratic_mode_btn],
outputs=[jutor_msg, jutor_chatbot]
)
    # Wire up the question button click events
btn_1.click(
chat_with_youtube_transcript,
inputs=[password, video_id, thread_id, df_string_output, btn_1, chatbot, socratic_mode_btn],
outputs=[msg, chatbot, thread_id]
)
btn_2.click(
chat_with_youtube_transcript,
inputs=[password, video_id, thread_id, df_string_output, btn_2, chatbot, socratic_mode_btn],
outputs=[msg, chatbot, thread_id]
)
btn_3.click(
chat_with_youtube_transcript,
inputs=[password, video_id, thread_id, df_string_output, btn_3, chatbot, socratic_mode_btn],
outputs=[msg, chatbot, thread_id]
)
btn_create_question.click(change_questions, inputs = [password, df_string_output], outputs = [btn_1, btn_2, btn_3])
# file_upload.change(process_file, inputs=file_upload, outputs=df_string_output)
    file_upload.change(process_file, inputs=[password, file_upload], outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output])
    # Triggered when a YouTube link is entered
youtube_link.change(
process_youtube_link,
inputs=[password,youtube_link],
outputs=[
video_id,
btn_1,
btn_2,
btn_3,
df_string_output,
df_summarise,
mind_map,
mind_map_html,
transcript_html,
simple_html_content,
slide_image,
slide_text,
reading_passage
]
)
youtube_link_btn.click(
process_youtube_link,
inputs=[password, youtube_link],
outputs=[
video_id,
btn_1,
btn_2,
btn_3,
df_string_output,
df_summarise,
mind_map,
mind_map_html,
transcript_html,
simple_html_content,
slide_image,
slide_text,
reading_passage
]
)
    # Triggered when a web page link is entered
# web_link.change(process_web_link, inputs=web_link, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output])
    # Teacher view: worksheet
worksheet_content_btn.click(
on_generate_ai_content,
inputs=[password, df_string_output, content_topic, content_grade, content_level, worksheet_algorithm, worksheet_content_type_name],
outputs=[exam_result, worksheet_prompt, exam_result_prompt]
)
lesson_plan_btn.click(
on_generate_ai_content,
inputs=[password, df_string_output, content_topic, content_grade, content_level, lesson_plan_time, lesson_plan_content_type_name],
outputs=[exam_result, lesson_plan_prompt, exam_result_prompt]
)
exit_ticket_btn.click(
on_generate_ai_content,
inputs=[password, df_string_output, content_topic, content_grade, content_level, exit_ticket_time, exit_ticket_content_type_name],
outputs=[exam_result, exit_ticket_prompt, exam_result_prompt]
)
    # Fine-tune the generated result
exam_result_fine_tune_btn.click(
generate_exam_fine_tune_result,
inputs=[password, exam_result_prompt, df_string_output, exam_result, exam_result_fine_tune_prompt],
outputs=[exam_result_fine_result]
)
demo.load(
init_params,
inputs =[youtube_link],
outputs = [password , youtube_link]
)
demo.launch(allowed_paths=["videos"])