chat / app.py
simondby
update auth
eb8a298
# -*- coding:utf-8 -*-
import os
import logging
import sys
import gradio as gr
from modules.utils import *
from modules.presets import *
from modules.overwrites import *
from modules.chat_func import *
from modules.openai_func import get_usage
# Startup configuration: logging, API key discovery and login credentials.
#
# Module-level results consumed by the rest of the script:
#   my_api_key -- default API key (may stay empty when none is configured)
#   dockerflag -- True when the "dockerrun" environment variable equals "yes"
#   authflag   -- True once a credential source has been found
#   auth_list  -- list of (username, password) tuples
#
# The process exits with status 1 when the Docker key is the placeholder
# "empty" or when an auth.json entry lacks a username or a password.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s",
)

my_api_key = ""  # Enter your API key here

# if we are running in Docker
dockerflag = os.environ.get("dockerrun") == "yes"

authflag = False
auth_list = []

if dockerflag:
    my_api_key = os.environ.get("my_api_key")
    if my_api_key == "empty":
        logging.error("Please give a api key!")
        sys.exit(1)
    # auth: both variables must be present for login to be enabled
    username = os.environ.get("USERNAME")
    password = os.environ.get("PASSWORD")
    if username is not None and password is not None:
        auth_list.append((username, password))
        authflag = True
else:
    # Key lookup order: hard-coded value above, then a non-empty api_key.txt,
    # then (only when api_key.txt does not exist) the environment.
    if (
        not my_api_key
        and os.path.exists("api_key.txt")
        and os.path.getsize("api_key.txt")
    ):
        with open("api_key.txt", "r", encoding="utf-8") as f:
            my_api_key = f.read().strip()
    elif (
        not my_api_key
        and not os.path.exists("api_key.txt")
        and os.environ.get("OPENAI_API_KEY") is not None
    ):
        # "api_keys" holds one key per line; blank lines are dropped.
        api_keys = os.environ.get("api_keys") or ""
        ls_keys = [key.strip() for key in api_keys.splitlines() if key.strip()]
        default_key = os.environ.get("OPENAI_API_KEY")
        my_api_key = api_picker(ls_keys, default_key)

    # NOTE(review): the auth.json handling is assumed to belong to the
    # non-Docker branch -- confirm against the original indentation.
    if os.path.exists("auth.json"):
        authflag = True
        with open("auth.json", "r", encoding="utf-8") as f:
            auth = json.load(f)
        # auth.json maps an arbitrary label to {"username": ..., "password": ...}.
        for entry in auth.values():
            name = entry.get("username")
            secret = entry.get("password")
            if name and secret:
                auth_list.append((name, secret))
            else:
                # A missing or empty field is reported the same way instead
                # of surfacing as a bare KeyError.
                logging.error("请检查auth.json文件中的用户名和密码!")
                sys.exit(1)
# Replace two library methods with the star-imported `postprocess` and
# `compact_text_chunks` callables before any UI object is created.
gr.Chatbot.postprocess = postprocess
PromptHelper.compact_text_chunks = compact_text_chunks

# Stylesheet text that is later handed to gr.Blocks(css=...).
with open("assets/custom.css", mode="r", encoding="utf-8") as css_file:
    customCSS = css_file.read()
# Build the Gradio UI: per-session state, the chat column, and the settings tabs.
# NOTE(review): leading indentation is missing in this copy of the file, so the
# nesting of the `with` containers below has to be restored before it can run.
with gr.Blocks(css=customCSS, theme=gr.themes.Soft()) as app:
# Per-session gr.State holders; several of them are passed as inputs/outputs
# to the event handlers wired up after the layout.
history = gr.State([])
token_count = gr.State([])
current_user = gr.State("")
# Initial value comes from the first template file name; the meaning of
# mode=2 is defined by load_template (not visible here).
promptTemplates = gr.State(load_template(get_template_names(plain=True)[0], mode=2))
# Seeded with the key resolved at startup.
user_api_key = gr.State(my_api_key)
user_question = gr.State("")
outputing = gr.State(False)
topic = gr.State("未命名对话历史记录")
# Header row: title plus an initially empty status line.
with gr.Row():
gr.Markdown(title, elem_id="title")
# status_display = gr.Markdown(get_geoip(), elem_id="status_display")
status_display = gr.Markdown("", elem_id="status_display")
with gr.Row().style(equal_height=True):
# Chat column: conversation action buttons, the chatbot, and the input row.
with gr.Column(scale=5):
with gr.Row():
emptyBtn = gr.Button("🧹 清空对话")
retryBtn = gr.Button("🔄 重新生成")
delFirstBtn = gr.Button("🗑️ 删除最早对话")
delLastBtn = gr.Button("🗑️ 删除最新对话")
reduceTokenBtn = gr.Button("♻️ 总结对话")
with gr.Row():
chatbot = gr.Chatbot(elem_id="chatbox",label="对话") .style(height="100%")
with gr.Row():
with gr.Column(scale=12):
user_input = gr.Textbox(
show_label=False, placeholder="Enter发送,Shift+Enter换行"
).style(container=False)
with gr.Column(min_width=70, scale=1):
submitBtn = gr.Button("发送", variant="primary")
# Created hidden (visible=False).
cancelBtn = gr.Button("取消", variant="secondary", visible=False)
# Settings column with four tabs.
with gr.Column(scale=2):
with gr.Column(min_width=60, scale=2):
# Tab 1: API key, system prompt, prompt templates, streaming/search options.
with gr.Tab(label="常用设置"):
# Pre-filled with hide_middle_chars(my_api_key); shown only when
# HIDE_MY_KEY is falsy.
keyTxt = gr.Textbox(
show_label=True,
info="填写以使用自己的API",
placeholder=f"OpenAI API key",
value=hide_middle_chars(my_api_key),
type="password",
visible=not HIDE_MY_KEY,
label="API-Key",
)
systemPromptTxt = gr.Textbox(
show_label=True,
label="系统输入",
info="系统输入不会直接产生交互,可作为基础信息输入",
placeholder=f"在这里输入System Prompt...",
value=gen_sys_prompt,
lines=8,
).style(container=True)
with gr.Accordion(label="模板", open=False):
with gr.Column():
with gr.Row():
with gr.Column(scale=6):
# Template file picker; defaults to the first available name.
templateFileSelectDropdown = gr.Dropdown(
label="选择Prompt模板集合文件",
choices=get_template_names(plain=True),
multiselect=False,
value=get_template_names(plain=True)[0],
).style(container=False)
with gr.Column(scale=1):
templateRefreshBtn = gr.Button("🔄 刷新")
with gr.Row():
with gr.Column():
# Template picker; choices and default both come from
# load_template(<first template file>, mode=1).
templateSelectDropdown = gr.Dropdown(
label="从Prompt模板中加载",
choices=load_template(
get_template_names(plain=True)[0], mode=1
),
multiselect=False,
value=load_template(
get_template_names(plain=True)[0], mode=1
)[0],
).style(container=False)
# Visibility is controlled by enable_streaming_option.
use_streaming_checkbox = gr.Checkbox(
label="实时传输回答", value=True, visible=enable_streaming_option
)
use_websearch_checkbox = gr.Checkbox(label="使用在线搜索", value=False)
language_select_dropdown = gr.Dropdown(
label="选择回复语言(针对搜索&索引功能)",
choices=REPLY_LANGUAGES,
multiselect=False,
value=REPLY_LANGUAGES[0],
)
index_files = gr.Files(label="上传索引文件", type="file", elem_id="index_file")
# Tab 2: model choice and sampling parameters.
with gr.Tab(label="参数"):
model_select_dropdown = gr.Dropdown(
label="选择模型", choices=MODELS, multiselect=False, value=MODELS[0]
)
# Range 0..1, default 0.5, step 0.1.
top_p = gr.Slider(
label="Top-p (核采样)",
info="0.1意味着只考虑包含最高10%概率质量的token。较小时,更加紧凑连贯,但缺乏创造性。较大时,更加多样化,但会出现语法错误和不连贯。",
minimum=-0,
maximum=1.0,
value=0.5,
step=0.1,
interactive=True,
)
# Range 0..2, default 1.0, step 0.1.
temperature = gr.Slider(
label="采样温度",
info="越高 → 不确定性、创造性越高↑",
minimum=-0,
maximum=2.0,
value=1.0,
step=0.1,
interactive=True
)
# Tab 3: load, save and export conversation history.
with gr.Tab(label="导入/导出"):
with gr.Row():
with gr.Column(scale=6):
# gr.Markdown("Saved to the history folder by default")
historyFileSelectDropdown = gr.Dropdown(
label="加载对话",
choices=get_history_names(plain=True),
multiselect=False,
value=get_history_names(plain=True)[0],
)
with gr.Column(scale=1):
historyRefreshBtn = gr.Button("🔄 刷新")
with gr.Column(scale=6):
saveFileName = gr.Textbox(
show_label=True,
placeholder=f"导出文件名",
label="保存对话",
value="对话历史记录",
).style(container=True)
gr.Markdown("保存为.json文件支持重新导入\nMarkdown文件不支持重新导入,适合本地查看")
saveHistoryBtn = gr.Button("💾 保存为 JSON 文件")
with gr.Row():
with gr.Column(scale=1):
exportMarkdownBtn = gr.Button("📝 导出为 Markdown 文件")
with gr.Row():
with gr.Column():
# Used both as the target of save/export and as an upload source
# (its change event loads a conversation).
downloadFile = gr.File(label="上传对话(JSON文件)",
interactive=True)
# Tab 4 (its label is an empty string): reset button, usage text, network settings.
with gr.Tab(label=""):
gr.Markdown("## ⚠️ 谨慎操作 \n\n如果无法使用请恢复默认设置")
default_btn = gr.Button("🔙 恢复默认设置")
usageTxt = gr.Markdown("**发送消息** 或 **提交key** 以显示额度", elem_id="usage_display")
with gr.Accordion("网络设置", open=True):
apiurlTxt = gr.Textbox(
show_label=True,
placeholder=f"在这里输入API地址...",
label="API地址",
value="https://api.openai.com/v1/chat/completions",
lines=1,
)
# changeAPIURLBtn = gr.Button("🔄 Switch API URL")
proxyTxt = gr.Textbox(
show_label=True,
placeholder=f"在这里输入代理地址...",
label="代理地址,如:http://127.0.0.1:10809",
value="",
lines=1,
)
# changeProxyBtn = gr.Button("🔄 Set proxy address")
# Description and footer are created but not shown (visible=False).
gr.Markdown(description, elem_id="description",visible=False)
gr.HTML(footer.format(versions=versions_html()), elem_id="footer",visible=False)
# Reusable keyword-argument bundles for event listeners; each one is unpacked
# with ** into a .click()/.submit()/.then() call.

# Main chat request.
chatgpt_predict_args = {
    "fn": predict,
    "inputs": [
        user_api_key,
        systemPromptTxt,
        history,
        user_question,
        chatbot,
        token_count,
        top_p,
        temperature,
        use_streaming_checkbox,
        model_select_dropdown,
        use_websearch_checkbox,
        index_files,
        language_select_dropdown,
    ],
    "outputs": [chatbot, history, status_display, token_count],
    "show_progress": True,
}

# Steps that update the send/cancel buttons before and after a request.
start_outputing_args = {
    "fn": start_outputing,
    "inputs": [],
    "outputs": [submitBtn, cancelBtn],
    "show_progress": True,
}
end_outputing_args = {
    "fn": end_outputing,
    "inputs": [],
    "outputs": [submitBtn, cancelBtn],
}

# reset_textbox_args = dict(
#     fn=reset_textbox, inputs=[], outputs=[user_input]
# )

# First step of a submission: takes the textbox content and writes to the
# user_question state, the textbox itself and both buttons.
transfer_input_args = {
    "fn": transfer_input,
    "inputs": [user_input],
    "outputs": [user_question, user_input, submitBtn, cancelBtn],
    "show_progress": True,
}

# Usage lookup for the current key; only referenced from commented-out
# listeners in this file.
get_usage_args = {
    "fn": get_usage,
    "inputs": [user_api_key],
    "outputs": [usageTxt],
    "show_progress": False,
}
# ---- Chatbot ----
cancelBtn.click(fn=cancel_outputing, inputs=[], outputs=[])

# Pressing Enter in the textbox and clicking the send button run the same
# chain: transfer the input, run the prediction, then the end-of-output step.
user_input.submit(**transfer_input_args).then(**chatgpt_predict_args).then(
    **end_outputing_args
)
# user_input.submit(**get_usage_args)
submitBtn.click(**transfer_input_args).then(**chatgpt_predict_args).then(
    **end_outputing_args
)
# submitBtn.click(**get_usage_args)

emptyBtn.click(
    fn=reset_state,
    outputs=[chatbot, history, token_count, status_display],
    show_progress=True,
)
# emptyBtn.click(**reset_textbox_args)

retryBtn.click(**start_outputing_args).then(
    fn=retry,
    inputs=[
        user_api_key,
        systemPromptTxt,
        history,
        chatbot,
        token_count,
        top_p,
        temperature,
        use_streaming_checkbox,
        model_select_dropdown,
        language_select_dropdown,
    ],
    outputs=[chatbot, history, status_display, token_count],
    show_progress=True,
).then(**end_outputing_args)
# retryBtn.click(**get_usage_args)

delFirstBtn.click(
    fn=delete_first_conversation,
    inputs=[history, token_count],
    outputs=[history, token_count, status_display],
)
delLastBtn.click(
    fn=delete_last_conversation,
    inputs=[chatbot, history, token_count],
    outputs=[chatbot, history, token_count, status_display],
    show_progress=True,
)

reduceTokenBtn.click(
    fn=reduce_token_size,
    inputs=[
        user_api_key,
        systemPromptTxt,
        history,
        chatbot,
        token_count,
        top_p,
        temperature,
        # NOTE(review): this expression runs once, while the UI is being
        # built, so the limit reflects the checkbox's initial value and does
        # not follow later changes to the checkbox.
        gr.State(max_token_streaming//2 if use_streaming_checkbox.value else max_token_all//2),
        model_select_dropdown,
        language_select_dropdown,
    ],
    outputs=[chatbot, history, status_display, token_count],
    show_progress=True,
)
# reduceTokenBtn.click(**get_usage_args)

# ---- ChatGPT ----
# keyTxt.change(submit_key, keyTxt, [user_api_key, status_display]).then(**get_usage_args)
keyTxt.change(fn=submit_key, inputs=keyTxt, outputs=[user_api_key, status_display])
# keyTxt.submit(**get_usage_args)

# ---- Template ----
templateRefreshBtn.click(
    fn=get_template_names,
    inputs=None,
    outputs=[templateFileSelectDropdown],
)
templateFileSelectDropdown.change(
    fn=load_template,
    inputs=[templateFileSelectDropdown],
    outputs=[promptTemplates, templateSelectDropdown],
    show_progress=True,
)
templateSelectDropdown.change(
    fn=get_template_content,
    inputs=[promptTemplates, templateSelectDropdown, systemPromptTxt],
    outputs=[systemPromptTxt],
    show_progress=True,
)

# ---- S&L (save and load) ----
saveHistoryBtn.click(
    fn=save_chat_history,
    inputs=[saveFileName, systemPromptTxt, history, chatbot],
    outputs=downloadFile,
    show_progress=True,
)
# Second listener on the same button: refresh the history dropdown.
saveHistoryBtn.click(
    fn=get_history_names,
    inputs=None,
    outputs=[historyFileSelectDropdown],
)
exportMarkdownBtn.click(
    fn=export_markdown,
    inputs=[saveFileName, systemPromptTxt, history, chatbot],
    outputs=downloadFile,
    show_progress=True,
)
historyRefreshBtn.click(
    fn=get_history_names,
    inputs=None,
    outputs=[historyFileSelectDropdown],
)
historyFileSelectDropdown.change(
    fn=load_chat_history,
    inputs=[historyFileSelectDropdown, systemPromptTxt, history, chatbot],
    outputs=[saveFileName, systemPromptTxt, history, chatbot],
    show_progress=True,
)
downloadFile.change(
    fn=load_chat_history,
    inputs=[downloadFile, systemPromptTxt, history, chatbot],
    outputs=[saveFileName, systemPromptTxt, history, chatbot],
)

# ---- Advanced ----
default_btn.click(
    fn=reset_default,
    inputs=[],
    outputs=[apiurlTxt, proxyTxt, status_display],
    show_progress=True,
)
apiurlTxt.change(
    fn=change_api_url,
    inputs=[apiurlTxt],
    outputs=[status_display],
    show_progress=True,
)
# changeAPIURLBtn.click(
#     change_api_url,
#     [apiurlTxt],
#     [status_display],
#     show_progress=True,
# )
proxyTxt.change(
    fn=change_proxy,
    inputs=[proxyTxt],
    outputs=[status_display],
    show_progress=True,
)
# changeProxyBtn.click(
#     change_proxy,
#     [proxyTxt],
#     [status_display],
#     show_progress=True,
# )
# Log the local URL, wrapped in colorama codes (white background, then reset).
startup_banner = "".join(
    (
        colorama.Back.WHITE,
        "🚀 访问:http://localhost:7860",
        colorama.Style.RESET_ALL,
    )
)
logging.info(startup_banner)

# By default a local server is started, it can be reached directly by IP,
# and no public share link is created.
app.title = "Chatbot 🚀"
if __name__ == "__main__":
    reload_javascript()

    if dockerflag:
        # Running in Docker: listen on all interfaces on port 7860.
        launch_options = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "favicon_path": "./assets/rocket.png",
        }
        if authflag:
            launch_options["auth"] = auth_list
        else:
            launch_options["share"] = False
    else:
        # Not running in Docker.
        # NOTE(review): the original had separate authflag / no-authflag
        # branches here with identical arguments, so authflag and auth_list
        # do not influence a local launch; auth_check is always used.
        launch_options = {
            "auth": auth_check,
            "favicon_path": "./assets/rocket.png",
            "share": False,  # change to share=True to create a public share link
            "inbrowser": False,
        }

    app.queue(concurrency_count=CONCURRENT_COUNT).launch(**launch_options)
    # app.queue(concurrency_count=CONCURRENT_COUNT).launch(server_name="0.0.0.0", server_port=7860, share=False) # custom port
    # app.queue(concurrency_count=CONCURRENT_COUNT).launch(server_name="0.0.0.0", server_port=7860,auth=("your-username", "your-password")) # set a username and password
    # app.queue(concurrency_count=CONCURRENT_COUNT).launch(auth=("your-username", "your-password")) # suitable behind an Nginx reverse proxy