# utils/_graph_util.py
# LangGraph-based customer-support workflow: categorize -> RAG -> sentiment -> route.
from typing import TypedDict, Dict
from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.graph import MermaidDrawMethod
from langchain_openai import ChatOpenAI
import os
from dotenv import load_dotenv
from utils._admin_util import create_rag
class State(TypedDict):
    # Shared workflow state passed between the LangGraph nodes below.
    query: str      # raw user query
    category: str   # LLM-assigned category (HR / IT / Transportation / Other)
    sentiment: str  # LLM-assigned sentiment label (expected: Positive / Neutral / Negative)
    response: str   # final answer returned to the caller
def check_api_key():
    """Load environment variables and return the OpenAI API key.

    Returns:
        str: The value of the OPENAI_API_KEY environment variable.

    Raises:
        ValueError: If OPENAI_API_KEY is not set after loading .env.
    """
    load_dotenv()
    api_key = os.getenv("OPENAI_API_KEY")
    # Never print or log the key itself — it is a secret credential.
    if not api_key:
        raise ValueError("OpenAI API key not found in environment variables")
    return api_key
# Fail fast at import time if the API key is missing.
api_key = check_api_key()

# Shared chat model used by every workflow node below.
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    openai_api_key=api_key,
    temperature=0.7
)
def rag(state: State) -> State:
    """Answer the query with the RAG chain and store the result in state.

    Args:
        state: Workflow state containing the user's "query" string.

    Returns:
        Partial state update carrying the chain's "response".
    """
    rag_chain = create_rag()
    # The chain expects the raw query string, not the whole state dict.
    query = state["query"]
    response = rag_chain.invoke(query)
    return {"response": response}
def categorize(state: State) -> State:
    """Classify the query into HR, IT, Transportation, or Other.

    Args:
        state: Workflow state containing the user's "query".

    Returns:
        Partial state update with the LLM-produced "category" label.
    """
    prompt = ChatPromptTemplate.from_template(
        "Categorize the following query into one of these categories: "
        "HR, IT, Transportation, Other. Query: {query}"
    )
    chain = prompt | llm
    category = chain.invoke({"query": state["query"]}).content
    return {"category": category}
def analyze_sentiment(state: State) -> State:
    """Label the query's sentiment as Positive, Neutral, or Negative.

    Args:
        state: Workflow state containing the user's "query".

    Returns:
        Partial state update with the LLM-produced "sentiment" label.
    """
    # Fixed prompt bugs: 'Position' -> 'Positive', 'Response with' -> 'Respond
    # with', and the two string pieces previously ran together without a space.
    prompt = ChatPromptTemplate.from_template(
        "Analyze the sentiment of the following customer query. "
        "Respond with either 'Positive', 'Neutral', or 'Negative'. Query: {query}"
    )
    chain = prompt | llm
    sentiment = chain.invoke({"query": state["query"]}).content
    return {"sentiment": sentiment}
def handle_hr(state: State) -> State:
    """Generate an HR support answer for the query in *state*."""
    hr_prompt = ChatPromptTemplate.from_template(
        "Provide a HR support response to the following query : {query}"
    )
    answer = (hr_prompt | llm).invoke({"query": state["query"]}).content
    return {"response": answer}
def handle_it(state: State) -> State:
    """Generate an IT support answer for the query in *state*."""
    it_prompt = ChatPromptTemplate.from_template(
        "Provide a IT support response to the following query : {query}"
    )
    answer = (it_prompt | llm).invoke({"query": state["query"]}).content
    return {"response": answer}
def handle_transportation(state: State) -> State:
    """Generate a transportation support answer for the query in *state*."""
    transport_prompt = ChatPromptTemplate.from_template(
        "Provide a transportation support response to the following query : {query}"
    )
    answer = (transport_prompt | llm).invoke({"query": state["query"]}).content
    return {"response": answer}
def handle_general(state: State) -> State:
    """Generate a general support answer for the query in *state*."""
    general_prompt = ChatPromptTemplate.from_template(
        "Provide a general support response to the following query : {query}"
    )
    answer = (general_prompt | llm).invoke({"query": state["query"]}).content
    return {"response": answer}
def escalate(state: State) -> State:
    """Terminal node for negative-sentiment queries: hand off to a human.

    Args:
        state: Workflow state (unused; kept for the node signature).

    Returns:
        Partial state update with the canned escalation message.
    """
    # Fixed grammar in the user-facing message: "escalate" -> "escalated".
    return {"response": "This query has been escalated to a human agent due to its negative sentiment"}
def route_query(state: State) -> str:
    """Choose the next workflow node from sentiment and category.

    Negative sentiment always escalates; otherwise route by category,
    falling back to the general handler for unrecognized categories.

    Args:
        state: Workflow state with "sentiment" and "category" filled in.

    Returns:
        The name of the node to run next.
    """
    # NOTE: return annotation fixed — this returns a node name, not a State.
    if state["sentiment"] == "Negative":
        return "escalate"
    category_routes = {
        "HR": "handle_hr",
        "IT": "handle_it",
        "Transportation": "handle_transportation",
    }
    return category_routes.get(state["category"], "handle_general")
def rout_to_agent(state: State) -> str:
    """Decide whether the RAG answer suffices or a specialist is needed.

    NOTE(review): the name keeps the original spelling ("rout") because the
    graph wiring in run_customer_support references it by this name.

    Args:
        state: Workflow state with the RAG "response" filled in.

    Returns:
        "analyze_sentiment" when the RAG chain could not answer, else "END".
    """
    # An "I don't know" RAG answer means we fall through to the handlers.
    if "i don't know" in state["response"].lower():
        return "analyze_sentiment"
    return "END"
def run_customer_support(query: str) -> Dict[str, str]:
    """Run the full support workflow for a single user query.

    Args:
        query: The raw user question.

    Returns:
        Dict with "category", "sentiment", and "response" keys (empty
        strings when a branch skipped the corresponding node).
    """
    workflow = StateGraph(State)

    # Nodes
    workflow.add_node("categorize", categorize)
    workflow.add_node("rag", rag)
    workflow.add_node("analyze_sentiment", analyze_sentiment)
    workflow.add_node("handle_hr", handle_hr)
    workflow.add_node("handle_it", handle_it)
    workflow.add_node("handle_transportation", handle_transportation)
    # BUGFIX: route_query can return "handle_general", but this node was
    # never added (and never mapped below), crashing on "Other" categories.
    workflow.add_node("handle_general", handle_general)
    workflow.add_node("escalate", escalate)

    # Edges
    workflow.add_edge("categorize", "rag")
    workflow.add_conditional_edges(
        "rag",
        rout_to_agent,
        {"analyze_sentiment": "analyze_sentiment", "END": END},
    )
    workflow.add_conditional_edges(
        "analyze_sentiment",
        route_query,
        {
            "handle_hr": "handle_hr",
            "handle_it": "handle_it",
            "handle_transportation": "handle_transportation",
            "handle_general": "handle_general",  # was missing from the map
            "escalate": "escalate",
        },
    )
    workflow.add_edge("handle_hr", END)
    workflow.add_edge("handle_it", END)
    workflow.add_edge("handle_transportation", END)
    workflow.add_edge("handle_general", END)
    workflow.add_edge("escalate", END)

    workflow.set_entry_point("categorize")
    app = workflow.compile()

    results = app.invoke({"query": query})
    # .get everywhere: nodes only emit partial state updates.
    return {
        "category": results.get("category", ""),
        "sentiment": results.get("sentiment", ""),
        "response": results.get("response", ""),
    }