"""Main entrypoint for the conversational APM graph. This module defines the core structure and functionality of the conversational APM graph. It includes the main graph definition, state management, and key functions for processing & routing user queries, generating answer to Enterprise Architecture related user questions about an IT Landscape or Websearch. """ #CHANGELOG: 2025-06-08 # Refactored to use tools.websearch (changes State, removed web_search) import os from langgraph.graph import END, StateGraph #core libraries from langchain_core.runnables import RunnableConfig from langchain_core.prompts.chat import ChatPromptTemplate from langchain_core.prompts import PromptTemplate, FewShotChatMessagePromptTemplate from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers.json import JsonOutputParser from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnableLambda from langchain_core.runnables import RunnablePassthrough, RunnableConfig from langchain_core.runnables import RunnableGenerator from langchain_core.documents import Document from langchain.load import dumps, loads from langchain.hub import pull from operator import itemgetter from typing import AsyncGenerator, AsyncIterator #compute amount of tokens used import tiktoken #import APMGraph packages from ea4all.src.ea4all_apm.configuration import AgentConfiguration from ea4all.src.ea4all_apm.state import InputState, OutputState, OverallState import ea4all.src.ea4all_apm.prompts as e4p from ea4all.src.shared.utils import ( load_mock_content, get_llm_client, get_history_gradio, extract_structured_output, extract_topic_from_business_input, _join_paths, ) from ea4all.src.shared import vectorstore from ea4all.src.tools.tools import ( websearch, ) # This file contains sample APM QUESTIONS APM_MOCK_QNA = "apm_qna_mock.txt" async def retrieve_documents( state: OverallState, *, config: RunnableConfig ) -> dict[str, list[Document]]: """Retrieve documents based on a given query. This function uses a retriever to fetch relevant documents for a given query. Args: state (QueryState): The current state containing the query string. config (RunnableConfig): Configuration with the retriever used to fetch documents. Returns: dict[str, list[Document]]: A dictionary with a 'documents' key containing the list of retrieved documents. 
""" with vectorstore.make_retriever(config) as retriever: response = await retriever.ainvoke(state.question, config) return {"messages": response} async def apm_retriever(config: RunnableConfig): with vectorstore.make_retriever(config) as retriever: response = retriever return response # Few Shot Examples few_shot_step_back_examples = [ { "input": "Who can I talk to about innovation?", "output": '{"datasource": "vectorstore, "topic":"who can I talk to"}"}', }, { "input": "Describe the finance landscape.", "output": '{"datasource": "vectorstore", "topic:":"line of business landscape"}', }, { "input": "What applications support the marketing landscape?", "output": '{"datasource": "vectorstore", "topic:":"line of business landscape"}', }, { "input": "List the simplification opportunities for the collaboration space.", "output": '{"datasource": "vectorstore", "topic:":"line of business landscape"}', }, { "input": "What are the available patterns to deploy AI applications into AWS?", "output": '{"datasource": "websearch", "topic:":"design patterns"}', }, { "input": "What is a Well-Architected Framework?", "output": '{"datasource": "websearch", "topic:":"architecture framework"}', }, { "input": "What is a Cloud Assessment Framework?", "output": '{"datasource": "websearch", "topic:":"cloud assessment framework"}', }, { "input": "What are the main architecture frameworks?", "output": '{"datasource": "websearch", "topic:":"architecture framework"}', }, ] # We now transform these to example messages few_shot_step_back_examples_prompt = ChatPromptTemplate.from_messages( [ ("human", "{input}"), ("ai", "{output}"), ] ) few_shot_prompt = FewShotChatMessagePromptTemplate( input_variables=["user_question"], example_prompt=few_shot_step_back_examples_prompt, examples=few_shot_step_back_examples, ) ## RAG from scratch: Query Translations functions def get_unique_union(documents: list[list]): """ Unique union of retrieved docs """ # Flatten list of lists, and convert each Document to string flattened_docs = [dumps(doc) for sublist in documents for doc in sublist] # Get unique documents unique_docs = list(set(flattened_docs)) # Return return [loads(doc) for doc in unique_docs] def reciprocal_rank_fusion(results: list[list], k=60): """ Reciprocal_rank_fusion that takes multiple lists of ranked documents and an optional parameter k used in the RRF formula """ # Initialize a dictionary to hold fused scores for each unique document fused_scores = {} # Iterate through each list of ranked documents for docs in results: # Iterate through each document in the list, with its rank (position in the list) for rank, doc in enumerate(docs): # Convert the document to a string format to use as a key (assumes documents can be serialized to JSON) doc_str = doc.metadata['source'] # If the document is not yet in the fused_scores dictionary, add it with an initial score of 0 if doc_str not in fused_scores: fused_scores[doc_str] = [doc,0] # Retrieve the current score of the document, if any #previous_score = fused_scores[doc_str] # Update the score of the document using the RRF formula: 1 / (rank + k) fused_scores[doc_str][1] += 1 / (rank + k) # Sort the documents based on their fused scores in descending order to get the final reranked results reranked_results = [ doc[0] for source, doc in sorted(fused_scores.items(), key=lambda x: x[0], reverse=True) ] # Return the reranked results as a list of tuples, each containing the document and its fused score return reranked_results def format_qa_pair(question, answer): """Format Q and A 
pair""" formatted_string = "" formatted_string += f"Question: {question}\nAnswer: {answer}\n\n" return formatted_string.strip() async def get_retrieval_chain(rag_input, ea4all_user, question, retriever, config: RunnableConfig): configuration = AgentConfiguration.from_runnable_config(config) llm = get_llm_client(configuration.query_model, api_base_url=configuration.api_base_url) #retriever = retriever_faiss(db, ea4all_user) #CHANGE: Receive as parameter originer #retriever = await apm_retriever(config) #NEEDS retrofit to add user_login if rag_input == 1: # Multi-query ## RAG Query Transformation: Multi query prompt_perspectives = ChatPromptTemplate.from_template(e4p.multiquery_template) generate_queries = ( prompt_perspectives | llm | StrOutputParser() | (lambda x: x.split("\n")) ) # Retrieve chain retrieval_chain = generate_queries | retriever.map() | get_unique_union elif rag_input == 2: # RAG Fusion # Prompt prompt_rag_fusion = ChatPromptTemplate.from_template(e4p.rag_fusion_questions_template) generate_queries = ( prompt_rag_fusion | llm | StrOutputParser() | (lambda x: x.split("\n")) ) # Retrieval chain retrieval_chain = generate_queries | retriever.map() | reciprocal_rank_fusion elif rag_input == 3: # Decomposition # Build prompt prompt_decomposition = ChatPromptTemplate.from_template(e4p.decomposition_template) # Chain generate_queries_decomposition = ( prompt_decomposition | llm | StrOutputParser() | (lambda x: x.split("\n"))) # Return new set of questions questions = generate_queries_decomposition.invoke( {"question": question}, {"tags": [os.environ['EA4ALL_ENV']], "metadata": {"ea4all_user": ea4all_user, "rag_input": rag_input}} ) # Prompt: Answer recuservely decomposition_prompt = ChatPromptTemplate.from_template(e4p.decomposition_answer_recursevely_template) # Answer each question and return final answer answer = "" q_a_pairs = "" for q in questions: rag_chain = ( {"context": itemgetter("question") | retriever, "question": itemgetter("question"), "q_a_pairs": itemgetter("q_a_pairs")} | decomposition_prompt | llm | StrOutputParser()) answer = rag_chain.invoke( {"question":q,"q_a_pairs":q_a_pairs}, {"tags": [os.environ['EA4ALL_ENV']], "metadata": {"ea4all_user": ea4all_user, "rag_input": rag_input}} ) q_a_pair = format_qa_pair(q,answer) q_a_pairs = q_a_pairs + "\n---\n" + q_a_pair return answer # Final response to user inquiry elif rag_input == 4: # RAG Step-back generate_queries_step_back = e4p.few_shot_step_back_prompt | llm | StrOutputParser() generate_queries_step_back.invoke( {"standalone_question": lambda x: x["standalone_question"]}, {"tags": [os.environ['EA4ALL_ENV']], "metadata": {"ea4all_user": ea4all_user, "rag_input": rag_input}} ) response_prompt = ChatPromptTemplate.from_template(e4p.step_back_response_prompt_template) retrieval_chain = ( { # Retrieve context using the normal question "normal_context": RunnableLambda(lambda x: getattr(x, "standalone_question")) | retriever, # Retrieve context using the step-back question "step_back_context": generate_queries_step_back | retriever, # Pass on the question "standalone_question": lambda x: x["standalone_question"], } | response_prompt | llm | StrOutputParser() ) elif rag_input == 5: # RAG HyDE # Prompt prompt_hyde = ChatPromptTemplate.from_template(e4p.hyde_template) generate_docs_for_retrieval = ( prompt_hyde | llm | StrOutputParser() ) retrieval_chain = generate_docs_for_retrieval | retriever else: # Standard RAG approach - user query retrieval_chain = itemgetter("standalone_question") | retriever return 
# Get relevant answers to the user query
## get_relevant_documents "deprecated" - replaced by invoke : 2024-06-07
async def get_relevant_answers(state: OverallState, query, config: RunnableConfig):
    if query != "":
        #retriever.vectorstore.index.ntotal
        #retriever = retriever_faiss(user_ip)
        #response = retriever.invoke({"standalone_question": query})
        response = await retrieve_documents(state, config=config)
        return response
    else:
        return []


# Return the LLM answer to the user inquiry
def rag_llm(llm, chat_prompt, query, response):
    answers = llm.invoke(
        chat_prompt.format_prompt(
            cdocs=response,
            query=query,
        )
    )
    try:
        return answers.content
    except AttributeError:
        return answers


# Save the user APM to disk
def ea4all_serialize(apm_file, user_ip):
    import pickle

    # Specify the target filename
    filename = _join_paths(AgentConfiguration.ea4all_store, f"apm_{user_ip}.pkl")

    # Serialize and save the binary data to a file
    try:
        with open(filename, 'wb') as file:
            pickle.dump(apm_file, file)
        return True
    except Exception:
        # Serialization failed
        return False


# Number of tokens consumed
def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Returns the number of tokens in a text string."""
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens


# Retrieve relevant questions based on user interaction
def get_relevant_questions():
    relevant_questions = []
    mock = load_mock_content(APM_MOCK_QNA)
    for line in mock.splitlines():
        relevant_questions += [line]

    return relevant_questions


# Rephrase the original user question based on a system prompt to lead to a better LLM answer
def user_query_rephrasing(
    state: OverallState, _prompt=None, *, config: RunnableConfig
) -> dict[str, str]:
    question = getattr(state, 'question')

    configuration = AgentConfiguration.from_runnable_config(config)
    #model = load_chat_model(configuration.query_model)
    model = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url)

    if _prompt:
        rewrite_prompt = pull("learn-it-all-do-it-all/ea4all_user_question_rephrase")
        inputs = {"user_question": question}  #, "ai_output": e4p.LLAMA31_PROMPT_FORMAT}
    else:
        rewrite_prompt = pull("learn-it-all-do-it-all/ea4all_question_rewriter")
        inputs = {"user_question": question, "target": "web search"}

    rewrite_chain = rewrite_prompt | model | JsonOutputParser()
    result = rewrite_chain.invoke(
        input=inputs
    )

    try:
        question = result['rephrased']
    except Exception:
        question = state.question

    return {"question": question}


# Post-processing
def format_docs(docs):
    return "\n".join(doc.page_content for doc in docs)


def identify_task_category(question, chat_memory, config: RunnableConfig):
    configuration = AgentConfiguration.from_runnable_config(config)
    prompt = pull("learn-it-all-do-it-all/apm_task_router")
    llm = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url)

    try:
        # x = get_history_gradio(x) extract Human / AI
        # Fake gradio chat memory
        x = {"chat_memory": []}
        x['chat_memory'] = chat_memory

        # Extract human messages only
        memory = ""
        for human, ai in x['chat_memory']:
            memory += human + ";"

        chain_one = prompt | llm | JsonOutputParser()
        result = chain_one.invoke({"user_question": memory + question if x else question})

        # Parse the response and pass it on to the next chain/prompt
        response = extract_topic_from_business_input(result)

        return response
    except Exception:
        return {'primary': 'General Inquiry'}

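
# Illustrative contract for the grader chains defined below: each one is piped into
# JsonOutputParser, so a well-formed grade is expected to parse to {"score": "yes"} or
# {"score": "no"}. Hypothetical call (inputs shortened for readability):
#
#   grade = await retrieval_grader(llm).ainvoke(
#       {"user_question": "Which apps support finance?", "document": "..."}
#   )
#   # grade -> {"score": "yes"}
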
template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance of a retrieved document to a user question. If the document contains keywords related to the user question, grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n Provide the binary score as a JSON with a single key 'score' and no premable or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the retrieved document: \n\n {document} \n\n Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|> """, input_variables=["user_question", "document"], ) retrieval_grader = prompt | model | JsonOutputParser() return retrieval_grader def hallucination_grader(model): # Prompt prompt = pull("learn-it-all-do-it-all/ea4all_apm_hallucination_grader") hallucination_grader = prompt | model | JsonOutputParser() return hallucination_grader def grade_answer(model): # Prompt prompt = PromptTemplate( template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assistant and your task is to assess the answer relevance to address a user question.\n Give a binary score 'yes' to indicate that the answer is relevant or 'no' otherwise.\n Provide the binary score as a JSON with a keys 'score' and nothing else.\n <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer: \n ------- \n {generation} \n ------- \n Here is the question: {user_question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""", input_variables=["generation", "user_question"], ) answer_grader = prompt | model | JsonOutputParser() return answer_grader async def grade_documents(state, config: RunnableConfig): """ Determines whether the retrieved documents are relevant to the question If any document is not relevant, we will set a flag to run web search Args: state (dict): The current graph state Returns: state (dict): Filtered out irrelevant documents and updated web_search state """ configuration = AgentConfiguration.from_runnable_config(config) print("---CHECK DOCUMENT RELEVANCE TO QUESTION---") question = state.question documents = state.messages source = state.source llm = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url) # Score each doc filtered_docs = [] for d in documents: score = retrieval_grader(llm).ainvoke( {"user_question": question, "document": d.page_content} ) grade = getattr(score,"score", "no") # Document relevant if grade.lower() == "yes": print("---GRADE: DOCUMENT RELEVANT---") filtered_docs.append(d) # Document not relevant else: print("---GRADE: DOCUMENT NOT RELEVANT---") # We do not include the document in filtered_docs # We set a flag to indicate that we want to run web search #web_search = "Yes" source = "websearch" return {"documents": filtered_docs, "question": question, "source": source} def decide_to_generate(state): """ Determines whether to generate an answer, or add web search Args: state (dict): The current graph state Returns: str: Binary decision for next node to call """ print("---ASSESS GRADED DOCUMENTS---") state.question source = state.source getattr(state,'documents') if source == "websearch": # All documents have been filtered check_relevance # We will re-generate a new query print( "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---" ) return "websearch" else: # We have 
def grade_generation_v_documents_and_question(state: OverallState, config: RunnableConfig) -> str:
    """
    Determine whether the generation is grounded in the documents and answers the question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """
    configuration = AgentConfiguration.from_runnable_config(config)

    question = getattr(state, 'question')
    documents = getattr(state, 'messages')
    generation = getattr(state, 'generation')

    llm = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url)

    if getattr(state, 'source') == "websearch":
        #print("---CHECK HALLUCINATIONS---")
        hallucination_grader_instance = hallucination_grader(llm)
        #for output in hallucination_grader_instance.stream(
        output = hallucination_grader_instance.invoke(
            {"documents": documents, "generation": generation},
            config={"tags": ["stream_hallucination"]})
        #yield(output)
        grade = output["score"]
        if grade == "yes":
            print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
    else:
        grade = 'yes'

    # Check hallucination
    if grade == "yes":
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        grade_answer_instance = grade_answer(llm)
        #for output in grade_answer_instance.stream(
        output = grade_answer_instance.invoke(
            {"user_question": question, "generation": generation},
            config={"tags": ["stream_grade_answer"]})
        #yield(output)
        grade = output["score"]
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"


async def apm_query_router(state: OverallState, config: RunnableConfig) -> str:
    configuration = AgentConfiguration.from_runnable_config(config)

    routing_prompt = pull('learn-it-all-do-it-all/ea4all-apm-user-question-routing')

    # Update the prompt with the few-shot examples
    updated_prompt = routing_prompt.from_messages(
        [routing_prompt.messages[0], few_shot_prompt, routing_prompt.messages[1], routing_prompt.messages[2]]
    )

    # Apply partial variables to the created template
    updated_prompt = updated_prompt.partial(
        metadata=e4p.TEMPLATE_APM_QNA_ROUTING,
    )

    model = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url)

    route = updated_prompt | model

    ## Rephrase the user question to lead to a better LLM response
    # PROMPT as context NOT WORKING AS EXPECTED 2024-09-23
    user_query = user_query_rephrasing(state=state, _prompt=updated_prompt, config=config)['question']
    response = await route.ainvoke({"user_question": user_query})

    extracted = extract_structured_output(response.content)
    if extracted is not None:
        datasource = extracted.get('datasource', 'vectorstore')
    else:
        datasource = 'vectorstore'

    return datasource

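
# Illustrative only: the routing prompt (together with the few-shot examples defined
# earlier) is expected to yield structured output along the lines of
#   {"datasource": "vectorstore", "topic": "line of business landscape"}
# or
#   {"datasource": "websearch", "topic": "architecture framework"}
# apm_query_router keeps only the "datasource" value and defaults to "vectorstore".
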
async def retrieve(state: OverallState, config: RunnableConfig):
    """
    Retrieve documents.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    configuration = AgentConfiguration.from_runnable_config(config)

    #print("---RETRIEVE---")
    question = getattr(state, 'question')

    llm = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url)

    with vectorstore.make_retriever(config) as _retriever:
        retriever = _retriever

    # First we add a step to load memory from gr.ChatInterface.history_chat
    # This adds a "memory" key to the input object
    loaded_memory = RunnablePassthrough.assign(
        chat_history=RunnableLambda(get_history_gradio) | itemgetter("history"))

    # Now we calculate the standalone question <= original question + chat history
    standalone_question = {
        "standalone_question": {
            "chat_history": lambda x: str(x["chat_history"]),
            "user_question": lambda x: x['user_question']
        }
        | e4p.CONDENSE_QUESTION_PROMPT
        | llm
        | StrOutputParser()
    }

    # Retrieval
    rag_input = int(getattr(state, 'rag'))
    retrieval_chain = await get_retrieval_chain(rag_input, "ea4all_agent", question, retriever, config=config)
    retrieved_documents = {
        "cdocs": retrieval_chain,
        "user_question": itemgetter("standalone_question")
    }

    # And now we put it all together!
    final_chain = loaded_memory | standalone_question | retrieved_documents

    documents = await final_chain.ainvoke({"user_question": question, "chat_memory": []})

    return {"messages": format_docs(documents['cdocs']), "question": question, "rag": getattr(state, 'rag')}


### Edges ###
def route_to_node(state: OverallState):
    if state.source == "websearch":
        #print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    elif state.source == "vectorstore":
        #print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"


async def route_question(state: OverallState, config: RunnableConfig) -> dict[str, str]:
    """
    Route the question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        dict: Updated 'source' key with the routed datasource
    """
    #print("---ROUTE QUESTION---")
    source = await apm_query_router(state, config)

    return {"source": source}


async def stream_generation(state: OverallState, config: RunnableConfig) -> AsyncGenerator[str, None]:
    configuration = AgentConfiguration.from_runnable_config(config)

    llm = get_llm_client(
        model=configuration.query_model,
        api_base_url=configuration.api_base_url,
        streaming=configuration.streaming)

    documents = None
    question = None
    source = None
    chat_memory = None
    async for s in state:
        documents = getattr(s, "messages")
        question = getattr(s, "question")
        source = getattr(s, "source")
        chat_memory = getattr(s, "chat_memory")

    # Prompt for web-search generation
    if source == "websearch":
        prompt = PromptTemplate(
            template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
            You are an enterprise architect assistant for question-answering tasks.
            Use the following pieces of retrieved context to answer the question.
            If you don't know the answer, just say that you don't know.
            Keep the answer concise.
            <|eot_id|><|start_header_id|>user<|end_header_id|>
            Question: {user_question}
            Context: {cdocs}
            Answer:
            <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
            input_variables=["user_question", "cdocs"],
        )
    else:
        # Now we construct the inputs for the final prompt
        # Identify the primary and secondary category
        tc = identify_task_category(question, chat_memory, config)
        prompt = e4p.ea4ll_prompt_selector(tc['primary'])

    rag_chain = prompt | llm | StrOutputParser()

    async for output in rag_chain.astream({"cdocs": documents, "user_question": question}):
        yield output

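
# Note: stream_generation is not called directly by the graph; generate() below wraps it
# in a RunnableGenerator and tags the run ("apm_stream" or "websearch_stream"), so a UI
# consuming the graph's event stream can, if desired, filter streamed chunks by those tags.
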
async def generate(state: OverallState, config: RunnableConfig) -> dict[str, str]:
    """
    Generate an answer.

    Args:
        state (dict): The current graph state
        config (RunnableConfig): Configuration with the model used for query analysis.

    Returns:
        state (dict): New key added to state, generation, that contains the LLM generation
    """
    #print("---GENERATE---")
    #documents = getattr(state,'messages')[-1].content  #documents
    source = getattr(state, 'source')
    #question = getattr(state,'question')

    ## Triggered by hallucination_grade? 2025-02-21 - NOT USED, being edged to END atm
    #2025-02-21: it's being triggered by the super_graph supervisor as well - need to review as it calls web_search twice
    #if getattr(state,'generation') is None:
    #    if getattr(state,'web_search') == "Yes":
    #        await websearch(state, config)
    #    else:
    #        state.rag = "1"
    #        await retrieve(state, config)

    # Generate answer
    tags = ["websearch_stream"] if source == "websearch" else ["apm_stream"]
    gen = RunnableGenerator(stream_generation).with_config(tags=tags)
    generation = ""
    async for message in gen.astream(state):
        generation = ''.join([generation, message])

    #return {"messages": documents.content, "question": question, "generation": generation, "web_search": web_search}
    return {"generation": generation}


#ea4all-qna-agent-conversational-with-memory
async def apm_agentic_qna(state: OverallState, config: RunnableConfig):
    configuration = AgentConfiguration.from_runnable_config(config)

    question = getattr(state, 'question')
    chat_memory = getattr(state, 'chat_memory')

    llm = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url)
    retriever = await apm_retriever(config)

    # First we add a step to load memory from gr.ChatInterface.history_chat
    # This adds a "memory" key to the input object
    loaded_memory = RunnablePassthrough.assign(
        chat_history=itemgetter("chat_memory"))

    # Now we calculate the standalone question <= original question + chat history
    standalone_question = {
        "standalone_question": {
            "chat_history": lambda x: str(x["chat_history"]),
            "user_question": lambda x: x["user_question"]
        }
        | e4p.CONDENSE_QUESTION_PROMPT
        | llm
        | StrOutputParser()
    }

    # Start with HyDE
    prompt_hyde = ChatPromptTemplate.from_template(e4p.hyde_template)
    generate_docs_for_retrieval = (
        prompt_hyde | llm | StrOutputParser()
    )
    retrieval_chain = generate_docs_for_retrieval | retriever
    retrieved_documents = {
        "cdocs": retrieval_chain,
        "query": itemgetter("standalone_question")
    }

    # And now we put it all together!
    final_chain = loaded_memory | standalone_question | retrieved_documents

    documents = await final_chain.ainvoke({"user_question": question, "chat_memory": chat_memory})

    #return {"documents": format_docs(documents['cdocs']), "question": question, "rag":5, "generation": None}
    return {"messages": format_docs(documents['cdocs']), "rag": 5}


async def final(state: OverallState):
    return {"safety_status": state}


async def choose_next(state: OverallState):
    if state.safety_status is not None and len(state.safety_status) > 0 and state.safety_status[0] == 'no':
        return "exit"
    else:
        return "route"


class SafetyCheck:
    def apm_safety_check(self, state: OverallState, config: RunnableConfig):
        configuration = AgentConfiguration.from_runnable_config(config)

        question = state.question
        safety_prompt = pull('learn-it-all-do-it-all/ea4all_apm_safety_check')
        llm = get_llm_client(model=configuration.query_model, api_base_url=configuration.api_base_url)

        route = safety_prompt | llm | JsonOutputParser()
        response = route.invoke({"user_question": question})

        try:
            score = response['score']
            explain = response['response']
        except (KeyError, TypeError):
            score = 'no'
            explain = 'I cannot answer your question at the moment!'
return {"safety_status": [score, explain, question]} def __init__(self): self._safety_run = self.apm_safety_check def __call__(self, state: OverallState, config: RunnableConfig) -> dict[str, list]: try: response = getattr(self, '_safety_run')(state, config) return {"safety_status": [response['safety_status'][0], "", state.question]} except Exception as e: return {"safety_status": ['no', e, state.question]} ##BUILD APM Graph # Build graph workflow = StateGraph(OverallState, input=InputState, output=OutputState, config_schema=AgentConfiguration) # Define the nodes workflow.add_node("safety_check",SafetyCheck()) workflow.add_node("route_question", route_question) # route to vectorstore or websearch workflow.add_node("retrieve", apm_agentic_qna) # retrieve workflow.add_node("websearch", websearch) # web search workflow.add_node("generate", generate) # generate web search based answer workflow.add_node("final", final) workflow.set_entry_point("safety_check") workflow.add_conditional_edges( "safety_check", choose_next, { "exit": "final", "route": "route_question" } ) workflow.add_conditional_edges( "route_question", route_to_node, { "websearch": "websearch", "vectorstore": "retrieve", }, ) workflow.add_edge("retrieve", "generate") workflow.add_edge("websearch", "generate") workflow.add_conditional_edges( #2025-02-27: Conditional edges expect sync function only "generate", grade_generation_v_documents_and_question, { "not supported": "route_question", "useful": END, "not useful": END, ##2025-02-21: need to review THIS to try again and respond to user with a better answer }, ) workflow.add_edge("final", END) # Compile apm_graph = workflow.compile() apm_graph.name = "APMGraph"