# BFyedek / app.py
# Last commit: "Update app.py" by SamiKoen (b9110ca, verified) - 20.3 kB
import gradio as gr
import os
import json
import requests
import xml.etree.ElementTree as ET
import schedule
import time
import threading
from huggingface_hub import HfApi, create_repo
import warnings
import pandas as pd
from docx import Document
import spaces
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import io
import base64
from gtts import gTTS # Google Text-to-Speech
# Silence the UserWarnings emitted from Gradio's chatbot component module.
warnings.filterwarnings(
    "ignore",
    category=UserWarning,
    module="gradio.components.chatbot",
)

# Chat log location: inside /data when that directory exists, otherwise a
# file relative to the current working directory.
if os.path.exists('/data'):
    LOG_FILE = '/data/chat_logs.txt'
else:
    LOG_FILE = 'chat_logs.txt'
print(f"File path: {os.path.abspath(LOG_FILE)}")

# OpenAI chat-completions endpoint and API key (taken from the environment).
API_URL = "https://api.openai.com/v1/chat/completions"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    # A missing key is only reported; module start-up continues.
    print("Error: OPENAI_API_KEY environment variable not set!")
# Fetch Trek bicycle products
def _round_price_value(value):
    """Round a numeric price with a step that grows with the price tier.

    Above 200000 the step is 5000, above 30000 it is 1000, above 10000 it
    is 100, and otherwise 10.  The rounded amount is returned as a string.
    """
    if value > 200000:
        step = 5000
    elif value > 30000:
        step = 1000
    elif value > 10000:
        step = 100
    else:
        step = 10
    return str(round(value / step) * step)


def _parse_price(raw):
    """Return ``(display_text, numeric_value)`` for a raw price string.

    When *raw* is numeric, ``display_text`` is the tier-rounded price and
    ``numeric_value`` the parsed float.  Otherwise *raw* is passed through
    unchanged and ``numeric_value`` is None.
    """
    try:
        value = float(raw)
        return _round_price_value(value), value
    except (ValueError, TypeError, OverflowError):
        # Non-numeric text, None, NaN or infinity: keep the text as it is.
        return raw, None


def _is_in_stock(raw_amount):
    """Return True when the feed's stock amount parses to a number above zero.

    The amount is compared numerically; a plain string comparison against
    '0' would treat values such as "0.00" as being in stock.
    """
    if raw_amount is None:
        return False
    try:
        return float(str(raw_amount).strip().replace(',', '.')) > 0
    except ValueError:
        return False


url = 'https://www.trekbisiklet.com.tr/output/8582384479'
# A timeout keeps start-up from hanging forever if the feed does not answer.
response = requests.get(url, timeout=30)
root = ET.fromstring(response.content)
products = []
for item in root.findall('item'):
    # Every product is listed; price and link are only kept for items in stock.
    name_words = (item.findtext('rootlabel') or "").lower().split()
    if not name_words:
        # No usable label means there is nothing to match user messages against.
        continue
    name = name_words[0]
    full_name = ' '.join(name_words)
    stock_amount = "stokta" if _is_in_stock(item.findtext('stockAmount')) else "stokta değil"
    if stock_amount == "stokta":
        # Normal price
        price_str = item.findtext('priceTaxWithCur') or "Fiyat bilgisi yok"
        # EFT price (money transfer discounted original price)
        price_eft_str = item.findtext('priceEft') or ""
        # Discounted (promotional) price
        price_rebate_str = item.findtext('priceRebateWithTax') or ""
        # Money transfer discounted promotional price
        price_rebate_money_order_str = item.findtext('priceRebateWithMoneyOrderWithTax') or ""

        price, price_float = _parse_price(price_str)

        if price_eft_str:
            price_eft = _parse_price(price_eft_str)[0]
        elif price_float is not None:
            # No EFT price in the feed: derive it as a 2.5% discount on the
            # normal price of THIS item.
            price_eft = _round_price_value(price_float * 0.975)
        else:
            # The normal price is not numeric, so no discount can be derived.
            price_eft = ""

        # An empty string stays empty; unparseable text is kept as is.
        price_rebate = _parse_price(price_rebate_str)[0]
        price_rebate_money_order = _parse_price(price_rebate_money_order_str)[0]

        # Only the product link is kept, not the image link.
        product_link = item.findtext('productLink') or ""
        item_info = (stock_amount, price, product_link, price_eft, price_rebate, price_rebate_money_order)
    else:
        # Out-of-stock products only carry their stock status.
        item_info = (stock_amount, "", "", "", "", "")
    products.append((name, item_info, full_name))
# Hugging Face access token, read from the "hfapi" environment variable.
hfapi = os.environ.get("hfapi")
if not hfapi:
    raise ValueError("hfapi environment variable not set!")

# Create the "BF" Gradio space; exist_ok=True tolerates an existing repo.
create_repo(
    "BF",
    token=hfapi,
    repo_type="space",
    space_sdk="gradio",
    exist_ok=True,
)

# State shared between the chat handler and the scheduler thread.
global_chat_history = []  # all user/assistant messages of this process
history_lock, file_lock = threading.Lock(), threading.Lock()  # guard the history list / log-file writes
last_logged_index = 0  # count of history entries the scheduler has already written
# Google Drive authentication (Service Account from Secrets)
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']


def authenticate_google_drive():
    """Build a Google Drive v3 client from the service-account secret.

    The service-account key is read as a JSON string from the
    SERVICE_ACCOUNT_JSON environment variable and turned into credentials
    limited to the scopes in SCOPES.

    Returns:
        The object returned by ``build('drive', 'v3', ...)``.

    Raises:
        ValueError: If the variable is missing or empty, does not contain
            valid JSON, or credentials cannot be created from it.
    """
    service_account_json = os.getenv("SERVICE_ACCOUNT_JSON")
    if not service_account_json:
        raise ValueError("SERVICE_ACCOUNT_JSON environment variable not found!")
    try:
        json_data = json.loads(service_account_json)
        # Only the field names are logged.  The values (notably
        # "private_key") are secrets and must never be printed.
        print("JSON parsed successfully:", json_data.keys())
        creds = Credentials.from_service_account_info(json_data, scopes=SCOPES)
    except json.JSONDecodeError as e:
        raise ValueError(f"JSON from Secrets is invalid: {e}") from e
    except Exception as e:
        raise ValueError(f"Authentication error: {e}") from e
    return build('drive', 'v3', credentials=creds)
def download_file_from_drive(service, file_id, file_name):
    """Download a Drive file and store it locally as *file_name*.

    The media is pulled chunk by chunk into an in-memory buffer; the local
    file is written only after the last chunk has arrived.

    Args:
        service: Drive client exposing ``files().get_media``.
        file_id: ID of the file to download.
        file_name: Local path to write the bytes to.

    Returns:
        *file_name*, unchanged.
    """
    media_request = service.files().get_media(fileId=file_id)
    buffer = io.BytesIO()
    downloader = MediaIoBaseDownload(buffer, media_request)

    finished = False
    while finished is False:
        # The progress object is not needed, only the completion flag.
        _progress, finished = downloader.next_chunk()

    with open(file_name, 'wb') as target:
        target.write(buffer.getvalue())
    return file_name
# Load the reference documents from Google Drive into one text blob that the
# chat handler can add to its system prompt.
document_content = ""
service = authenticate_google_drive()

# Excel workbook: rendered as plain text through pandas.
try:
    excel_file_id = "test10rgiGp5y5ZYU0dpRvps2t0t1dEzjW8LK"  # Example Excel file ID, add your own ID
    excel_file = download_file_from_drive(service, excel_file_id, "data.xlsx")
    df = pd.read_excel(excel_file)
    excel_text = df.to_string()
    document_content = document_content + excel_text + "\n"
except Exception as e:
    # A failed download or parse is reported and the document is skipped.
    print(f"Google Drive Excel reading error: {e}")

# Word document: paragraph texts joined with newlines.
try:
    word_file_id = "9I8H7G6F5E4D3C2B1A"  # Example Word file ID, add your own ID
    word_file = download_file_from_drive(service, word_file_id, "descriptions.docx")
    doc = Document(word_file)
    word_text = "\n".join(para.text for para in doc.paragraphs)
    document_content = document_content + word_text
except Exception as e:
    print(f"Google Drive Word reading error: {e}")
# Function to generate TTS audio from text
def text_to_speech(text, lang='tr'):
    """Turn *text* into speech and wrap it in an autoplaying HTML audio tag.

    The audio produced by gTTS is written to an in-memory buffer and
    embedded in the tag as a base64 data URI.

    Args:
        text: Text to speak.
        lang: Language code handed to gTTS (default 'tr').

    Returns:
        The ``<audio>`` HTML string, or None when anything fails (the error
        is printed).
    """
    try:
        buffer = io.BytesIO()
        gTTS(text=text, lang=lang, slow=False).write_to_fp(buffer)
        audio_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
    except Exception as e:
        print(f"TTS error: {e}")
        return None
    return f'<audio autoplay><source src="data:audio/mp3;base64,{audio_base64}" type="audio/mp3"></audio>'
def run_scheduler(chat_history_ref):
    """Register the daily log jobs and run the scheduler loop forever.

    Args:
        chat_history_ref: The shared list of chat message dicts; it is read
            on every job run, so later appends by other threads are seen.

    This function never returns: it polls ``schedule`` once a minute.
    """

    def scheduled_save_and_upload():
        """Append not-yet-logged messages to LOG_FILE, then upload the file."""
        global last_logged_index
        if chat_history_ref:
            print(f"Scheduler triggered: {time.strftime('%Y-%m-%d %H:%M:%S')}")
            try:
                # Lock order: file lock first, history lock inside it.
                with file_lock, open(LOG_FILE, 'a', encoding='utf-8') as f:
                    f.write("\n--- Scheduled Save: {} ---\n".format(time.strftime("%Y-%m-%d %H:%M:%S")))
                    with history_lock:
                        for msg in chat_history_ref[last_logged_index:]:
                            if msg["role"] in ["user", "assistant"]:
                                f.write(f"{msg['role'].capitalize()}: {msg['content']}\n")
                        # Remember how far the history has been written.
                        last_logged_index = len(chat_history_ref)
                print(f"Chat saved to file: {os.path.abspath(LOG_FILE)}")
            except Exception as e:
                print(f"Save error: {e}")
                time.sleep(5)
                # Give up for this run; the upload below is skipped.
                return

            HF_REPO_ID = "SamiKoen/BF"
            api = HfApi(token=hfapi)
            # Up to three upload attempts, five seconds apart.
            for attempt in range(3):
                try:
                    with file_lock:
                        api.upload_file(
                            path_or_fileobj=LOG_FILE,
                            path_in_repo="chat_logs.txt",
                            repo_id=HF_REPO_ID,
                            repo_type="space",
                            commit_message="Automatic log update - {}".format(time.strftime("%Y-%m-%d %H:%M:%S")),
                        )
                    print(f"Log file uploaded to HF: {LOG_FILE}")
                    break
                except Exception as e:
                    print(f"HF upload error (attempt {attempt+1}/3): {e}")
                    time.sleep(5)
            else:
                # Reached only when no attempt hit ``break``.
                print("HF upload failed, all attempts completed.")
            print(f"Scheduled job completed: {time.strftime('%H:%M:%S')}")

    # Six fixed run times per day.
    for run_at in ("11:32", "15:15", "15:30", "17:32", "19:15", "21:30"):
        schedule.every().day.at(run_at).do(scheduled_save_and_upload)
    print("Scheduler started")

    while True:
        schedule.run_pending()
        time.sleep(60)
# Start of the audio tag that chatbot_fn appends to its final reply.
_AUDIO_TAG_START = "<audio autoplay>"


def _round_discount_amount(amount):
    """Round a discount with the same tiered steps used for prices."""
    if amount > 200000:
        step = 5000
    elif amount > 30000:
        step = 1000
    elif amount > 10000:
        step = 100
    else:
        step = 10
    return round(amount / step) * step


def _build_product_message(product_info):
    """Format one ``(name, item_info, full_name)`` product entry as prompt text."""
    info = product_info[1]
    full_name = product_info[2]
    stock_status = info[0]
    if stock_status != "stokta":
        # Out of stock: only the stock status is reported.
        return f"{full_name} {stock_status}"

    lines = [f"{full_name} {stock_status}", f"Original price: {info[1]} TL"]
    campaign_price = info[4]
    if campaign_price:
        lines.append(f"Campaign price: {campaign_price} TL")
        try:
            discount = float(info[1].replace(',', '.')) - float(campaign_price.replace(',', '.'))
            if discount > 0:
                lines.append(f"Discount amount: {_round_discount_amount(discount):.0f} TL")
        except (ValueError, TypeError, AttributeError, OverflowError):
            # Non-numeric prices: leave the discount line out.
            pass
        # The money transfer discounted campaign price is deliberately not shown.
    elif info[3]:
        # No campaign: show the money transfer discounted normal price.
        lines.append(f"Money transfer discounted price: {info[3]} TL")
    lines.append(f"Product link: {info[2]}")
    return "\n".join(lines)


def _history_for_api(history):
    """Reduce the UI history to plain ``{"role", "content"}`` text messages.

    Entries are trimmed to the two keys sent to the API, and the audio tag
    that this handler appends to its final reply is cut off, so the base64
    audio of earlier turns is not sent back to the model.
    NOTE(review): assumes the UI hands the final yielded text back as the
    assistant entry of ``history`` - confirm against the Gradio version used.
    """
    cleaned = []
    for entry in history or []:
        if isinstance(entry, dict):
            role, content = entry.get("role"), entry.get("content")
        else:
            role, content = getattr(entry, "role", None), getattr(entry, "content", None)
        if role not in ("user", "assistant") or not isinstance(content, str):
            continue
        if role == "assistant":
            content = content.split(_AUDIO_TAG_START, 1)[0].rstrip()
        cleaned.append({"role": role, "content": content})
    return cleaned


@spaces.GPU(duration=1200)
def chatbot_fn(user_message, history):
    """Answer *user_message* by streaming a chat completion from the API.

    Args:
        user_message: The text the user just sent.
        history: Earlier messages of the conversation (may be None).

    Yields:
        The accumulated reply after each streamed piece of content, and
        finally the full reply followed by an HTML audio tag when speech
        synthesis succeeded.  "An error occurred." is yielded when the
        request fails or the API answers with a non-200 status.

    Side effects: appends the user message and the reply to LOG_FILE and
    to ``global_chat_history``.
    """
    if history is None:
        history = []

    # Log: Add user message
    try:
        with file_lock:
            with open(LOG_FILE, 'a', encoding='utf-8') as f:
                f.write(f"User: {user_message}\n")
    except Exception as e:
        print(f"File writing error (User): {e}")

    # System messages (prompt) - just keeping the structure without showing actual prompts
    system_messages = [
        {"role": "system", "content": "You are an AI assistant for Trek bicycles."}
    ]
    # Add document data to system messages
    if document_content:
        system_messages.append({"role": "system", "content": f"Document information: {document_content}"})

    # Add stock/price details for every product whose first name word occurs
    # in the user message.  A set makes each membership test O(1).
    input_words = set(user_message.lower().split())
    for product_info in products:
        if product_info[0] in input_words:
            system_messages.append({"role": "system", "content": _build_product_message(product_info)})

    messages = system_messages + _history_for_api(history) + [{"role": "user", "content": user_message}]
    payload = {
        "model": "gpt-4.5-preview",
        "messages": messages,
        "temperature": 0.2,
        "top_p": 1,
        "n": 1,
        "stream": True,
        "presence_penalty": 0,
        "frequency_penalty": 0,
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_API_KEY}"
    }

    try:
        # (connect, read) timeouts so a stalled connection cannot hang the handler.
        response = requests.post(API_URL, headers=headers, json=payload, stream=True, timeout=(10, 120))
    except requests.RequestException as e:
        print(f"API request error: {e}")
        yield "An error occurred."
        return

    if response.status_code != 200:
        print(f"API error: HTTP {response.status_code}")
        response.close()
        yield "An error occurred."
        return

    partial_response = ""
    try:
        for chunk in response.iter_lines():
            if not chunk:
                continue
            chunk_str = chunk.decode('utf-8')
            if not chunk_str.startswith("data: "):
                continue
            data = chunk_str[6:].strip()
            if data == "[DONE]":
                break
            try:
                chunk_data = json.loads(data)
            except json.JSONDecodeError as e:
                print(f"JSON parse error: {e} - Chunk: {chunk_str}")
                continue
            # Chunks without choices, without a delta or with null content
            # carry no text and are skipped instead of raising.
            choices = chunk_data.get('choices') or []
            if not choices:
                continue
            content = (choices[0].get('delta') or {}).get('content')
            if content:
                partial_response += content
                yield partial_response
    except requests.RequestException as e:
        print(f"API stream error: {e}")
        if not partial_response:
            yield "An error occurred."
            return
    finally:
        # Release the connection, also when the consumer stops early.
        response.close()

    # Generate TTS audio
    audio_html = text_to_speech(partial_response)
    # Add audio to response if available
    final_response = partial_response
    if audio_html:
        final_response += f'\n\n{audio_html}'

    # Log: Add assistant response
    try:
        with file_lock:
            with open(LOG_FILE, 'a', encoding='utf-8') as f:
                f.write(f"Bot: {partial_response}\n")
    except Exception as e:
        print(f"File writing error (Bot): {e}")

    # Update global history (text only, without the audio tag)
    with history_lock:
        global_chat_history.append({"role": "user", "content": user_message})
        global_chat_history.append({"role": "assistant", "content": partial_response})

    yield final_response
# Slow echo (for testing)
def slow_echo(message, history):
    """Echo *message* back one character at a time.

    Waits 0.05 seconds before each yield; *history* is not used.

    Yields:
        "You typed: " followed by a growing prefix of *message*.
    """
    for end in range(1, len(message) + 1):
        time.sleep(0.05)
        yield "You typed: " + message[:end]
# Usage mode: True serves the slow_echo test handler, False the real chatbot.
USE_SLOW_ECHO = False

if USE_SLOW_ECHO:
    chat_fn = slow_echo
else:
    chat_fn = chatbot_fn
    # The log scheduler is only started for the real chatbot; as a daemon
    # thread it does not keep the process alive.
    scheduler_thread = threading.Thread(
        target=run_scheduler,
        args=(global_chat_history,),
        daemon=True,
    )
    scheduler_thread.start()

# Chat UI definition.
demo = gr.ChatInterface(
    fn=chat_fn,
    title="Trek Assistant with Text-to-Speech",
    theme="default",
    type="messages",
    flagging_mode="manual",
    flagging_options=["Correct", "Incorrect", "Not sure", "Other"],
    save_history=True,
)

if __name__ == "__main__":
    demo.launch(debug=True)