from query_preprocessing import preprocess_query import dspy from typing import List, Dict import os OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") if not OPENAI_API_KEY: raise RuntimeError("Missing OPENAI_API_KEY env var") gpt_4o_mini = dspy.LM('openai/gpt-4o-mini', api_key=OPENAI_API_KEY) dspy.configure(lm=gpt_4o_mini) # == Building Blocks == class DSPyHybridRetriever(dspy.Module): def __init__(self, retriever): super().__init__() self.retriever = retriever def forward(self, query: str, municipality: str = "", top_k: int = 5): results = self.retriever.rerank(query, top_k=top_k, municipality_filter=municipality) return {"retrieved_chunks": results} class RetrieveChunks(dspy.Signature): """Given a user query and optional municipality, retrieve relevant text chunks.""" query = dspy.InputField(desc="User's question") municipality = dspy.InputField(desc="Optional municipality filter") retrieved_chunks = dspy.OutputField( desc=( "List of retrieved chunks, each as a dict with keys: " "`chunk`, `document_id`, `section`, `level`, `page`, " "`dense_score`, `sparse_score`, `graph_score`, `final_score`" ), type=List[Dict] # each item is a dict carrying all those fields ) class AnswerWithEvidence(dspy.Signature): """Answer the query using reasoning and retrieved chunks as context.""" query = dspy.InputField(desc="Rewritten question") retrieved_chunks = dspy.InputField(desc="Retrieved text chunks (List[dict])") answer = dspy.OutputField(desc="Final answer") rationale = dspy.OutputField(desc="Chain-of-thought reasoning") # == RAG Pipeline == class RAGChain(dspy.Module): def __init__(self, retriever, answerer): super().__init__() self.retriever = retriever self.answerer = answerer def forward(self, raw_query: str, municipality: str = "", feedback: str = ""): pre = preprocess_query(raw_query, feedback) rewritten = pre["rewritten_query"] or raw_query extracted_muni = pre["municipality"] or "" intent = pre["intent"] muni = municipality if municipality.strip() else extracted_muni retrieved = self.retriever(query=rewritten, municipality=muni) chunks = retrieved["retrieved_chunks"] # Answer + CoT using the rewritten query answer_result = self.answerer( query=rewritten, retrieved_chunks=[c["chunk_text"] for c in chunks] ) # Return everything for transparency & downstream use return { "original_query": raw_query, "intent": intent, "rewritten_query": rewritten, "llm_municipality": extracted_muni, "municipality": muni, "retrieved_chunks": chunks, "chain_of_thought": answer_result.rationale, "final_answer": answer_result.answer, }