import os import pandas as pd import gradio as gr import glob import threading import time import queue import numpy as np from rank_bm25 import BM25Okapi from dotenv import load_dotenv from smolagents import CodeAgent, LiteLLMModel from agent import create_web_agent, generate_prompt from followup_agent import run_followup_analysis from unidecode import unidecode load_dotenv() # Global variables for progress tracking progress_queue = queue.Queue() current_status = "" # Initialize LLM translator and BM25 llm_translator = None bm25_model = None precomputed_titles = None def initialize_models(): """Initialize the LLM translator and BM25 model""" global llm_translator, bm25_model, precomputed_titles if llm_translator is None: # Initialize LLM for translation try: model = LiteLLMModel( model_id="gemini/gemini-2.5-flash-preview-05-20", api_key=os.getenv("GEMINI_API_KEY") ) llm_translator = CodeAgent(tools=[], model=model, max_steps=1) print("✅ LLM translator initialized") except Exception as e: print(f"⚠️ Error initializing LLM translator: {e}") # Load pre-computed BM25 model if available if bm25_model is None: try: import pickle with open('bm25_data.pkl', 'rb') as f: bm25_data = pickle.load(f) bm25_model = bm25_data['bm25_model'] precomputed_titles = bm25_data['titles'] print(f"✅ Loaded pre-computed BM25 model for {len(precomputed_titles)} datasets") except FileNotFoundError: print("⚠️ Pre-computed BM25 model not found. Will compute at runtime.") except Exception as e: print(f"⚠️ Error loading pre-computed BM25 model: {e}") print("Will compute BM25 at runtime.") def translate_query_llm(query, target_lang='fr'): """Translate query using LLM""" global llm_translator if llm_translator is None: initialize_models() if llm_translator is None: print("⚠️ LLM translator not available, returning original query") return query, 'unknown' try: # Create translation prompt if target_lang == 'fr': target_language = "French" elif target_lang == 'en': target_language = "English" else: target_language = target_lang translation_prompt = f""" Translate the following text to {target_language}. If the text is already in {target_language}, return it as is. Only return the translated text, nothing else. Text to translate: "{query}" """ # Get translation from LLM response = llm_translator.run(translation_prompt) translated_text = str(response).strip().strip('"').strip("'") # Simple language detection if query.lower() == translated_text.lower(): source_lang = target_lang else: source_lang = 'en' if target_lang == 'fr' else 'fr' return translated_text, source_lang except Exception as e: print(f"LLM translation error: {e}") return query, 'unknown' def simple_keyword_preprocessing(text): """Simple preprocessing for keyword matching - handles case, accents and basic plurals""" # Convert to lowercase and remove accents text = unidecode(str(text).lower()) # Basic plural handling - just remove trailing 's' and 'x' words = text.split() processed_words = [] for word in words: # Remove common plural endings if word.endswith('s') and len(word) > 3 and not word.endswith('ss'): word = word[:-1] elif word.endswith('x') and len(word) > 3: word = word[:-1] processed_words.append(word) return processed_words def find_similar_dataset_bm25(query, df): """Find the most similar dataset using BM25 keyword matching""" global bm25_model, precomputed_titles # Translate query to French for better matching with French datasets translated_query, original_lang = translate_query_llm(query, target_lang='fr') # Combine original and translated queries for search search_queries = [query, translated_query] if query != translated_query else [query] # Get dataset titles dataset_titles = df['title'].fillna('').tolist() # Use pre-computed BM25 model if available and matches current dataset if (bm25_model is not None and precomputed_titles is not None and len(dataset_titles) == len(precomputed_titles) and dataset_titles == precomputed_titles): print("🚀 Using pre-computed BM25 model for fast matching") bm25 = bm25_model else: # Build BM25 model at runtime print("⚠️ Computing BM25 model at runtime...") # Preprocess all dataset titles into tokenized form processed_titles = [simple_keyword_preprocessing(title) for title in dataset_titles] bm25 = BM25Okapi(processed_titles) best_score = -1 best_idx = 0 for search_query in search_queries: try: # Preprocess the search query processed_query = simple_keyword_preprocessing(search_query) # Get BM25 scores for all documents scores = bm25.get_scores(processed_query) max_score = scores.max() max_idx = scores.argmax() if max_score > best_score: best_score = max_score best_idx = max_idx except Exception as e: print(f"Error processing query '{search_query}': {e}") continue # Show top 5 matches for comparison if len(search_queries) > 0: processed_query = simple_keyword_preprocessing(search_queries[0]) scores = bm25.get_scores(processed_query) return best_idx, best_score, translated_query, original_lang def create_progress_callback(): """Create a callback function for tracking agent progress""" def progress_callback(memory_step, agent=None): """Callback function called at each agent step""" step_number = memory_step.step_number # Extract information about the current step if hasattr(memory_step, 'action_input') and memory_step.action_input: action_content = memory_step.action_input elif hasattr(memory_step, 'action_output') and memory_step.action_output: action_content = str(memory_step.action_output) else: action_content = "" # Define progress based on step content and number progress_val = min(0.1 + (step_number * 0.03), 0.95) # Progressive increase # Analyze the step content to provide meaningful status action_lower = action_content.lower() if action_content else "" if "visit_webpage" in action_lower or "examining" in action_lower: description = f"🔍 Step {step_number}: Examining webpage..." elif "get_all_links" in action_lower or "links" in action_lower: description = f"🔗 Step {step_number}: Extracting data links..." elif "read_file_from_url" in action_lower or "reading" in action_lower: description = f"📊 Step {step_number}: Loading dataset..." elif "get_dataset_description" in action_lower or "description" in action_lower: description = f"📋 Step {step_number}: Analyzing dataset structure..." elif "department" in action_lower or "region" in action_lower: description = f"🗺️ Step {step_number}: Processing geographic data..." elif "plot" in action_lower or "map" in action_lower or "france" in action_lower: description = f"🗺️ Step {step_number}: Creating France map..." elif "visualization" in action_lower or "chart" in action_lower: description = f"📈 Step {step_number}: Generating visualizations..." elif "save" in action_lower or "png" in action_lower: description = f"💾 Step {step_number}: Saving visualizations..." elif "docx" in action_lower or "report" in action_lower: description = f"📄 Step {step_number}: Creating DOCX report..." elif hasattr(memory_step, 'error') and memory_step.error: description = f"⚠️ Step {step_number}: Handling error..." else: description = f"🤖 Step {step_number}: Processing..." # Check if this is the final step if hasattr(memory_step, 'action_output') and memory_step.action_output and "final" in action_lower: progress_val = 1.0 description = "✅ Analysis complete!" # Put the progress update in the queue try: progress_queue.put((progress_val, description)) except: pass return progress_callback def run_agent_analysis_with_progress(query, progress_callback, df=None, page_url_callback=None, data_gouv_page=None, most_similar_idx=None): """ Run the agent analysis with progress tracking using smolagents callbacks. """ try: # Clean up previous results if os.path.exists('generated_data'): for file in glob.glob('generated_data/*'): try: os.remove(file) except: pass else: os.makedirs('generated_data', exist_ok=True) # If dataset info not provided, find it (fallback) if data_gouv_page is None or most_similar_idx is None: progress_callback(0.02, "🤖 Initializing LLM translator and BM25...") initialize_models() progress_callback(0.05, "🔍 Searching for relevant datasets (using BM25 keyword matching)...") # Read the filtered dataset if not provided if df is None: df = pd.read_csv('filtered_dataset.csv') # Find the most similar dataset using BM25 keyword matching most_similar_idx, similarity_score, translated_query, original_lang = find_similar_dataset_bm25(query, df) data_gouv_page = df.iloc[most_similar_idx]['url'] # Immediately show the page URL via callback if page_url_callback: page_url_callback(data_gouv_page) progress_callback(0.08, "🤖 Initializing agent...") else: # Dataset already found, continue from where we left off progress_callback(0.09, "🤖 Initializing agent...") step_callback = create_progress_callback() progress_callback(0.1, "🤖 Starting agent analysis...") # Create the agent with progress callback web_agent = create_web_agent(step_callback) prompt = generate_prompt(data_gouv_page) # Run the agent - the step_callbacks will automatically update progress answer = web_agent.run(prompt) # Check if the agent found no processable data answer_lower = str(answer).lower() if answer else "" if ("no processable data" in answer_lower or "no csv nor json" in answer_lower or "cannot find csv" in answer_lower or "cannot find json" in answer_lower or "no data to process" in answer_lower): progress_callback(1.0, "❌ No CSV/JSON files found in the dataset") return "❌ No CSV/JSON files found in the selected dataset. This dataset cannot be processed automatically.", [], data_gouv_page # Check if files were generated generated_files = glob.glob('generated_data/*') if generated_files: progress_callback(1.0, "✅ Analysis completed successfully!") return "Analysis completed successfully!", generated_files, data_gouv_page else: progress_callback(1.0, "⚠️ Analysis completed but no files were generated.") return "Analysis completed but no files were generated.", [], data_gouv_page except Exception as e: progress_callback(1.0, f"❌ Error: {str(e)}") return f"Error during analysis: {str(e)}", [], None def search_and_analyze(query, progress=gr.Progress()): """ Unified function that does initial search then lets agent analyze with full autonomy. Uses Gradio's progress bar for visual feedback. """ # Clear the progress queue while not progress_queue.empty(): try: progress_queue.get_nowait() except queue.Empty: break # Initialize outputs docx_file = None images_output = [gr.Image(visible=False)] * 4 status = "🚀 Starting agent-driven analysis..." # Initial progress progress(0.05, desc="🚀 Initializing agent...") def progress_callback(progress_val, description): """Callback function to update progress - puts updates in queue""" try: progress_queue.put((progress_val, description)) except: pass # Run analysis in a separate thread result_queue = queue.Queue() def run_analysis(): try: # Clean up previous results if os.path.exists('generated_data'): for file in glob.glob('generated_data/*'): try: os.remove(file) except: pass else: os.makedirs('generated_data', exist_ok=True) # Do initial search if query provided initial_search_results = None if query.strip(): progress_callback(0.06, f"🔍 Initial search for: {query[:50]}...") try: # Import search function from tools from tools.retrieval_tools import search_datasets initial_search_results = search_datasets(query, top_k=5) progress_callback(0.08, "🤖 Starting agent with search results...") except Exception as e: print(f"Initial search failed: {e}") progress_callback(0.08, "🤖 Starting agent without initial results...") else: progress_callback(0.08, "🤖 Starting agent for random selection...") step_callback = create_progress_callback() # Create the agent with progress callback web_agent = create_web_agent(step_callback) # Generate unified prompt with initial search results prompt = generate_prompt(user_query=query, initial_search_results=initial_search_results) progress_callback(0.1, "🤖 Agent analyzing datasets...") # Run the agent - the step_callbacks will automatically update progress answer = web_agent.run(prompt) # Check if the agent found no processable data answer_lower = str(answer).lower() if answer else "" if ("no processable data" in answer_lower or "no csv nor json" in answer_lower or "cannot find csv" in answer_lower or "cannot find json" in answer_lower or "no data to process" in answer_lower): progress_callback(1.0, "❌ No CSV/JSON files found in the dataset") result_queue.put(("❌ No CSV/JSON files found in the selected dataset. This dataset cannot be processed automatically.", [], None)) return # Check if files were generated generated_files = glob.glob('generated_data/*') if generated_files: progress_callback(1.0, "✅ Analysis completed successfully!") result_queue.put(("Analysis completed successfully!", generated_files, "Agent-selected dataset")) else: progress_callback(1.0, "⚠️ Analysis completed but no files were generated.") result_queue.put(("Analysis completed but no files were generated.", [], None)) except Exception as e: progress_callback(1.0, f"❌ Error: {str(e)}") result_queue.put((f"Error during analysis: {str(e)}", [], None)) analysis_thread = threading.Thread(target=run_analysis) analysis_thread.start() # Show initial status current_status = "🤖 Agent is finding relevant datasets..." progress(0.08, desc=current_status) # Monitor progress while analysis runs last_progress = 0.08 while analysis_thread.is_alive() or not result_queue.empty(): try: # Check for progress updates from queue try: progress_val, description = progress_queue.get(timeout=0.1) if progress_val > last_progress: last_progress = progress_val current_status = description progress(progress_val, desc=description) except queue.Empty: pass # Check if analysis is complete try: final_status, files, page_url = result_queue.get(timeout=0.1) # Check if this is a "no data" case if "❌ No CSV/JSON files found" in final_status: progress(1.0, desc="❌ No processable data found") return (gr.Textbox(value="Agent-selected dataset", visible=True), final_status, gr.File(visible=False), gr.Image(visible=False), gr.Image(visible=False), gr.Image(visible=False), gr.Image(visible=False), gr.Markdown(visible=False), # keep follow-up hidden gr.HTML(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False)) # Final progress update progress(1.0, desc="✅ Processing results...") # Process results docx_file = None png_files = [] for file in files: if file.endswith('.docx'): docx_file = file elif file.endswith('.png'): png_files.append(file) # Prepare final outputs download_button = gr.File(value=docx_file, visible=True) if docx_file else None # Prepare images for display (up to 4 images) images = [] for i in range(4): if i < len(png_files): images.append(gr.Image(value=png_files[i], visible=True)) else: images.append(gr.Image(visible=False)) # final progress completion progress(1.0, desc="🎉 Complete!") # Show follow-up section after successful completion return (gr.Textbox(value=page_url if page_url else "Agent-selected dataset", visible=True), final_status, download_button, *images, gr.Markdown(visible=True), # followup_section_divider gr.HTML(visible=True), # followup_section_header gr.Row(visible=True), # followup_input_row gr.Row(visible=True), # followup_result_row gr.Row(visible=True), # followup_image_row gr.Row(visible=True), # followup_examples_header_row gr.Row(visible=True)) # followup_examples_row except queue.Empty: pass time.sleep(0.5) # Small delay to prevent excessive updates except Exception as e: progress(1.0, desc=f"❌ Error: {str(e)}") return (gr.Textbox(value="Error", visible=True), f"❌ Error: {str(e)}", None, *images_output, gr.Markdown(visible=False), # keep follow-up hidden on error gr.HTML(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False)) # Ensure thread completes analysis_thread.join(timeout=1) # Fallback return progress(1.0, desc="🏁 Finished") return (gr.Textbox(value="Completed", visible=True), current_status, docx_file, *images_output, gr.Markdown(visible=False), # keep follow-up hidden gr.HTML(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False), gr.Row(visible=False)) def run_followup_question(question, progress=gr.Progress()): """ Run a follow-up analysis based on user's question about the previous report. """ if not question.strip(): return "Please enter a follow-up question.", gr.Image(visible=False) progress(0.1, desc="🤖 Starting follow-up analysis...") try: # Check if there are previous results if not os.path.exists('generated_data') or not os.listdir('generated_data'): return "No previous analysis found. Please run an analysis first.", gr.Image(visible=False) progress(0.3, desc="🔍 Analyzing previous report and dataset...") # Run the follow-up analysis result = run_followup_analysis(question) progress(0.9, desc="📊 Processing results...") # Look for new visualizations created by the follow-up analysis import glob # Get all images that were created after the analysis started all_images = glob.glob('generated_data/*.png') # Get recent images (created in the last few seconds) import time current_time = time.time() recent_images = [] for img_path in all_images: img_time = os.path.getctime(img_path) if current_time - img_time < 120: # Images created in last 2 minutes recent_images.append(img_path) # Get the most recent image if any latest_image = None if recent_images: latest_image = max(recent_images, key=os.path.getctime) progress(1.0, desc="✅ Follow-up analysis complete!") # Enhanced result formatting final_result = result if latest_image: final_result += f"\n\n📊 **Visualization Created:** {os.path.basename(latest_image)}" if len(recent_images) > 1: final_result += f"\n📈 **Total new visualizations:** {len(recent_images)}" return final_result, gr.Image(value=latest_image, visible=True) else: return final_result, gr.Image(visible=False) except Exception as e: progress(1.0, desc="❌ Error in follow-up analysis") return f"Error: {str(e)}", gr.Image(visible=False) # Create the Gradio interface with gr.Blocks(title="🤖 French Public Data Analysis Agent", theme=gr.themes.Soft(), css=""" .gradio-container { max-width: 1200px !important; margin: auto; width: 100% !important; } .main-header { text-align: center; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 2rem; border-radius: 15px; margin-bottom: 2rem; box-shadow: 0 8px 32px rgba(0,0,0,0.1); } .accordion-content { overflow: hidden !important; width: 100% !important; } .gr-accordion { width: 100% !important; max-width: 100% !important; } .gr-accordion .gr-row { width: 100% !important; max-width: 100% !important; margin: 0 !important; } .gr-accordion .gr-column { min-width: 0 !important; flex: 1 !important; max-width: 50% !important; padding-right: 1rem !important; } .gr-accordion .gr-column:last-child { padding-right: 0 !important; padding-left: 1rem !important; } """) as demo: # Main header with better styling gr.HTML("""

🤖 French Public Data Analysis Agent

Intelligent analysis of French public datasets with AI-powered insights

""") # What this agent does gr.HTML("""

🌐 Search in French or English • 🤖 AI Agent finds & analyzes datasets • 🗺️ Generate Reports with visualizations

Initial search results guide the agent, but it can search for different datasets if needed

""") # Tips & Information accordion - moved to the top with gr.Accordion("💡 Tips & Information", open=False): with gr.Row(): with gr.Column(): gr.Markdown(""" 🎯 **How to Use:** - Enter search terms related to French public data - Leave empty for random high-quality dataset selection - System provides initial search results to guide the agent - Agent can use provided results or search for different datasets - Results include visualizations and downloadable reports ⏱️ **Processing Time:** - Analysis takes 7-15 minutes depending on dataset complexity - Agent has full autonomy to find the best datasets """) with gr.Column(): gr.Markdown(""" ⚠️ **Important Notes:** - Agent gets initial search results but has full autonomy to make decisions - Agent can choose from initial results or search for different datasets - Some datasets may not contain processable CSV/JSON files - All visualizations are automatically generated - Maps focus on France when geographic data is available 🌐 **Language Support:** - Search in French or English - queries are automatically translated """) with gr.Row(): query_input = gr.Textbox( label="Search Query", placeholder="e.g., road traffic accidents, education, housing (or leave empty for random selection)", scale=4 ) search_button = gr.Button( "🚀 Analyze Dataset", variant="primary", scale=1, size="lg" ) # Quick Start Examples row with gr.Row(): gr.HTML("""

🚀 Quick Start Examples

Click any example below to get started

""") with gr.Row(): examples = [ ("🚗 Road Traffic Accidents 2023", "road traffic accidents 2023"), ("🎓 Education Directory", "education directory"), ("🏠 French Vacant Housing Private Park", "French vacant housing private park"), ] for emoji_text, query_text in examples: gr.Button( emoji_text, variant="secondary", size="sm" ).click( lambda x=query_text: x, outputs=query_input ) # Page info and analysis status with progress bar with gr.Group(): page_url_display = gr.Textbox(label="🔗 Page Started On", interactive=False, visible=False) with gr.Row(): status_output = gr.Textbox(label="📊 Analysis Status", interactive=False, scale=1) # Download section with gr.Row(): download_button = gr.File( label="📄 Download DOCX Report", visible=False ) gr.Markdown("---") gr.HTML("""

📊 Generated Visualizations

Automatically generated charts and maps will appear below

""") with gr.Row(): with gr.Column(): image1 = gr.Image(label="📈 Chart 1", visible=False, height=400) image2 = gr.Image(label="📊 Chart 2", visible=False, height=400) with gr.Column(): image3 = gr.Image(label="🗺️ Map/Chart 3", visible=False, height=400) image4 = gr.Image(label="📉 Chart 4", visible=False, height=400) # Follow-up Analysis Section (initially hidden) followup_section_divider = gr.Markdown("---", visible=False) followup_section_header = gr.HTML("""

🤖 Follow-up Analysis

Ask about report findings, request data analysis, or get contextual information

""", visible=False) with gr.Row(visible=False) as followup_input_row: followup_input = gr.Textbox( label="Follow-up Question", placeholder="e.g., What are the main findings?, Show me correlation between columns, What is road safety policy in France?", scale=4 ) followup_button = gr.Button( "🔍 Analyze", variant="secondary", scale=1, size="lg" ) with gr.Row(visible=False) as followup_result_row: followup_result = gr.Textbox( label="📊 Follow-up Analysis Results", interactive=False, lines=10, visible=True ) with gr.Row(visible=False) as followup_image_row: followup_image = gr.Image( label="📈 Follow-up Visualization", visible=False, height=500 ) # Follow-up Examples (initially hidden) with gr.Row(visible=False) as followup_examples_header_row: gr.HTML("""

💡 Example Follow-up Questions

Click any example below to try it out

""") with gr.Row(visible=False) as followup_examples_row: followup_examples = [ ("📋 Report Summary", "What were the main findings from the analysis?"), ("🌐 Context Info", "What is the policy context for this data in France?"), ("📊 Create Chart", "Show me the correlation between two numerical columns with a scatter plot"), ("📈 Data Statistics", "Give me statistical summary for a specific column"), ("🎯 Filter Data", "Filter the data by specific criteria and show results"), ("🔍 General Question", "Tell me more about this topic and its importance"), ] for emoji_text, query_text in followup_examples: gr.Button( emoji_text, variant="secondary", size="sm" ).click( lambda x=query_text: x, outputs=followup_input ) # Set up the search button click event with progress bar search_button.click( fn=search_and_analyze, inputs=[query_input], outputs=[page_url_display, status_output, download_button, image1, image2, image3, image4, followup_section_divider, followup_section_header, followup_input_row, followup_result_row, followup_image_row, followup_examples_header_row, followup_examples_row], show_progress="full" # Show the built-in progress bar ) # Set up the follow-up button click event followup_button.click( fn=run_followup_question, inputs=[followup_input], outputs=[followup_result, followup_image], show_progress="full" ) if __name__ == "__main__": demo.queue() # Enable queuing for real-time updates demo.launch( share=True, server_name="0.0.0.0", server_port=7860, show_error=True )