Spaces:
Sleeping
Sleeping
import pandas as pd | |
import os | |
import openai | |
import streamlit as st | |
from transformers import DistilBertTokenizer, DistilBertModel | |
from huggingface_hub import login | |
# Load environment variables | |
HUGGINGFACE_TOKEN = os.getenv('HUGGINGFACE_TOKEN') | |
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
if not HUGGINGFACE_TOKEN or not OPENAI_API_KEY: | |
raise EnvironmentError("HUGGINGFACE_TOKEN and/or OPENAI_API_KEY environment variable is not set.") | |
# Log in to Hugging Face | |
login(token=HUGGINGFACE_TOKEN) | |
# Load model and tokenizer | |
model_name = "distilbert-base-uncased" | |
tokenizer = DistilBertTokenizer.from_pretrained(model_name, use_auth_token=HUGGINGFACE_TOKEN) | |
model = DistilBertModel.from_pretrained(model_name, use_auth_token=HUGGINGFACE_TOKEN) | |
# Set OpenAI API key | |
openai.api_key = OPENAI_API_KEY | |
def generate_openai_response(user_query): | |
"""Generate a response using the OpenAI API.""" | |
try: | |
response = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", # Use a valid model name | |
messages=[ | |
{"role": "system", "content": "You are a friendly and helpful assistant."}, | |
{"role": "user", "content": user_query} | |
], | |
max_tokens=150, | |
temperature=0.7, | |
) | |
return response.choices[0].message['content'] | |
except Exception as e: | |
return f"Error generating response: {e}" | |
def load_electronics_dataset(): | |
"""Load the electronics dataset.""" | |
try: | |
return pd.read_csv('electronics.csv') | |
except FileNotFoundError: | |
raise Exception("Electronics dataset file not found.") | |
except pd.errors.EmptyDataError: | |
raise Exception("Electronics dataset file is empty.") | |
except Exception as e: | |
raise Exception(f"Error loading electronics dataset: {e}") | |
def generate_electronics_response(row): | |
"""Generate a response for an electronics query.""" | |
return { | |
"ProductName": row['ProductName'], | |
"Price": row['Price'], | |
"Description": row['Description'] | |
} | |
def extract_electronics_filters(query): | |
"""Extract filters from the query to refine search.""" | |
filters = {} | |
query_lower = query.lower() | |
if 'best' in query_lower and 'rating' in query_lower: | |
filters['Rating'] = 'max' | |
if 'laptop' in query_lower: | |
filters['Category'] = 'laptop' | |
elif 'smartphone' in query_lower: | |
filters['Category'] = 'smartphone' | |
return filters | |
def apply_electronics_filters(df, filters): | |
"""Apply filters to the dataset based on the query.""" | |
for key, value in filters.items(): | |
if key == 'Rating' and value == 'max': | |
df = df[df['Rating'] == df['Rating'].max()] | |
elif key in df.columns and isinstance(value, str): | |
df = df[df[key].str.contains(value, case=False, na=False)] | |
return df | |
def query_electronics(user_query, n_results=5): | |
"""Query the electronics dataset based on user input.""" | |
electronics_df = load_electronics_dataset() | |
filtered_df = apply_electronics_filters(electronics_df, extract_electronics_filters(user_query)) | |
if 'Rating' in filtered_df.columns: | |
sorted_df = filtered_df.sort_values(by='Rating', ascending=False) | |
else: | |
sorted_df = filtered_df | |
results = sorted_df.head(n_results) | |
return [generate_electronics_response(row) for _, row in results.iterrows()] | |
def load_fashion_dataset(): | |
"""Load the fashion dataset.""" | |
try: | |
return pd.read_csv('fashion.csv') | |
except FileNotFoundError: | |
raise Exception("Fashion dataset file not found.") | |
except pd.errors.EmptyDataError: | |
raise Exception("Fashion dataset file is empty.") | |
except Exception as e: | |
raise Exception(f"Error loading fashion dataset: {e}") | |
def generate_fashion_response(row): | |
"""Generate a response for a fashion query.""" | |
return { | |
"ProductName": row['ProductName'], | |
"Price": row['Price'], | |
"Description": row['Description'] | |
} | |
def extract_fashion_filters(query): | |
"""Extract filters from the query to refine search.""" | |
filters = {} | |
query_lower = query.lower() | |
if 'best' in query_lower and 'rating' in query_lower: | |
filters['Rating'] = 'max' | |
if 'dresses' in query_lower: | |
filters['Category'] = 'dress' | |
elif 'shoes' in query_lower: | |
filters['Category'] = 'shoe' | |
return filters | |
def apply_fashion_filters(df, filters): | |
"""Apply filters to the dataset based on the query.""" | |
for key, value in filters.items(): | |
if key == 'Rating' and value == 'max': | |
df = df[df['Rating'] == df['Rating'].max()] | |
elif key in df.columns and isinstance(value, str): | |
df = df[df[key].str.contains(value, case=False, na=False)] | |
return df | |
def query_fashion(user_query, n_results=5): | |
"""Query the fashion dataset based on user input.""" | |
fashion_df = load_fashion_dataset() | |
filtered_df = apply_fashion_filters(fashion_df, extract_fashion_filters(user_query)) | |
if 'Rating' in filtered_df.columns: | |
sorted_df = filtered_df.sort_values(by='Rating', ascending=False) | |
else: | |
sorted_df = filtered_df | |
results = sorted_df.head(n_results) | |
return [generate_fashion_response(row) for _, row in results.iterrows()] | |
def customer_service_agent_response(user_query): | |
"""Generate a response from the customer service agent based on the query.""" | |
response = "" | |
data = None | |
if "fashion" in user_query.lower(): | |
system_message = """ | |
You are a friendly and knowledgeable fashion consultant. Your goal is to assist customers in finding the perfect fashion items and provide exceptional service. | |
Use conversational language, ask clarifying questions, and build rapport with the customer. | |
Avoid technical jargon and focus on understanding the customer's style and preferences. Always be helpful, enthusiastic, and eager to assist. | |
""" | |
# Fetch data and generate response for fashion queries | |
data = query_fashion(user_query) | |
if data: | |
response = "Here are some fashion items you might be interested in:\n\n" | |
for item in data: | |
response += (f"**Product Name**: {item['ProductName']}\n" | |
f"**Price**: ${item['Price']}\n" | |
f"**Description**: {item['Description']}\n\n") | |
else: | |
response = "Sorry, I couldn't find any fashion items matching your query.\n\n" | |
elif "electronics" in user_query.lower(): | |
system_message = """ | |
You are a friendly and knowledgeable electronics sales agent. Your goal is to assist customers in finding the perfect electronics product and provide exceptional service. | |
Use conversational language, ask clarifying questions, and build rapport with the customer. | |
Avoid technical jargon and focus on understanding the customer's needs. Always be helpful, enthusiastic, and eager to assist. | |
""" | |
# Fetch data and generate response for electronics queries | |
data = query_electronics(user_query) | |
if data: | |
response = "Here are some electronics items you might be interested in:\n\n" | |
for item in data: | |
response += (f"**Product Name**: {item['ProductName']}\n" | |
f"**Price**: ${item['Price']}\n" | |
f"**Description**: {item['Description']}\n\n") | |
else: | |
response = "Sorry, I couldn't find any electronics items matching your query.\n\n" | |
# Use OpenAI API to generate a response for general or fallback queries. | |
if not response: | |
response = generate_openai_response(user_query) | |
return response | |
def main(): | |
"""Main function to run the Streamlit app.""" | |
st.title("Customer Service Chatbot") | |
user_query = st.text_input("Ask your question:") | |
if user_query: | |
# Generate a response from the customer service agent | |
response = customer_service_agent_response(user_query) | |
# Display the response | |
st.write(response) | |
if __name__ == "__main__": | |
main() | |