from typing import TypedDict, Dict from langgraph.graph import StateGraph, END from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables.graph import MermaidDrawMethod from langchain_openai import ChatOpenAI import os from dotenv import load_dotenv from utils._admin_util import create_rag class State(TypedDict): query: str category: str sentiment: str response: str def check_api_key(): load_dotenv() """Verify that the API key is set and valid""" api_key = os.getenv("OPENAI_API_KEY") print("api_key", api_key) if not api_key: raise ValueError("OpenAI API key not found in environment variables") return api_key api_key = check_api_key() llm = ChatOpenAI( model="gpt-3.5-turbo", openai_api_key=api_key, temperature=0.7 ) def rag(state: State)->State: rag_chain = create_rag() # Extract just the query string from the state query = state["query"] print("query", query) response = rag_chain.invoke(query) # Pass the string directly, not a dict print("response", response) return {"response": response} def categorize(state: State) -> State: "HR, IT, Transportation" prompt = ChatPromptTemplate.from_template( "Categorize the following query into one of these categories: " "HR, IT, Transportation, Other. Query: {query}" ) chain = prompt | llm category = chain.invoke({"query": state["query"]}).content return {"category": category} def analyze_sentiment(state: State) -> State: prompt = ChatPromptTemplate.from_template( "Analyze the sentiment of the following customer query" "Response with either 'Position', 'Neutral' , or 'Negative'. Query: {query}" ) chain = prompt | llm sentiment = chain.invoke({"query": state["query"]}).content return {"sentiment": sentiment} def handle_hr(state: State)->State: prompt = ChatPromptTemplate.from_template( "Provide a HR support response to the following query : {query}" ) chain = prompt | llm response = chain.invoke({"query": state["query"]}).content return {"response": response} def handle_it(state: State)->State: prompt = ChatPromptTemplate.from_template( "Provide a IT support response to the following query : {query}" ) chain = prompt | llm response = chain.invoke({"query": state["query"]}).content return {"response": response} def handle_transportation(state: State)->State: prompt = ChatPromptTemplate.from_template( "Provide a transportation support response to the following query : {query}" ) chain = prompt | llm response = chain.invoke({"query": state["query"]}).content return {"response": response} def handle_general(state: State)->State: prompt = ChatPromptTemplate.from_template( "Provide a general support response to the following query : {query}" ) chain = prompt | llm response = chain.invoke({"query": state["query"]}).content return {"response": response} def escalate(state: State)->State: return {"response": "This query has been escalate to a human agent due to its negative sentiment"} def route_query(state: State)->State: if state["sentiment"] == "Negative": return "escalate" elif state["category"] == "HR": return "handle_hr" elif state["category"] == "IT": return "handle_it" elif state["category"] == "Transportation": return "handle_transportation" else: return "handle_general" def rout_to_agent(state: State)->State: if "i don't know" in state["response"].lower(): print(state["response"]) print("return analyze_sentiment") return "analyze_sentiment" else: return "END" def run_customer_support(query: str)->Dict[str, str]: workflow = StateGraph(State) workflow.add_node("categorize", categorize) workflow.add_node("rag", rag) workflow.add_node("analyze_sentiment", analyze_sentiment) workflow.add_node("handle_hr", handle_hr) workflow.add_node("handle_it", handle_it) workflow.add_node("handle_transportation", handle_transportation) workflow.add_node("escalate", escalate) workflow.add_edge("categorize", "rag") workflow.add_conditional_edges("rag", rout_to_agent, {"analyze_sentiment": "analyze_sentiment", "END": END}) workflow.add_conditional_edges( "analyze_sentiment", route_query, { "handle_hr" : "handle_hr", "handle_it" : "handle_it", "handle_transportation" : "handle_transportation", "escalate": "escalate" } ) workflow.add_edge("handle_hr", END) workflow.add_edge("handle_it", END) workflow.add_edge("handle_transportation", END) workflow.add_edge("escalate", END) workflow.set_entry_point("categorize") app = workflow.compile() results = app.invoke({"query": query}) return { "category": results.get('category', ''), # Returns empty string if key missing "sentiment": results.get('sentiment', ''), "response": results['response'] }