# File-viewer header captured together with the source (not part of the program):
#   Yongkang ZOU
#   update agent
#   dfe572b
#   raw
#   history blame
#   7.48 kB
import os
from dotenv import load_dotenv
from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_groq import ChatGroq
from supabase import create_client
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
import json
load_dotenv()
# ------------------- TOOL DEFINITIONS -------------------
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    # Plain product of the two operands; no validation beyond the type hints.
    product = a * b
    return product
@tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    # Plain sum of the two operands; no validation beyond the type hints.
    total = a + b
    return total
@tool
def subtract(a: int, b: int) -> int:
    """Subtract b from a."""
    # Operand order matters: the second argument is taken away from the first.
    difference = a - b
    return difference
@tool
def divide(a: int, b: int) -> float:
    """Divide a by b. Raise error if b is zero."""
    # Happy path first; a zero divisor falls through to the explicit ValueError
    # instead of Python's built-in ZeroDivisionError.
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
@tool
def modulus(a: int, b: int) -> int:
    """Get remainder of a divided by b."""
    # Uses Python's `%`, so the result takes the sign of the divisor `b`.
    remainder = a % b
    return remainder
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query (max 2 results)."""
    loader = WikipediaLoader(query=query, load_max_docs=2)
    pages = loader.load()
    # Page bodies are returned in full, separated by a blank line.
    return "\n\n".join(page.page_content for page in pages)
@tool
def web_search(query: str) -> str:
    """Search the web using Tavily (max 3 results)."""
    results = TavilySearchResults(max_results=3).invoke(query)

    # A plain string is not a list of hits. Iterating it character by character
    # (as the previous code did) silently produced "", hiding the message.
    # NOTE(review): the Tavily tool is understood to return the error's repr as a
    # string when the search fails - confirm against the installed version.
    if isinstance(results, str):
        return results

    texts = []
    for hit in results:
        if not isinstance(hit, dict):
            continue
        # Prefer "content", fall back to "text"; either may be missing or None.
        text = hit.get("content") or hit.get("text")
        if text:
            # Skip empty snippets so the output has no stray blank separators.
            texts.append(str(text))
    return "\n\n".join(texts)
@tool
def arvix_search(query: str) -> str:
    """Search Arxiv for academic papers (max 3 results, truncated to 1000 characters each)."""
    loader = ArxivLoader(query=query, load_max_docs=3)
    papers = loader.load()
    # Only the first 1000 characters of each paper are kept to bound the output size.
    excerpts = (paper.page_content[:1000] for paper in papers)
    return "\n\n".join(excerpts)
@tool
def read_excel_file(path: str) -> str:
    """Read an Excel file and return the first few rows of each sheet as text."""
    # Imported inside the function, so pandas is only loaded when this tool runs.
    import pandas as pd

    try:
        # The context manager closes the workbook handle even when parsing a
        # sheet raises; previously the ExcelFile was never closed.
        with pd.ExcelFile(path) as workbook:
            previews = [
                # One "Sheet: <name>" header followed by the first 5 rows, without the index.
                f"Sheet: {sheet}\n{workbook.parse(sheet).head(5).to_string(index=False)}\n\n"
                for sheet in workbook.sheet_names
            ]
        return "".join(previews).strip()
    except Exception as e:
        # Deliberate best effort: any failure is reported back as text rather
        # than raised, so the caller always receives a string.
        return f"Error reading Excel file: {str(e)}"
# Every tool exposed to the model; the same list is bound to the LLM and
# handed to the graph's ToolNode.
tools = [
    # Arithmetic
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # Search / retrieval
    wiki_search,
    web_search,
    arvix_search,
    # Files
    read_excel_file,
]
# ------------------- SYSTEM PROMPT -------------------
# The path is relative, so it is resolved against the current working directory.
system_prompt_path = "system_prompt.txt"
if not os.path.exists(system_prompt_path):
    # No prompt file on disk: fall back to the built-in default prompt.
    system_prompt = (
        "You are an intelligent AI agent who can solve math, science, factual, and research-based problems. "
        "You can use tools like Wikipedia, Web search, or Arxiv when needed. Always give precise and helpful answers."
    )
else:
    with open(system_prompt_path, "r", encoding="utf-8") as f:
        system_prompt = f.read()

# Wrapped once at import time and reused by the graph's assistant node.
sys_msg = SystemMessage(content=system_prompt)
# ------------------- GRAPH CONSTRUCTION -------------------
def _make_llm(provider: str):
    """Return the chat model for *provider*, or raise ValueError if it cannot be built."""
    if provider == "google":
        return ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    if provider == "groq":
        groq_key = os.getenv("GROQ_API_KEY")
        if not groq_key:
            raise ValueError("GROQ_API_KEY is not set.")
        return ChatGroq(model="qwen-qwq-32b", temperature=0, api_key=groq_key)
    if provider == "huggingface":
        return ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
                temperature=0,
            )
        )
    if provider == "openai":
        openai_key = os.getenv("OPENAI_API_KEY")
        if not openai_key:
            raise ValueError("OPENAI_API_KEY is not set.")
        return ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=openai_key)
    raise ValueError("Invalid provider")


def _make_retriever():
    """Return a top-1 retriever over the Supabase "QA_db" table."""
    supabase = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY"))
    embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
    vectorstore = SupabaseVectorStore(
        client=supabase,
        embedding=embedding_model,
        table_name="QA_db",
    )

    def patched_fn(embedding, k=4, filter=None, **kwargs):
        """Run the similarity search by calling the "match_documents" RPC directly.

        The signature mirrors the vector-store method it replaces; `filter`
        and any extra keyword arguments are accepted but not used.
        """
        response = supabase.rpc(
            "match_documents",
            {
                "query_embedding": embedding,
                "match_count": k,
            },
        ).execute()

        documents = []
        for row in response.data or []:
            metadata = row.get("metadata") or {}
            if isinstance(metadata, str):
                # Metadata may arrive JSON-encoded; undecodable text is dropped.
                try:
                    metadata = json.loads(metadata)
                except json.JSONDecodeError:
                    metadata = {}
            if not isinstance(metadata, dict):
                # Anything that is not a mapping cannot serve as Document metadata.
                metadata = {}
            doc = Document(page_content=row["content"], metadata=metadata)
            documents.append((doc, row["similarity"]))
        return documents

    # Override the method on this vector-store instance so that lookups made
    # through the retriever go through the direct RPC call above.
    vectorstore.similarity_search_by_vector_with_relevance_scores = patched_fn
    return vectorstore.as_retriever(search_kwargs={"k": 1})


def build_graph(provider: str = "groq"):
    """Build and compile the agent graph.

    Flow: START -> retriever -> END when a stored answer was found, otherwise
    retriever -> assistant, which loops through the tools node for as long as
    the model requests tool calls.

    Args:
        provider: Which chat model backend to use: "google", "groq",
            "huggingface" or "openai".

    Returns:
        The compiled graph; invoke it with {"messages": [...]} whose last
        message is the user's question.

    Raises:
        ValueError: If `provider` is not recognised, or if the API key
            required by the "groq" / "openai" provider is not set.
    """
    llm = _make_llm(provider)
    llm_with_tools = llm.bind_tools(tools)
    retriever = _make_retriever()

    def assistant(state: MessagesState):
        # The system prompt belongs in the model *input*. Only the model's reply
        # is written back, so no SystemMessage is appended to the history.
        return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

    def qa_retriever_node(state: MessagesState):
        # Assumes the last message in the state is the user's question.
        user_question = state["messages"][-1].content
        docs = retriever.invoke(user_question)
        # NOTE(review): with k=1 a nearest-neighbour lookup presumably returns a
        # row whenever the table is non-empty, so the assistant branch may rarely
        # run - confirm whether a similarity threshold is wanted.
        if docs:
            # Only the new message is returned; it is added to the existing history.
            return {"messages": [AIMessage(content=docs[0].page_content)]}
        return {"messages": []}

    def route_after_retriever(state: MessagesState):
        # The retriever appends an AIMessage only when it found a stored answer.
        # Routing reads the state itself, since extra keys such as
        # "__condition__" are not part of MessagesState.
        if isinstance(state["messages"][-1], AIMessage):
            return END
        return "assistant"

    builder = StateGraph(MessagesState)
    builder.add_node("retriever", qa_retriever_node)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.add_edge(START, "retriever")
    # `path` must be a callable returning the next node; the mapping only
    # declares the possible destinations.
    builder.add_conditional_edges(
        "retriever",
        route_after_retriever,
        {"assistant": "assistant", END: END},
    )
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")
    return builder.compile()
# ------------------- LOCAL TEST -------------------
if __name__ == "__main__":
    # Manual smoke test: run one question through the OpenAI-backed graph and
    # print every message of the resulting conversation.
    question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?"
    graph = build_graph(provider="openai")
    initial_state = {"messages": [HumanMessage(content=question)]}
    final_state = graph.invoke(initial_state)
    print("=== AI Agent Response ===")
    for message in final_state["messages"]:
        message.pretty_print()