|
import json
|
|
from typing_extensions import TypedDict
|
|
from models import extractor, router
|
|
from langgraph.graph import StateGraph, START,END
|
|
|
|
class PlanExecute(TypedDict):
    """State dictionary threaded through the route/extract graph."""

    # Source data; indexed with the entries of ``list_keys``.
    ui_data: dict
    # Query text forwarded to both the router and the extractor.
    query: str
    # Earlier messages, forwarded to the models as ``previous_messages``.
    previous_msgs: list
    # Keys of ``ui_data`` that still have to be processed.
    list_keys: list
    # Non-empty extraction results gathered so far.
    information: list
|
|
|
|
async def route_step(state: PlanExecute):
    """Ask the router which keys should be processed for this query.

    The router receives the query and the previous messages; the ``route``
    attribute of its response is returned as the new ``list_keys`` value.
    """
    router_input = {
        "query": state["query"],
        "previous_messages": state["previous_msgs"],
    }
    decision = await router.ainvoke(router_input)
    return {"list_keys": decision.route}
|
|
|
|
async def extract_step(state: PlanExecute):
    """Run the extractor on the next pending key and return the state update.

    The first entry of ``state["list_keys"]`` selects the part of
    ``state["ui_data"]`` that is serialised to JSON and sent to the
    extractor together with the query and the previous messages.

    Returns:
        A dict with ``list_keys`` (the pending keys minus the one just
        processed) and ``information`` (the collected results, extended by
        the extractor's answer unless that answer is the empty string).

    Raises:
        IndexError: if ``list_keys`` is empty.
        KeyError: if the selected key is missing from ``ui_data``.
    """
    pending = state["list_keys"]
    key = pending[0]
    response = await extractor.ainvoke(
        {
            "query": state["query"],
            "data": json.dumps(state["ui_data"][key]),
            "previous_messages": state["previous_msgs"],
        }
    )

    # Build fresh lists and hand them back as an explicit update instead of
    # mutating the incoming state in place: in-place edits are only visible
    # to later nodes if the framework happens to share the same list objects.
    remaining = list(pending[1:])
    # A missing or None ``information`` entry is treated as "nothing yet".
    information = list(state.get("information") or [])
    if response.information != "":
        information.append(response.information)

    # NOTE(review): the original indentation was lost, so it is unclear
    # whether this debug print sat inside the ``if`` above — confirm.
    print(information)

    return {"list_keys": remaining, "information": information}
|
|
|
|
def should_end(state: PlanExecute):
    """Choose the next node: ``END`` once no keys remain, else ``"extract_step"``."""
    keys_left = len(state["list_keys"])
    return "extract_step" if keys_left > 0 else END
|
|
|
|
# Assemble the graph: route once, then run extract_step repeatedly until
# should_end reports that no keys are left.
workflow = StateGraph(PlanExecute)

# Nodes.
workflow.add_node("route_step", route_step)
workflow.add_node("extract_step", extract_step)

# Entry edge: every run starts with routing.
workflow.add_edge(START, "route_step")

# After either node, should_end picks between finishing and another
# extraction pass.
workflow.add_conditional_edges(
    "route_step",
    should_end,
    [END, "extract_step"],
)
workflow.add_conditional_edges(
    "extract_step",
    should_end,
    [END, "extract_step"],
)

# Compiled, runnable graph exported by this module.
app_graph = workflow.compile()
|
|
|