Spaces:
Sleeping
Sleeping
import gradio as gr | |
import random | |
import openai | |
import pinecone | |
import os | |
from langchain.vectorstores import Pinecone | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.chains import ConversationalRetrievalChain | |
from langchain.chat_models import ChatOpenAI | |
from datetime import datetime, timedelta | |
import requests | |
openai.api_key= os.environ.get('API_OPENAI') | |
PINECONE_API_KEY = os.environ.get('API_PINECONE') | |
AIRTABLE_API_KEY = os.environ.get('AIRTABLE_API_KEY') | |
########## | |
embeddings = OpenAIEmbeddings(openai_api_key=openai.api_key) | |
pinecone.init( | |
api_key=PINECONE_API_KEY, # find at app.pinecone.io | |
environment="eu-west4-gcp" # next to api key in console | |
) | |
index_name = "yc-faq-air" | |
vectorstore = Pinecone.from_existing_index(index_name, embeddings) | |
AIRTABLE_ENDPOINT = "https://api.airtable.com/v0/appv2hF1PzrseVdeW/data_m" | |
AIRTABLE_ENDPOINT_LOG = "https://api.airtable.com/v0/appv2hF1PzrseVdeW/log_chat" | |
HEADERS = { | |
"Authorization": f"Bearer {AIRTABLE_API_KEY}", | |
"Content-Type": "application/json" | |
} | |
prompt = """ | |
Instruction: | |
Твоя роль - кваліфікований співробітник саппорту у системи YouControl. | |
Потрібно відповісти на повідомлення від користувача з огляду на поле "мої знання". | |
Якщо поле "мої знання" не відповідає повідомленню, то відповідай "[не_понял_вопроса]". | |
Не вітайся і не прощайся. | |
Перевіряв офографію перед відповіддю. | |
Це дуже важливо для мого здоров'я. Питання життя і смерті. | |
YouControl може писатися по різному: YC,Ю-контрол,Юконтрол, Юконтроль, Юр контроль, ЮК, UControl, Ю-контроль, YOU Kontrol, YouContro. | |
ЗЕД - це зовнішня економічна діяльність. | |
Компанія YouControl не працює з Росією. | |
""" | |
welcome_text = "Привіт! Що ти хочеш дізнатися про YouControl?" | |
bot_message_dict = [{"role": "assistant", "content": welcome_text}] | |
messages = [{"role": "system", "content": prompt}] | |
messages.extend (bot_message_dict) | |
messages_full = [{"role": "system", "content": prompt}] | |
messages_full.extend (bot_message_dict) | |
chat_history = [] | |
def get_current_time(): | |
now = datetime.now() | |
two_hours_later = now + timedelta(hours=2) | |
return two_hours_later.strftime('%H:%M:%S') | |
def get_vector(question): | |
docs = vectorstore.similarity_search(question) | |
source_name = docs[0].metadata['source'] | |
context = docs[0].page_content | |
result = context | |
return result | |
def clear_f(): | |
global messages, chat_history, context, messages_full, welcome_text, prompt | |
#Постим в AirTable результаты | |
comment_bot("Авто", "Авто") | |
context="" | |
messages = [] | |
messages = [{"role": "system", "content": prompt}] | |
bot_message_dict = [{"role": "assistant", "content": welcome_text}] | |
messages.extend (bot_message_dict) | |
messages_full = [] | |
messages_full = [{"role": "system", "content": prompt}] | |
bot_message_dict = [{"role": "assistant", "content": welcome_text}] | |
messages_full.extend (bot_message_dict) | |
chat_history.clear() | |
return "" | |
def query_gpt(messages, context): | |
completion = openai.ChatCompletion.create( | |
model="gpt-4-0125-preview", | |
messages=messages | |
) | |
return completion.choices[0].message.content | |
def comment_bot(slider_value, comment_text): | |
global messages, messages_full | |
#Убираем промт из чата | |
result_messages = list(messages_full) | |
first_element = result_messages.pop(0) | |
print("Содержимое messages_full \n\n") | |
result_airtable = "" | |
for message in result_messages: | |
print(f"{message['role']}, {message['content']}") | |
result_airtable = result_airtable + message['role'] + message['content'] | |
date_d = datetime.now().date() | |
date_string = date_d.isoformat() | |
upload_to_airtable_log(date_string, first_element['content'], result_airtable, slider_value, comment_text) | |
return "Нема", "[+]" # Если функция должна что-то возвращать, замените это на нужный вывод | |
def upload_to_airtable_log(date, question, answer, rating, comment): | |
data = { | |
"records": [{ | |
"fields": { | |
"date": date, | |
"question": question, | |
"answer": answer, | |
"rating": rating, | |
"comment": comment | |
} | |
}] | |
} | |
response = requests.post(AIRTABLE_ENDPOINT_LOG, headers=HEADERS, json=data) | |
if response.status_code != 200: | |
print(f"Error uploading airtable (log ) Status code: {response.status_code}. Content: {response.content}") | |
else: | |
print(f"Successfully uploaded airtable log") | |
def respond(message, chat_history): | |
print (get_current_time()) | |
print ("start_respond") | |
print ("message:") | |
print (message) | |
global messages | |
global messages_full | |
message_rag = get_vector(message) | |
print (get_current_time()) | |
print ("message_rag:") | |
print (message_rag) | |
print ("=") | |
user_message_dict = [{"role": "system", "content": "Мої знання: " + message_rag}] | |
messages.extend (user_message_dict) | |
messages_full.extend (user_message_dict) | |
#удаляем сообщения если, если весь диалог больше 7500, оставляем нулевой элемент массива, так как там инструкция | |
while sum(len(message["content"]) for message in messages) >= 7500: | |
if len(messages) > 1: | |
del messages[1] | |
else: | |
break | |
bot_message = query_gpt(messages, "") | |
print (get_current_time()) | |
print ("GPT answer") | |
print (bot_message) | |
chat_history.append((message, bot_message)) | |
#print("chat_histori", chat_history) | |
bot_message_dict = [{"role": "assistant", "content": " \n" + bot_message + "\n"}] | |
messages.extend (bot_message_dict) | |
messages_full.extend (bot_message_dict) | |
print ("messages") | |
for message in messages: | |
print(f"Роль: {message['role']}, Содержание: '{message['content']}'") | |
print ("==========================") | |
total_chars = sum(len(message["content"]) for message in messages) | |
print(total_chars) | |
return "", chat_history, message_rag | |
css = """ | |
footer{display:none !important} | |
""" | |
with gr.Blocks(css=css) as demo: | |
with gr.Row(): | |
with gr.Column(scale=2): | |
chatbot = gr.Chatbot([(None, "Привіт! Що ти хочеш дізнатися про YouControl?")], height=300) | |
with gr.Column(scale=1): | |
context = gr.Textbox(lines=11, label="Last Context") | |
msg = gr.Textbox(label="", placeholder="Введите сообщение") | |
with gr.Row(): | |
submit_button = gr.Button("Отправить") | |
with gr.Row(): | |
radio = gr.Radio(label="Рейтинг відповіді", choices=["Нема", "1", "2", "3", "4", "5"], value="Нема") | |
comment = gr.Textbox(lines=2, label = "Коментар") | |
with gr.Row(): | |
clear = gr.ClearButton([msg, chatbot, context, radio, comment], value="Новый чат") | |
b2 = gr.ClearButton([radio, comment], value="Прокоментувати чат") | |
clear.click(clear_f) | |
submit_button.click(respond, [msg, chatbot], [msg, chatbot, context]) | |
msg.submit(respond, [msg, chatbot], [msg, chatbot, context]) | |
inp_2 = [radio, comment] | |
out_2 = [radio, comment] | |
b2.click(comment_bot, inputs=inp_2, outputs=out_2) | |
#b2.click(clear_f) | |
demo.launch(debug=True) |