import os |
import sys |
import gradio as gr |
import subprocess |
import threading |
import time |
from datetime import datetime |
import queue |
from pathlib import Path |
import json |
import signal |
import dotenv |
log_queue: queue.Queue[str] = queue.Queue() |
current_process = None |
process_lock = threading.Lock() |
"Qwen Mini (中文)": "run_qwen_mini_zh.py", |
"Qwen (中文)": "run_qwen_zh.py", |
"Mini": "run_mini.py", |
"DeepSeek (中文)": "run_deepseek_zh.py", |
"Default": "run.py", |
"GAIA Roleplaying": "run_gaia_roleplaying.py", |
"OpenAI Compatible": "run_openai_compatiable_model.py", |
"Ollama": "run_ollama.py", |
"Terminal": "run_terminal_zh.py", |
} |
"Qwen Mini (中文)": "使用阿里云Qwen模型的中文版本,适合中文问答和任务", |
"Qwen (中文)": "使用阿里云Qwen模型,支持多种工具和功能", |
"Mini": "轻量级版本,使用OpenAI GPT-4o模型", |
"DeepSeek (中文)": "使用DeepSeek模型,适合非多模态任务", |
"Default": "默认OWL实现,使用OpenAI GPT-4o模型和全套工具", |
"GAIA Roleplaying": "GAIA基准测试实现,用于评估模型能力", |
"OpenAI Compatible": "使用兼容OpenAI API的第三方模型,支持自定义API端点", |
"Ollama": "使用Ollama API", |
"Terminal": "使用本地终端执行python文件", |
} |
"模型API": [ |
{ |
"name": "OPENAI_API_KEY", |
"label": "OpenAI API密钥", |
"type": "password", |
"required": False, |
"help": "OpenAI API密钥,用于访问GPT模型。获取方式:https://platform.openai.com/api-keys", |
}, |
{ |
"name": "OPENAI_API_BASE_URL", |
"label": "OpenAI API基础URL", |
"type": "text", |
"required": False, |
"help": "OpenAI API的基础URL,可选。如果使用代理或自定义端点,请设置此项。", |
}, |
{ |
"name": "QWEN_API_KEY", |
"label": "阿里云Qwen API密钥", |
"type": "password", |
"required": False, |
"help": "阿里云Qwen API密钥,用于访问Qwen模型。获取方式:https://help.aliyun.com/zh/model-studio/developer-reference/get-api-key", |
}, |
{ |
"name": "DEEPSEEK_API_KEY", |
"label": "DeepSeek API密钥", |
"type": "password", |
"required": False, |
"help": "DeepSeek API密钥,用于访问DeepSeek模型。获取方式:https://platform.deepseek.com/api_keys", |
}, |
], |
"搜索工具": [ |
{ |
"name": "GOOGLE_API_KEY", |
"label": "Google API密钥", |
"type": "password", |
"required": False, |
"help": "Google搜索API密钥,用于网络搜索功能。获取方式:https://developers.google.com/custom-search/v1/overview", |
}, |
{ |
"name": "SEARCH_ENGINE_ID", |
"label": "搜索引擎ID", |
"type": "text", |
"required": False, |
"help": "Google自定义搜索引擎ID,与Google API密钥配合使用。获取方式:https://developers.google.com/custom-search/v1/overview", |
}, |
], |
"其他工具": [ |
{ |
"name": "HF_TOKEN", |
"label": "Hugging Face令牌", |
"type": "password", |
"required": False, |
"help": "Hugging Face API令牌,用于访问Hugging Face模型和数据集。获取方式:https://huggingface.co/join", |
}, |
{ |
"name": "CHUNKR_API_KEY", |
"label": "Chunkr API密钥", |
"type": "password", |
"required": False, |
"help": "Chunkr API密钥,用于文档处理功能。获取方式:https://chunkr.ai/", |
}, |
{ |
"name": "FIRECRAWL_API_KEY", |
"label": "Firecrawl API密钥", |
"type": "password", |
"required": False, |
"help": "Firecrawl API密钥,用于网页爬取功能。获取方式:https://www.firecrawl.dev/", |
}, |
], |
"自定义环境变量": [], |
} |
def get_script_info(script_name): |
"""获取脚本的详细信息""" |
return SCRIPT_DESCRIPTIONS.get(script_name, "无描述信息") |
def load_env_vars(): |
"""加载环境变量""" |
env_vars = {} |
dotenv.load_dotenv() |
for group in ENV_GROUPS.values(): |
for var in group: |
env_vars[var["name"]] = os.environ.get(var["name"], "") |
if Path(".env").exists(): |
try: |
with open(".env", "r", encoding="utf-8") as f: |
for line in f: |
line = line.strip() |
if line and not line.startswith("#") and "=" in line: |
try: |
key, value = line.split("=", 1) |
key = key.strip() |
value = value.strip() |
if (value.startswith('"') and value.endswith('"')) or ( |
value.startswith("'") and value.endswith("'") |
): |
value = value[1:-1] |
known_var = False |
for group in ENV_GROUPS.values(): |
if any(var["name"] == key for var in group): |
known_var = True |
break |
if not known_var and key not in env_vars: |
ENV_GROUPS["自定义环境变量"].append( |
{ |
"name": key, |
"label": key, |
"type": "text", |
"required": False, |
"help": "用户自定义环境变量", |
} |
) |
env_vars[key] = value |
except Exception as e: |
print(f"解析环境变量行时出错: {line}, 错误: {str(e)}") |
except Exception as e: |
print(f"加载.env文件时出错: {str(e)}") |
return env_vars |
def save_env_vars(env_vars): |
"""保存环境变量到.env文件""" |
env_path = Path(".env") |
existing_content = {} |
if env_path.exists(): |
try: |
with open(env_path, "r", encoding="utf-8") as f: |
for line in f: |
line = line.strip() |
if line and not line.startswith("#") and "=" in line: |
try: |
key, value = line.split("=", 1) |
existing_content[key.strip()] = value.strip() |
except Exception as e: |
print(f"解析环境变量行时出错: {line}, 错误: {str(e)}") |
except Exception as e: |
print(f"读取.env文件时出错: {str(e)}") |
for key, value in env_vars.items(): |
if value is not None: |
value = str(value) |
if (value.startswith('"') and value.endswith('"')) or ( |
value.startswith("'") and value.endswith("'") |
): |
existing_content[key] = value |
os.environ[key] = value[1:-1] |
else: |
quoted_value = f'"{value}"' |
existing_content[key] = quoted_value |
os.environ[key] = value |
try: |
with open(env_path, "w", encoding="utf-8") as f: |
for key, value in existing_content.items(): |
f.write(f"{key}={value}\n") |
except Exception as e: |
print(f"写入.env文件时出错: {str(e)}") |
return f"❌ 保存环境变量失败: {str(e)}" |
return "✅ 环境变量已保存" |
def add_custom_env_var(name, value, var_type): |
"""添加自定义环境变量""" |
if not name: |
return "❌ 环境变量名不能为空", None |
for group in ENV_GROUPS.values(): |
if any(var["name"] == name for var in group): |
return f"❌ 环境变量 {name} 已存在", None |
ENV_GROUPS["自定义环境变量"].append( |
{ |
"name": name, |
"label": name, |
"type": var_type, |
"required": False, |
"help": "用户自定义环境变量", |
} |
) |
env_vars = {name: value} |
save_env_vars(env_vars) |
return f"✅ 已添加环境变量 {name}", ENV_GROUPS["自定义环境变量"] |
def update_custom_env_var(name, value, var_type): |
"""更改自定义环境变量""" |
if not name: |
return "❌ 环境变量名不能为空", None |
found = False |
for i, var in enumerate(ENV_GROUPS["自定义环境变量"]): |
if var["name"] == name: |
ENV_GROUPS["自定义环境变量"][i]["type"] = var_type |
found = True |
break |
if not found: |
return f"❌ 自定义环境变量 {name} 不存在", None |
env_vars = {name: value} |
save_env_vars(env_vars) |
return f"✅ 已更新环境变量 {name}", ENV_GROUPS["自定义环境变量"] |
def delete_custom_env_var(name): |
"""删除自定义环境变量""" |
if not name: |
return "❌ 环境变量名不能为空", None |
found = False |
for i, var in enumerate(ENV_GROUPS["自定义环境变量"]): |
if var["name"] == name: |
del ENV_GROUPS["自定义环境变量"][i] |
found = True |
break |
if not found: |
return f"❌ 自定义环境变量 {name} 不存在", None |
env_path = Path(".env") |
if env_path.exists(): |
try: |
with open(env_path, "r", encoding="utf-8") as f: |
lines = f.readlines() |
with open(env_path, "w", encoding="utf-8") as f: |
for line in lines: |
try: |
line_stripped = line.strip() |
if not line_stripped or line_stripped.startswith("#"): |
f.write(line) |
continue |
if "=" not in line_stripped: |
f.write(line) |
continue |
var_name = line_stripped.split("=", 1)[0].strip() |
if var_name != name: |
f.write(line) |
except Exception as e: |
print(f"处理.env文件行时出错: {line}, 错误: {str(e)}") |
f.write(line) |
except Exception as e: |
print(f"删除环境变量时出错: {str(e)}") |
return f"❌ 删除环境变量失败: {str(e)}", None |
if name in os.environ: |
del os.environ[name] |
return f"✅ 已删除环境变量 {name}", ENV_GROUPS["自定义环境变量"] |
def terminate_process(): |
"""终止当前运行的进程""" |
global current_process |
with process_lock: |
if current_process is not None and current_process.poll() is None: |
try: |
if os.name == "nt": |
pid = current_process.pid |
try: |
subprocess.run( |
["taskkill", "/F", "/T", "/PID", str(pid)], check=False |
) |
except subprocess.SubprocessError as e: |
log_queue.put(f"终止进程时出错: {str(e)}\n") |
return f"❌ 终止进程时出错: {str(e)}" |
else: |
current_process.terminate() |
try: |
current_process.wait(timeout=3) |
except subprocess.TimeoutExpired: |
current_process.kill() |
try: |
current_process.wait(timeout=2) |
except subprocess.TimeoutExpired: |
pass |
log_queue.put("进程已终止\n") |
return "✅ 进程已终止" |
except Exception as e: |
log_queue.put(f"终止进程时出错: {str(e)}\n") |
return f"❌ 终止进程时出错: {str(e)}" |
else: |
return "❌ 没有正在运行的进程" |
def run_script(script_dropdown, question, progress=gr.Progress()): |
"""运行选定的脚本并返回输出""" |
global current_process |
script_name = SCRIPTS.get(script_dropdown) |
if not script_name: |
return "❌ 无效的脚本选择", "", "", "", None |
if not question.strip(): |
return "请输入问题!", "", "", "", None |
while not log_queue.empty(): |
log_queue.get() |
log_dir = Path("logs") |
log_dir.mkdir(exist_ok=True) |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
log_file = log_dir / f"{script_name.replace('.py', '')}_{timestamp}.log" |
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
cmd = [ |
sys.executable, |
os.path.join(base_path, "owl", "script_adapter.py"), |
os.path.join(base_path, "owl", script_name), |
] |
env = os.environ.copy() |
if not isinstance(question, str): |
question = str(question) |
env["OWL_QUESTION"] = question |
with process_lock: |
current_process = subprocess.Popen( |
cmd, |
stdout=subprocess.PIPE, |
stderr=subprocess.STDOUT, |
text=True, |
bufsize=1, |
env=env, |
encoding="utf-8", |
) |
def read_output(): |
try: |
timestamp_unique = datetime.now().strftime("%Y%m%d_%H%M%S_%f") |
unique_log_file = ( |
log_dir / f"{script_name.replace('.py', '')}_{timestamp_unique}.log" |
) |
with open(unique_log_file, "w", encoding="utf-8") as f: |
nonlocal log_file |
log_file = unique_log_file |
for line in iter(current_process.stdout.readline, ""): |
if line: |
f.write(line) |
f.flush() |
log_queue.put(line) |
except Exception as e: |
log_queue.put(f"读取输出时出错: {str(e)}\n") |
threading.Thread(target=read_output, daemon=True).start() |
logs = [] |
progress(0, desc="正在运行...") |
start_time = time.time() |
timeout = 1800 |
while current_process.poll() is None: |
if time.time() - start_time > timeout: |
with process_lock: |
if current_process.poll() is None: |
if os.name == "nt": |
current_process.send_signal(signal.CTRL_BREAK_EVENT) |
else: |
current_process.terminate() |
log_queue.put("执行超时,已终止进程\n") |
break |
while not log_queue.empty(): |
log = log_queue.get() |
logs.append(log) |
elapsed = time.time() - start_time |
progress(min(elapsed / 300, 0.99), desc="正在运行...") |
time.sleep(0.1) |
yield ( |
status_message(current_process), |
extract_answer(logs), |
"".join(logs), |
str(log_file), |
None, |
) |
while not log_queue.empty(): |
logs.append(log_queue.get()) |
chat_history = extract_chat_history(logs) |
return ( |
status_message(current_process), |
extract_answer(logs), |
"".join(logs), |
str(log_file), |
chat_history, |
) |
def status_message(process): |
"""根据进程状态返回状态消息""" |
if process.poll() is None: |
return "⏳ 正在运行..." |
elif process.returncode == 0: |
return "✅ 执行成功" |
else: |
return f"❌ 执行失败 (返回码: {process.returncode})" |
def extract_answer(logs): |
"""从日志中提取答案""" |
answer = "" |
for log in logs: |
if "Answer:" in log: |
answer = log.split("Answer:", 1)[1].strip() |
break |
return answer |
def extract_chat_history(logs): |
"""尝试从日志中提取聊天历史""" |
try: |
chat_json_str = "" |
capture_json = False |
for log in logs: |
if "chat_history" in log: |
start_idx = log.find("[") |
if start_idx != -1: |
capture_json = True |
chat_json_str = log[start_idx:] |
elif capture_json: |
chat_json_str += log |
if "]" in log: |
end_idx = chat_json_str.rfind("]") + 1 |
if end_idx > 0: |
try: |
json_str = chat_json_str[:end_idx].strip() |
chat_data = json.loads(json_str) |
formatted_chat = [] |
for msg in chat_data: |
if "role" in msg and "content" in msg: |
role = "用户" if msg["role"] == "user" else "助手" |
formatted_chat.append([role, msg["content"]]) |
return formatted_chat |
except json.JSONDecodeError: |
pass |
except Exception: |
capture_json = False |
except Exception: |
pass |
return None |
def create_ui(): |
"""创建Gradio界面""" |
env_vars = load_env_vars() |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as app: |
gr.Markdown( |
""" |
# 🦉 OWL 智能助手运行平台 |
选择一个模型并输入您的问题,系统将运行相应的脚本并显示结果。 |
""" |
) |
with gr.Tabs(): |
with gr.TabItem("运行模式"): |
with gr.Row(): |
with gr.Column(scale=1): |
default_script = list(SCRIPTS.keys())[0] if SCRIPTS else None |
script_dropdown = gr.Dropdown( |
choices=list(SCRIPTS.keys()), |
value=default_script, |
label="选择模式", |
) |
script_info = gr.Textbox( |
value=get_script_info(default_script) |
if default_script |
else "", |
label="模型描述", |
interactive=False, |
) |
script_dropdown.change( |
fn=lambda x: get_script_info(x), |
inputs=script_dropdown, |
outputs=script_info, |
) |
question_input = gr.Textbox( |
lines=8, |
placeholder="请输入您的问题...", |
label="问题", |
elem_id="question_input", |
show_copy_button=True, |
) |
gr.Markdown( |
""" |
> **注意**: 您输入的问题将替换脚本中的默认问题。系统会自动处理问题的替换,确保您的问题被正确使用。 |
> 支持多行输入,换行将被保留。 |
""" |
) |
with gr.Row(): |
run_button = gr.Button("运行", variant="primary") |
stop_button = gr.Button("终止", variant="stop") |
with gr.Column(scale=2): |
with gr.Tabs(): |
with gr.TabItem("结果"): |
status_output = gr.Textbox(label="状态") |
answer_output = gr.Textbox(label="回答", lines=10) |
log_file_output = gr.Textbox(label="日志文件路径") |
with gr.TabItem("运行日志"): |
log_output = gr.Textbox(label="完整日志", lines=25) |
with gr.TabItem("聊天历史"): |
chat_output = gr.Chatbot(label="对话历史") |
examples = [ |
[ |
"Qwen Mini (中文)", |
"浏览亚马逊并找出一款对程序员有吸引力的产品。请提供产品名称和价格", |
], |
[ |
"DeepSeek (中文)", |
"请分析GitHub上CAMEL-AI项目的最新统计数据。找出该项目的星标数量、贡献者数量和最近的活跃度。然后,创建一个简单的Excel表格来展示这些数据,并生成一个柱状图来可视化这些指标。最后,总结CAMEL项目的受欢迎程度和发展趋势。", |
], |
[ |
"Default", |
"Navigate to Amazon.com and identify one product that is attractive to coders. Please provide me with the product name and price. No need to verify your answer.", |
], |
] |
gr.Examples(examples=examples, inputs=[script_dropdown, question_input]) |
with gr.TabItem("环境变量配置"): |
env_inputs = {} |
save_status = gr.Textbox(label="保存状态", interactive=False) |
with gr.Accordion("添加自定义环境变量", open=True): |
with gr.Row(): |
new_var_name = gr.Textbox( |
label="环境变量名", placeholder="例如:MY_CUSTOM_API_KEY" |
) |
new_var_value = gr.Textbox( |
label="环境变量值", placeholder="输入值" |
) |
new_var_type = gr.Dropdown( |
choices=["text", "password"], value="text", label="类型" |
) |
add_var_button = gr.Button("添加环境变量", variant="primary") |
add_var_status = gr.Textbox(label="添加状态", interactive=False) |
custom_vars_list = gr.JSON( |
value=ENV_GROUPS["自定义环境变量"], |
label="已添加的自定义环境变量", |
visible=len(ENV_GROUPS["自定义环境变量"]) > 0, |
) |
with gr.Accordion( |
"更改或删除自定义环境变量", |
open=True, |
visible=len(ENV_GROUPS["自定义环境变量"]) > 0, |
) as update_delete_accordion: |
with gr.Row(): |
custom_var_dropdown = gr.Dropdown( |
choices=[ |
var["name"] for var in ENV_GROUPS["自定义环境变量"] |
], |
label="选择环境变量", |
interactive=True, |
) |
update_var_value = gr.Textbox( |
label="新的环境变量值", placeholder="输入新值" |
) |
update_var_type = gr.Dropdown( |
choices=["text", "password"], value="text", label="类型" |
) |
with gr.Row(): |
update_var_button = gr.Button("更新环境变量", variant="primary") |
delete_var_button = gr.Button("删除环境变量", variant="stop") |
update_var_status = gr.Textbox(label="操作状态", interactive=False) |
add_var_button.click( |
fn=add_custom_env_var, |
inputs=[new_var_name, new_var_value, new_var_type], |
outputs=[add_var_status, custom_vars_list], |
).then( |
fn=lambda vars: {"visible": len(vars) > 0}, |
inputs=[custom_vars_list], |
outputs=[update_delete_accordion], |
) |
update_var_button.click( |
fn=update_custom_env_var, |
inputs=[custom_var_dropdown, update_var_value, update_var_type], |
outputs=[update_var_status, custom_vars_list], |
) |
delete_var_button.click( |
fn=delete_custom_env_var, |
inputs=[custom_var_dropdown], |
outputs=[update_var_status, custom_vars_list], |
).then( |
fn=lambda vars: {"visible": len(vars) > 0}, |
inputs=[custom_vars_list], |
outputs=[update_delete_accordion], |
) |
custom_vars_list.change( |
fn=lambda vars: { |
"choices": [var["name"] for var in vars], |
"value": None, |
}, |
inputs=[custom_vars_list], |
outputs=[custom_var_dropdown], |
) |
for group_name, vars in ENV_GROUPS.items(): |
if ( |
group_name != "自定义环境变量" or len(vars) > 0 |
): |
with gr.Accordion( |
group_name, open=(group_name != "自定义环境变量") |
): |
for var in vars: |
gr.Markdown(f"**{var['help']}**") |
if var["type"] == "password": |
env_inputs[var["name"]] = gr.Textbox( |
value=env_vars.get(var["name"], ""), |
label=var["label"], |
placeholder=f"请输入{var['label']}", |
type="password", |
) |
else: |
env_inputs[var["name"]] = gr.Textbox( |
value=env_vars.get(var["name"], ""), |
label=var["label"], |
placeholder=f"请输入{var['label']}", |
) |
save_button = gr.Button("保存环境变量", variant="primary") |
save_inputs = [ |
env_inputs[var_name] |
for group in ENV_GROUPS.values() |
for var in group |
for var_name in [var["name"]] |
if var_name in env_inputs |
] |
save_button.click( |
fn=lambda *values: save_env_vars( |
dict( |
zip( |
[ |
var["name"] |
for group in ENV_GROUPS.values() |
for var in group |
if var["name"] in env_inputs |
], |
values, |
) |
) |
), |
inputs=save_inputs, |
outputs=save_status, |
) |
run_button.click( |
fn=run_script, |
inputs=[script_dropdown, question_input], |
outputs=[ |
status_output, |
answer_output, |
log_output, |
log_file_output, |
chat_output, |
], |
show_progress=True, |
) |
stop_button.click(fn=terminate_process, inputs=[], outputs=[status_output]) |
gr.Markdown( |
""" |
### 📝 使用说明 |
- 选择一个模型并输入您的问题 |
- 点击"运行"按钮开始执行 |
- 如需终止运行,点击"终止"按钮 |
- 在"结果"标签页查看执行状态和回答 |
- 在"运行日志"标签页查看完整日志 |
- 在"聊天历史"标签页查看对话历史(如果有) |
- 在"环境变量配置"标签页配置API密钥和其他环境变量 |
- 您可以添加自定义环境变量,满足特殊需求 |
### ⚠️ 注意事项 |
- 运行某些模型可能需要API密钥,请确保在"环境变量配置"标签页中设置了相应的环境变量 |
- 某些脚本可能需要较长时间运行,请耐心等待 |
- 如果运行超过30分钟,进程将自动终止 |
- 您输入的问题将替换脚本中的默认问题,确保问题与所选模型兼容 |
""" |
) |
return app |
if __name__ == "__main__": |
app = create_ui() |
app.queue().launch(share=True) |