import os import gradio as gr import pandas as pd from typing import List, Dict, Any # --- LlamaIndex & LangChain Imports --- from llama_index.core import VectorStoreIndex, Document, Settings from llama_index.llms.openai import OpenAI as LlamaOpenAI from llama_index.embeddings.openai import OpenAIEmbedding from langchain_experimental.agents.agent_toolkits import create_pandas_dataframe_agent from langchain_openai import ChatOpenAI from langchain.agents.agent_types import AgentType class HybridExcelQuerySystem: def __init__(self, openai_api_key: str): os.environ["OPENAI_API_KEY"] = openai_api_key Settings.llm = LlamaOpenAI(model="gpt-4o") Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small") self.agent_llm = ChatOpenAI(temperature=0, model="gpt-4o") self.dataframes: Dict[str, pd.DataFrame] = {} self.vector_stores: Dict[str, VectorStoreIndex] = {} self.logs = [] self.sheet_names = [] def _pivot_numerical_data(self, df: pd.DataFrame) -> pd.DataFrame: """ --- NEW, MORE ROBUST VERSION --- Intelligently reconstructs the 'Numerical Data' sheet into a clean, tidy format. """ # Find the row with month names (it's the first non-empty row) header_row_index = df.dropna(how='all').index[0] month_headers = df.iloc[header_row_index].dropna().tolist() # Find the first column with financial metrics metric_col_name = df.columns[0] df_metrics = df[[metric_col_name]].dropna() # Find the starting row and column for the actual data start_row = df_metrics.index[0] start_col = df.columns.get_loc(month_headers[0]) # Extract the core data, metrics, and headers data = df.iloc[start_row:, start_col:start_col+len(month_headers)] metrics = df.iloc[start_row:, 0] # Create a new, clean DataFrame clean_df = pd.DataFrame(data.values, index=metrics, columns=month_headers) # Transpose so months are rows clean_df = clean_df.T clean_df = clean_df.reset_index().rename(columns={'index': 'Month'}) # Clean column names clean_df.columns = clean_df.columns.str.strip().str.replace(r'[^a-zA-Z0-9_%]', '_', regex=True).str.replace('__', '_') # Convert all columns except 'Month' to numeric for col in clean_df.columns: if col != 'Month': if clean_df[col].dtype == 'object': clean_df[col] = clean_df[col].astype(str).str.replace('%', '', regex=False) clean_df[col] = pd.to_numeric(clean_df[col], errors='coerce') return clean_df def load_excel_file(self, file_path: str) -> str: self.logs.clear() try: xls = pd.ExcelFile(file_path) self.sheet_names = xls.sheet_names self.logs.append(f"✅ Found {len(self.sheet_names)} sheets: {', '.join(self.sheet_names)}") for sheet_name in self.sheet_names: df = pd.read_excel(file_path, sheet_name=sheet_name, header=None) # Read without a header if sheet_name == "Numerical Data": agent_df = self._pivot_numerical_data(df.copy()) else: agent_df = self._clean_dataframe_for_agent(df.copy()) self.dataframes[sheet_name] = agent_df rag_df = self._clean_dataframe_for_rag(df.copy()) markdown_text = rag_df.to_markdown(index=False) doc = Document(text=markdown_text, metadata={"source": sheet_name}) self.vector_stores[sheet_name] = VectorStoreIndex.from_documents([doc]) self.logs.append(f" - Prepared sheet '{sheet_name}' for both Lookup and Calculation.") self.logs.append("✅ All sheets are ready.") return "\n".join(self.logs) except Exception as e: raise Exception(f"Error loading Excel file: {e}") def _clean_dataframe_for_agent(self, df: pd.DataFrame) -> pd.DataFrame: df.columns = [f"Col_{i}" for i in range(len(df.columns))] return df def _clean_dataframe_for_rag(self, df: pd.DataFrame) -> pd.DataFrame: for col in df.columns: df[col] = df[col].astype(str) return df def _classify_query(self, query: str) -> str: prompt = f"""Classify the user's query about an Excel sheet as either "lookup" or "calculation". "lookup": for questions asking for specific data or summaries. "calculation": for questions requiring math, sorting, or filtering. Query: "{query}" Classification:""" response = self.agent_llm.invoke(prompt) classification = response.content.strip().lower() return "calculation" if "calculation" in classification else "lookup" def query(self, query: str, selected_sheet: str) -> Dict[str, Any]: if not selected_sheet: return {"answer": "Error: Please select a sheet first.", "tool_used": "None"} classification = self._classify_query(query) if classification == "calculation": return self._execute_agent_query(query, selected_sheet) else: return self._execute_rag_query(query, selected_sheet) def _execute_rag_query(self, query: str, sheet_name: str) -> Dict[str, Any]: try: query_engine = self.vector_stores[sheet_name].as_query_engine() response = query_engine.query(query) return {"answer": str(response), "tool_used": "Lookup (RAG Search)"} except Exception as e: return {"answer": f"Error during lookup: {e}", "tool_used": "Lookup (RAG Search)"} def _execute_agent_query(self, query: str, sheet_name: str) -> Dict[str, Any]: try: df = self.dataframes[sheet_name] agent = create_pandas_dataframe_agent(self.agent_llm, df, agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True, allow_dangerous_code=True, max_iterations=15, handle_parsing_errors=True) response = agent.invoke(query) return {"answer": response['output'], "tool_used": "Calculation (Pandas Agent)"} except Exception as e: return {"answer": f"Error during calculation: {e}", "tool_used": "Calculation (Pandas Agent)"} # --- Gradio UI --- def process_excel(api_key: str, file_obj: gr.File): if not api_key: raise gr.Error("Please provide your OpenAI API key.") if not file_obj: raise gr.Error("Please upload an Excel file.") system = HybridExcelQuerySystem(openai_api_key=api_key) logs = system.load_excel_file(file_obj.name) sheet_names = system.sheet_names return (logs, system, gr.update(choices=sheet_names, value=sheet_names[0] if sheet_names else None, visible=True), gr.update(visible=True)) def user_interaction(question: str, history: List, system_state: HybridExcelQuerySystem, selected_sheet: str): if not system_state: raise gr.Error("Please upload and process a file first.") result = system_state.query(question, selected_sheet) answer = result.get("answer", "No response.") tool_used = result.get("tool_used", "Unknown") full_response = f"{answer}\n\n*Tool Used: {tool_used}*" return full_response with gr.Blocks(theme=gr.themes.Soft(), title="Hybrid Excel Analyzer") as demo: system_state = gr.State() gr.Markdown("# 🤖 Hybrid Excel Analyzer") gr.Markdown("This app automatically chooses the best AI tool to answer your questions about an Excel file.") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 1. Setup") openai_api_key = gr.Textbox(label="OpenAI API Key", type="password", value=os.getenv("OPENAI_API_KEY", "")) excel_upload = gr.File(label="Upload Excel File", file_types=[".xlsx", ".xls"]) process_button = gr.Button("Process File", variant="primary") status_text = gr.Textbox(label="Processing Status", interactive=False, lines=8) with gr.Column(scale=2): gr.Markdown("### 2. Ask a Question") with gr.Group(visible=False) as query_ui: sheet_selector = gr.Dropdown(label="Select a Sheet") chat_interface = gr.ChatInterface(fn=user_interaction, additional_inputs=[system_state, sheet_selector], title="Chat with your Excel Data") process_button.click(fn=process_excel, inputs=[openai_api_key, excel_upload], outputs=[status_text, system_state, sheet_selector, query_ui]) if __name__ == "__main__": demo.launch()