🤖 French Public Data Analysis Agent
Intelligent analysis of French public datasets with AI-powered insights
import os
import pandas as pd
import gradio as gr
import glob
import threading
import time
import queue
import numpy as np
from rank_bm25 import BM25Okapi
from dotenv import load_dotenv
from smolagents import CodeAgent, LiteLLMModel
from agent import create_web_agent, generate_prompt
from followup_agent import run_followup_analysis
from unidecode import unidecode

load_dotenv()

# Global variables for progress tracking
progress_queue = queue.Queue()
current_status = ""

# Initialize LLM translator and BM25
llm_translator = None
bm25_model = None
precomputed_titles = None


def initialize_models():
    """Initialize the LLM translator and BM25 model"""
    global llm_translator, bm25_model, precomputed_titles

    if llm_translator is None:
        # Initialize LLM for translation
        try:
            model = LiteLLMModel(
                model_id="gemini/gemini-2.5-flash-preview-05-20",
                api_key=os.getenv("GEMINI_API_KEY")
            )
            llm_translator = CodeAgent(tools=[], model=model, max_steps=1)
            print("✅ LLM translator initialized")
        except Exception as e:
            print(f"⚠️ Error initializing LLM translator: {e}")

    # Load pre-computed BM25 model if available
    if bm25_model is None:
        try:
            import pickle
            with open('bm25_data.pkl', 'rb') as f:
                bm25_data = pickle.load(f)
            bm25_model = bm25_data['bm25_model']
            precomputed_titles = bm25_data['titles']
            print(f"✅ Loaded pre-computed BM25 model for {len(precomputed_titles)} datasets")
        except FileNotFoundError:
            print("⚠️ Pre-computed BM25 model not found. Will compute at runtime.")
        except Exception as e:
            print(f"⚠️ Error loading pre-computed BM25 model: {e}")
            print("Will compute BM25 at runtime.")


def translate_query_llm(query, target_lang='fr'):
    """Translate query using LLM"""
    global llm_translator

    if llm_translator is None:
        initialize_models()

    if llm_translator is None:
        print("⚠️ LLM translator not available, returning original query")
        return query, 'unknown'

    try:
        # Create translation prompt
        if target_lang == 'fr':
            target_language = "French"
        elif target_lang == 'en':
            target_language = "English"
        else:
            target_language = target_lang

        translation_prompt = f"""
Translate the following text to {target_language}.
If the text is already in {target_language}, return it as is.
Only return the translated text, nothing else.

Text to translate: "{query}"
"""

        # Get translation from LLM
        response = llm_translator.run(translation_prompt)
        translated_text = str(response).strip().strip('"').strip("'")

        # Simple language detection
        if query.lower() == translated_text.lower():
            source_lang = target_lang
        else:
            source_lang = 'en' if target_lang == 'fr' else 'fr'

        return translated_text, source_lang
    except Exception as e:
        print(f"LLM translation error: {e}")
        return query, 'unknown'


def simple_keyword_preprocessing(text):
    """Simple preprocessing for keyword matching - handles case, accents and basic plurals"""
    # Convert to lowercase and remove accents
    text = unidecode(str(text).lower())

    # Basic plural handling - just remove trailing 's' and 'x'
    words = text.split()
    processed_words = []
    for word in words:
        # Remove common plural endings
        if word.endswith('s') and len(word) > 3 and not word.endswith('ss'):
            word = word[:-1]
        elif word.endswith('x') and len(word) > 3:
            word = word[:-1]
        processed_words.append(word)

    return processed_words


def find_similar_dataset_bm25(query, df):
    """Find the most similar dataset using BM25 keyword matching"""
    global bm25_model, precomputed_titles

    # Translate query to French for better matching with French datasets
    translated_query, original_lang = translate_query_llm(query, target_lang='fr')

    # Combine original and translated queries for search
    search_queries = [query, translated_query] if query != translated_query else [query]

    # Get dataset titles
    dataset_titles = df['title'].fillna('').tolist()

    # Use pre-computed BM25 model if available and matches current dataset
    if (bm25_model is not None and precomputed_titles is not None
            and len(dataset_titles) == len(precomputed_titles)
            and dataset_titles == precomputed_titles):
        print("🚀 Using pre-computed BM25 model for fast matching")
        bm25 = bm25_model
    else:
        # Build BM25 model at runtime
        print("⚠️ Computing BM25 model at runtime...")
        # Preprocess all dataset titles into tokenized form
        processed_titles = [simple_keyword_preprocessing(title) for title in dataset_titles]
        bm25 = BM25Okapi(processed_titles)

    best_score = -1
    best_idx = 0

    for search_query in search_queries:
        try:
            # Preprocess the search query
            processed_query = simple_keyword_preprocessing(search_query)

            # Get BM25 scores for all documents
            scores = bm25.get_scores(processed_query)
            max_score = scores.max()
            max_idx = scores.argmax()

            if max_score > best_score:
                best_score = max_score
                best_idx = max_idx
        except Exception as e:
            print(f"Error processing query '{search_query}': {e}")
            continue

    # Show top 5 matches for comparison
    if len(search_queries) > 0:
        processed_query = simple_keyword_preprocessing(search_queries[0])
        scores = bm25.get_scores(processed_query)

    return best_idx, best_score, translated_query, original_lang


def create_progress_callback():
    """Create a callback function for tracking agent progress"""
    def progress_callback(memory_step, agent=None):
        """Callback function called at each agent step"""
        step_number = memory_step.step_number

        # Extract information about the current step
        if hasattr(memory_step, 'action_input') and memory_step.action_input:
            action_content = memory_step.action_input
        elif hasattr(memory_step, 'action_output') and memory_step.action_output:
            action_content = str(memory_step.action_output)
        else:
            action_content = ""

        # Define progress based on step content and number
        progress_val = min(0.1 + (step_number * 0.03), 0.95)  # Progressive increase

        # Analyze the step content to provide meaningful status
        action_lower = action_content.lower() if action_content else ""

        if "visit_webpage" in action_lower or "examining" in action_lower:
            description = f"🔍 Step {step_number}: Examining webpage..."
        elif "get_all_links" in action_lower or "links" in action_lower:
            description = f"🔗 Step {step_number}: Extracting data links..."
        elif "read_file_from_url" in action_lower or "reading" in action_lower:
            description = f"📊 Step {step_number}: Loading dataset..."
        elif "get_dataset_description" in action_lower or "description" in action_lower:
            description = f"📋 Step {step_number}: Analyzing dataset structure..."
        elif "department" in action_lower or "region" in action_lower:
            description = f"🗺️ Step {step_number}: Processing geographic data..."
        elif "plot" in action_lower or "map" in action_lower or "france" in action_lower:
            description = f"🗺️ Step {step_number}: Creating France map..."
        elif "visualization" in action_lower or "chart" in action_lower:
            description = f"📈 Step {step_number}: Generating visualizations..."
        elif "save" in action_lower or "png" in action_lower:
            description = f"💾 Step {step_number}: Saving visualizations..."
        elif "docx" in action_lower or "report" in action_lower:
            description = f"📄 Step {step_number}: Creating DOCX report..."
        elif hasattr(memory_step, 'error') and memory_step.error:
            description = f"⚠️ Step {step_number}: Handling error..."
        else:
            description = f"🤖 Step {step_number}: Processing..."

        # Check if this is the final step
        if hasattr(memory_step, 'action_output') and memory_step.action_output and "final" in action_lower:
            progress_val = 1.0
            description = "✅ Analysis complete!"

        # Put the progress update in the queue
        try:
            progress_queue.put((progress_val, description))
        except:
            pass

    return progress_callback


def run_agent_analysis_with_progress(query, progress_callback, df=None, page_url_callback=None,
                                     data_gouv_page=None, most_similar_idx=None):
    """
    Run the agent analysis with progress tracking using smolagents callbacks.
    """
    try:
        # Clean up previous results
        if os.path.exists('generated_data'):
            for file in glob.glob('generated_data/*'):
                try:
                    os.remove(file)
                except:
                    pass
        else:
            os.makedirs('generated_data', exist_ok=True)

        # If dataset info not provided, find it (fallback)
        if data_gouv_page is None or most_similar_idx is None:
            progress_callback(0.02, "🤖 Initializing LLM translator and BM25...")
            initialize_models()

            progress_callback(0.05, "🔍 Searching for relevant datasets (using BM25 keyword matching)...")

            # Read the filtered dataset if not provided
            if df is None:
                df = pd.read_csv('filtered_dataset.csv')

            # Find the most similar dataset using BM25 keyword matching
            most_similar_idx, similarity_score, translated_query, original_lang = find_similar_dataset_bm25(query, df)
            data_gouv_page = df.iloc[most_similar_idx]['url']

            # Immediately show the page URL via callback
            if page_url_callback:
                page_url_callback(data_gouv_page)

            progress_callback(0.08, "🤖 Initializing agent...")
        else:
            # Dataset already found, continue from where we left off
            progress_callback(0.09, "🤖 Initializing agent...")

        step_callback = create_progress_callback()
        progress_callback(0.1, "🤖 Starting agent analysis...")

        # Create the agent with progress callback
        web_agent = create_web_agent(step_callback)
        prompt = generate_prompt(data_gouv_page)

        # Run the agent - the step_callbacks will automatically update progress
        answer = web_agent.run(prompt)

        # Check if the agent found no processable data
        answer_lower = str(answer).lower() if answer else ""
        if ("no processable data" in answer_lower or "no csv nor json" in answer_lower
                or "cannot find csv" in answer_lower or "cannot find json" in answer_lower
                or "no data to process" in answer_lower):
            progress_callback(1.0, "❌ No CSV/JSON files found in the dataset")
            return "❌ No CSV/JSON files found in the selected dataset. This dataset cannot be processed automatically.", [], data_gouv_page

        # Check if files were generated
        generated_files = glob.glob('generated_data/*')
        if generated_files:
            progress_callback(1.0, "✅ Analysis completed successfully!")
            return "Analysis completed successfully!", generated_files, data_gouv_page
        else:
            progress_callback(1.0, "⚠️ Analysis completed but no files were generated.")
            return "Analysis completed but no files were generated.", [], data_gouv_page

    except Exception as e:
        progress_callback(1.0, f"❌ Error: {str(e)}")
        return f"Error during analysis: {str(e)}", [], None


def search_and_analyze(query, progress=gr.Progress()):
    """
    Unified function that does initial search then lets agent analyze with full autonomy.
    Uses Gradio's progress bar for visual feedback.
    """
    # Clear the progress queue
    while not progress_queue.empty():
        try:
            progress_queue.get_nowait()
        except queue.Empty:
            break

    # Initialize outputs
    docx_file = None
    images_output = [gr.Image(visible=False)] * 4
    status = "🚀 Starting agent-driven analysis..."

    # Initial progress
    progress(0.05, desc="🚀 Initializing agent...")

    def progress_callback(progress_val, description):
        """Callback function to update progress - puts updates in queue"""
        try:
            progress_queue.put((progress_val, description))
        except:
            pass

    # Run analysis in a separate thread
    result_queue = queue.Queue()

    def run_analysis():
        try:
            # Clean up previous results
            if os.path.exists('generated_data'):
                for file in glob.glob('generated_data/*'):
                    try:
                        os.remove(file)
                    except:
                        pass
            else:
                os.makedirs('generated_data', exist_ok=True)

            # Do initial search if query provided
            initial_search_results = None
            if query.strip():
                progress_callback(0.06, f"🔍 Initial search for: {query[:50]}...")
                try:
                    # Import search function from tools
                    from tools.retrieval_tools import search_datasets
                    initial_search_results = search_datasets(query, top_k=5)
                    progress_callback(0.08, "🤖 Starting agent with search results...")
                except Exception as e:
                    print(f"Initial search failed: {e}")
                    progress_callback(0.08, "🤖 Starting agent without initial results...")
            else:
                progress_callback(0.08, "🤖 Starting agent for random selection...")

            step_callback = create_progress_callback()

            # Create the agent with progress callback
            web_agent = create_web_agent(step_callback)

            # Generate unified prompt with initial search results
            prompt = generate_prompt(user_query=query, initial_search_results=initial_search_results)

            progress_callback(0.1, "🤖 Agent analyzing datasets...")

            # Run the agent - the step_callbacks will automatically update progress
            answer = web_agent.run(prompt)

            # Check if the agent found no processable data
            answer_lower = str(answer).lower() if answer else ""
            if ("no processable data" in answer_lower or "no csv nor json" in answer_lower
                    or "cannot find csv" in answer_lower or "cannot find json" in answer_lower
                    or "no data to process" in answer_lower):
                progress_callback(1.0, "❌ No CSV/JSON files found in the dataset")
                result_queue.put(("❌ No CSV/JSON files found in the selected dataset. This dataset cannot be processed automatically.", [], None))
                return

            # Check if files were generated
            generated_files = glob.glob('generated_data/*')
            if generated_files:
                progress_callback(1.0, "✅ Analysis completed successfully!")
                result_queue.put(("Analysis completed successfully!", generated_files, "Agent-selected dataset"))
            else:
                progress_callback(1.0, "⚠️ Analysis completed but no files were generated.")
                result_queue.put(("Analysis completed but no files were generated.", [], None))

        except Exception as e:
            progress_callback(1.0, f"❌ Error: {str(e)}")
            result_queue.put((f"Error during analysis: {str(e)}", [], None))

    analysis_thread = threading.Thread(target=run_analysis)
    analysis_thread.start()

    # Show initial status
    current_status = "🤖 Agent is finding relevant datasets..."
    progress(0.08, desc=current_status)

    # Monitor progress while analysis runs
    last_progress = 0.08
    while analysis_thread.is_alive() or not result_queue.empty():
        try:
            # Check for progress updates from queue
            try:
                progress_val, description = progress_queue.get(timeout=0.1)
                if progress_val > last_progress:
                    last_progress = progress_val
                    current_status = description
                    progress(progress_val, desc=description)
            except queue.Empty:
                pass

            # Check if analysis is complete
            try:
                final_status, files, page_url = result_queue.get(timeout=0.1)

                # Check if this is a "no data" case
                if "❌ No CSV/JSON files found" in final_status:
                    progress(1.0, desc="❌ No processable data found")
                    return (gr.Textbox(value="Agent-selected dataset", visible=True),
                            final_status,
                            gr.File(visible=False),
                            gr.Image(visible=False), gr.Image(visible=False),
                            gr.Image(visible=False), gr.Image(visible=False),
                            gr.Markdown(visible=False),  # keep follow-up hidden
                            gr.HTML(visible=False),
                            gr.Row(visible=False), gr.Row(visible=False),
                            gr.Row(visible=False), gr.Row(visible=False),
                            gr.Row(visible=False))

                # Final progress update
                progress(1.0, desc="✅ Processing results...")

                # Process results
                docx_file = None
                png_files = []
                for file in files:
                    if file.endswith('.docx'):
                        docx_file = file
                    elif file.endswith('.png'):
                        png_files.append(file)

                # Prepare final outputs
                download_button = gr.File(value=docx_file, visible=True) if docx_file else None

                # Prepare images for display (up to 4 images)
                images = []
                for i in range(4):
                    if i < len(png_files):
                        images.append(gr.Image(value=png_files[i], visible=True))
                    else:
                        images.append(gr.Image(visible=False))

                # final progress completion
                progress(1.0, desc="🎉 Complete!")

                # Show follow-up section after successful completion
                return (gr.Textbox(value=page_url if page_url else "Agent-selected dataset", visible=True),
                        final_status,
                        download_button,
                        *images,
                        gr.Markdown(visible=True),  # followup_section_divider
                        gr.HTML(visible=True),      # followup_section_header
                        gr.Row(visible=True),       # followup_input_row
                        gr.Row(visible=True),       # followup_result_row
                        gr.Row(visible=True),       # followup_image_row
                        gr.Row(visible=True),       # followup_examples_header_row
                        gr.Row(visible=True))       # followup_examples_row

            except queue.Empty:
                pass

            time.sleep(0.5)  # Small delay to prevent excessive updates

        except Exception as e:
            progress(1.0, desc=f"❌ Error: {str(e)}")
            return (gr.Textbox(value="Error", visible=True),
                    f"❌ Error: {str(e)}",
                    None,
                    *images_output,
                    gr.Markdown(visible=False),  # keep follow-up hidden on error
                    gr.HTML(visible=False),
                    gr.Row(visible=False), gr.Row(visible=False),
                    gr.Row(visible=False), gr.Row(visible=False),
                    gr.Row(visible=False))

    # Ensure thread completes
    analysis_thread.join(timeout=1)

    # Fallback return
    progress(1.0, desc="🏁 Finished")
    return (gr.Textbox(value="Completed", visible=True),
            current_status,
            docx_file,
            *images_output,
            gr.Markdown(visible=False),  # keep follow-up hidden
            gr.HTML(visible=False),
            gr.Row(visible=False), gr.Row(visible=False),
            gr.Row(visible=False), gr.Row(visible=False),
            gr.Row(visible=False))


def run_followup_question(question, progress=gr.Progress()):
    """
    Run a follow-up analysis based on user's question about the previous report.
    """
    if not question.strip():
        return "Please enter a follow-up question.", gr.Image(visible=False)

    progress(0.1, desc="🤖 Starting follow-up analysis...")

    try:
        # Check if there are previous results
        if not os.path.exists('generated_data') or not os.listdir('generated_data'):
            return "No previous analysis found. Please run an analysis first.", gr.Image(visible=False)

        progress(0.3, desc="🔍 Analyzing previous report and dataset...")

        # Run the follow-up analysis
        result = run_followup_analysis(question)

        progress(0.9, desc="📊 Processing results...")

        # Look for new visualizations created by the follow-up analysis
        import glob

        # Get all images that were created after the analysis started
        all_images = glob.glob('generated_data/*.png')

        # Get recent images (created in the last few seconds)
        import time
        current_time = time.time()
        recent_images = []
        for img_path in all_images:
            img_time = os.path.getctime(img_path)
            if current_time - img_time < 120:  # Images created in last 2 minutes
                recent_images.append(img_path)

        # Get the most recent image if any
        latest_image = None
        if recent_images:
            latest_image = max(recent_images, key=os.path.getctime)

        progress(1.0, desc="✅ Follow-up analysis complete!")

        # Enhanced result formatting
        final_result = result
        if latest_image:
            final_result += f"\n\n📊 **Visualization Created:** {os.path.basename(latest_image)}"
            if len(recent_images) > 1:
                final_result += f"\n📈 **Total new visualizations:** {len(recent_images)}"
            return final_result, gr.Image(value=latest_image, visible=True)
        else:
            return final_result, gr.Image(visible=False)

    except Exception as e:
        progress(1.0, desc="❌ Error in follow-up analysis")
        return f"Error: {str(e)}", gr.Image(visible=False)


# Create the Gradio interface
with gr.Blocks(title="🤖 French Public Data Analysis Agent", theme=gr.themes.Soft(), css="""
    .gradio-container { max-width: 1200px !important; margin: auto; width: 100% !important; }
    .main-header {
        text-align: center;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 2rem;
        border-radius: 15px;
        margin-bottom: 2rem;
        box-shadow: 0 8px 32px rgba(0,0,0,0.1);
    }
    .accordion-content { overflow: hidden !important; width: 100% !important; }
    .gr-accordion { width: 100% !important; max-width: 100% !important; }
    .gr-accordion .gr-row { width: 100% !important; max-width: 100% !important; margin: 0 !important; }
    .gr-accordion .gr-column { min-width: 0 !important; flex: 1 !important; max-width: 50% !important; padding-right: 1rem !important; }
    .gr-accordion .gr-column:last-child { padding-right: 0 !important; padding-left: 1rem !important; }
""") as demo:
    # Main header with better styling
    gr.HTML("""
Intelligent analysis of French public datasets with AI-powered insights
🌐 Search in French or English • 🤖 AI agent finds & analyzes datasets • 🗺️ Generate reports with visualizations
Initial search results guide the agent, but it can search for different datasets if needed
Click any example below to get started
Automatically generated charts and maps will appear below
Ask about report findings, request data analysis, or get contextual information
Click any example below to try it out
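The initialize_models() function above tries to load a pre-computed BM25 index from bm25_data.pkl (expecting the keys 'bm25_model' and 'titles') and only falls back to building the index at runtime. Below is a minimal sketch of how that file could be produced offline. It assumes a precompute_bm25.py script (hypothetical name), that filtered_dataset.csv has a title column as in the app code, and that the BM25Okapi object pickles cleanly; the tokenization deliberately mirrors simple_keyword_preprocessing so runtime queries score consistently against the saved index.

# precompute_bm25.py -- hypothetical offline step that writes the bm25_data.pkl
# loaded by initialize_models(). Assumes filtered_dataset.csv has a 'title' column.
import pickle

import pandas as pd
from rank_bm25 import BM25Okapi
from unidecode import unidecode


def simple_keyword_preprocessing(text):
    """Same lowercase / accent-stripping / basic-plural normalisation as the app."""
    words = unidecode(str(text).lower()).split()
    processed = []
    for word in words:
        if word.endswith('s') and len(word) > 3 and not word.endswith('ss'):
            word = word[:-1]
        elif word.endswith('x') and len(word) > 3:
            word = word[:-1]
        processed.append(word)
    return processed


if __name__ == "__main__":
    df = pd.read_csv('filtered_dataset.csv')
    titles = df['title'].fillna('').tolist()

    # Tokenize every dataset title and build the BM25 index once.
    tokenized_titles = [simple_keyword_preprocessing(title) for title in titles]
    bm25 = BM25Okapi(tokenized_titles)

    # Keys must match what initialize_models() reads back.
    with open('bm25_data.pkl', 'wb') as f:
        pickle.dump({'bm25_model': bm25, 'titles': titles}, f)

    print(f"Saved BM25 index for {len(titles)} dataset titles")

Running the app itself also expects a GEMINI_API_KEY to be present in the environment (loaded via load_dotenv()) for the Gemini-backed translation agent.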