FakeQA / app.py
Charles Chan
coding
e85774e
import streamlit as st
import random
from langchain_community.llms import HuggingFaceHub
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.vectorstores import FAISS
from datasets import load_dataset
# Streamlit 界面
st.title("外挂知识库问答系统")
# 使用 假知识 数据集
if "data_list" not in st.session_state:
st.session_state.data_list = []
st.session_state.answer_list = []
if not st.session_state.data_list:
try:
with st.spinner("正在读取数据库..."):
dataset = load_dataset("zeerd/fake_knowledge")
# 输出前五条数据
print(dataset["train"][:5])
data_list = []
answer_list = []
for example in dataset["train"]:
answer_list.append(example["Answer"])
data_list.append({"Question": example["Question"], "Answer": example["Answer"]})
st.session_state.answer_list = answer_list
st.session_state.data_list = data_list
st.success("数据库读取完成!")
print("数据库读取完成!")
except Exception as e:
st.error(f"读取数据集失败:{e}")
st.stop()
# 构建向量数据库 (如果需要,仅构建一次)
if "vector_created" not in st.session_state:
st.session_state.vector_created = False
if not st.session_state.vector_created:
try:
with st.spinner("正在构建向量数据库..."):
# all-mpnet-base-v2 是一个由 Sentence Transformers 库提供的预训练模型,
# 专门用于生成高质量的句子嵌入(sentence embeddings)。
# all-mpnet-base-v2 在多个自然语言处理任务上表现出色,包括语义相似度计算、
# 文本检索、聚类等。它能够有效地捕捉句子的语义信息,并生成具有代表性的向量表示。
st.session_state.embeddings = SentenceTransformerEmbeddings(model_name="all-mpnet-base-v2")
st.session_state.db = FAISS.from_texts(st.session_state.answer_list, st.session_state.embeddings)
st.success("向量数据库构建完成!")
print("向量数据库构建完成!")
except Exception as e:
st.error(f"向量数据库构建失败:{e}")
st.stop()
st.session_state.vector_created = True
if "repo_id" not in st.session_state:
st.session_state.repo_id = ''
if "temperature" not in st.session_state:
st.session_state.temperature = ''
if "max_length" not in st.session_state:
st.session_state.max_length = ''
def get_answer(prompt):
answer = st.session_state.llm.invoke(prompt)
# 去掉 prompt 的内容
answer = answer.replace(prompt, "").strip()
print(answer)
return answer
# 问答函数
def answer_question(repo_id, temperature, max_length, question):
# 初始化 Gemma 模型
print('repo_id: ' + repo_id)
print('temperature: ' + str(temperature))
print('max_length: ' + str(max_length))
if repo_id != st.session_state.repo_id or temperature != st.session_state.temperature or max_length != st.session_state.max_length:
try:
with st.spinner("正在初始化 Gemma 模型..."):
st.session_state.llm = HuggingFaceHub(repo_id=repo_id, model_kwargs={"temperature": temperature, "max_length": max_length})
st.success("Gemma 模型初始化完成!")
print("Gemma 模型初始化完成!")
st.session_state.repo_id = repo_id
st.session_state.temperature = temperature
st.session_state.max_length = max_length
except Exception as e:
st.error(f"Gemma 模型加载失败:{e}")
st.stop()
# 获取答案
try:
with st.spinner("正在生成答案(基于模型自身)..."):
pure_answer = get_answer(question)
st.success("答案生成完毕(基于模型自身)!")
print("答案生成完毕(基于模型自身)!")
with st.spinner("正在筛选本地数据集..."):
question_embedding = st.session_state.embeddings.embed_query(question)
# question_embedding_str = " ".join(map(str, question_embedding))
docs_and_scores = st.session_state.db.similarity_search_by_vector(question_embedding)
context_list = []
for doc, score in docs_and_scores:
print(str(score) + ' : ' + doc.page_content)
context_list.append(doc.page_content)
context = "\n".join(context_list)
prompt = f"请根据以下知识库回答问题:\n{context}\n问题:{question}"
print('prompt: ' + prompt)
st.success("本地数据集筛选完成!")
print("本地数据集筛选完成!")
with st.spinner("正在生成答案(基于本地数据集)..."):
answer = get_answer(prompt)
st.success("答案生成完毕(基于本地数据集)!")
print("答案生成完毕(基于本地数据集)!")
return {"prompt": prompt, "answer": answer, "pure_answer": pure_answer}
except Exception as e:
st.error(f"问答过程出错:{e}")
return {"prompt": "", "answer": "An error occurred during the answering process.", "pure_answer": ""}
col1, col2 = st.columns(2)
with col1:
gemma = st.selectbox("repo-id", ("google/gemma-2-9b-it", "google/gemma-2-2b-it", "google/recurrentgemma-2b-it"), 2)
with col2:
temperature = st.number_input("temperature", value=1.0)
max_length = st.number_input("max_length", value=1024)
st.divider()
def generate_answer(repo_id, temperature, max_length, question):
result = answer_question(repo_id, float(temperature), int(max_length), question)
print('prompt: ' + result["prompt"])
print('answer: ' + result["answer"])
print('pure_answer: ' + result["pure_answer"])
st.write("生成答案(无参考):")
st.write(result["pure_answer"])
st.divider()
st.write("参考文字:")
st.markdown(result["prompt"].replace('\n', '<br/>'))
st.write("生成答案:")
st.write(result["answer"])
col3, col4 = st.columns(2)
with col3:
if st.button("使用原数据集中的随机问题"):
dataset_size = len(st.session_state.data_list)
random_index = random.randint(0, dataset_size - 1)
# 读取随机问题
random_question = st.session_state.data_list[random_index]["Question"]
origin_answer = st.session_state.data_list[random_index]["Answer"]
print('[]' + str(random_index) + '/' + str(dataset_size) + ']random_question: ' + random_question)
print('origin_answer: ' + origin_answer)
st.write("随机问题:")
st.write(random_question)
st.write("原始答案:")
st.write(origin_answer)
generate_answer(gemma, float(temperature), int(max_length), random_question)
with col4:
question = st.text_area("请输入问题", "太阳为什么是绿色的?")
if st.button("提交输入的问题"):
if not question:
st.warning("请输入问题!")
else:
generate_answer(gemma, float(temperature), int(max_length), question)