import os
import pandas as pd
import gradio as gr
import glob
import threading
import time
import queue
from rank_bm25 import BM25Okapi
from dotenv import load_dotenv
from smolagents import CodeAgent, LiteLLMModel
from agent import create_web_agent, generate_prompt
from unidecode import unidecode
load_dotenv()
# Global variables for progress tracking
progress_queue = queue.Queue()
current_status = ""
# Initialize LLM translator and BM25
llm_translator = None
bm25_model = None
precomputed_titles = None
def initialize_models():
"""Initialize the LLM translator and BM25 model"""
global llm_translator, bm25_model, precomputed_titles
if llm_translator is None:
# Initialize LLM for translation
try:
model = LiteLLMModel(
model_id="gemini/gemini-2.5-flash-preview-05-20",
api_key=os.getenv("GEMINI_API_KEY")
)
llm_translator = CodeAgent(tools=[], model=model, max_steps=1)
print("βœ… LLM translator initialized")
except Exception as e:
print(f"⚠️ Error initializing LLM translator: {e}")
# Load pre-computed BM25 model if available
if bm25_model is None:
try:
import pickle
with open('bm25_data.pkl', 'rb') as f:
bm25_data = pickle.load(f)
bm25_model = bm25_data['bm25_model']
precomputed_titles = bm25_data['titles']
print(f"βœ… Loaded pre-computed BM25 model for {len(precomputed_titles)} datasets")
except FileNotFoundError:
print("⚠️ Pre-computed BM25 model not found. Will compute at runtime.")
except Exception as e:
print(f"⚠️ Error loading pre-computed BM25 model: {e}")
print("Will compute BM25 at runtime.")
def translate_query_llm(query, target_lang='fr'):
"""Translate query using LLM"""
global llm_translator
if llm_translator is None:
initialize_models()
if llm_translator is None:
print("⚠️ LLM translator not available, returning original query")
return query, 'unknown'
try:
# Create translation prompt
if target_lang == 'fr':
target_language = "French"
elif target_lang == 'en':
target_language = "English"
else:
target_language = target_lang
translation_prompt = f"""
Translate the following text to {target_language}.
If the text is already in {target_language}, return it as is.
Only return the translated text, nothing else.
Text to translate: "{query}"
"""
# Get translation from LLM
response = llm_translator.run(translation_prompt)
translated_text = str(response).strip().strip('"').strip("'")
# Simple language detection
if query.lower() == translated_text.lower():
source_lang = target_lang
else:
source_lang = 'en' if target_lang == 'fr' else 'fr'
return translated_text, source_lang
except Exception as e:
print(f"LLM translation error: {e}")
return query, 'unknown'
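# Illustrative call (the French rendering shown is an assumption; the exact
# output depends on the LLM's response):
#
#     translate_query_llm("road traffic accidents", target_lang='fr')
#     # -> ("accidents de la route", 'en')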
def simple_keyword_preprocessing(text):
"""Simple preprocessing for keyword matching - handles case, accents and basic plurals"""
# Convert to lowercase and remove accents
text = unidecode(str(text).lower())
# Basic plural handling - just remove trailing 's' and 'x'
words = text.split()
processed_words = []
for word in words:
# Remove common plural endings
if word.endswith('s') and len(word) > 3 and not word.endswith('ss'):
word = word[:-1]
elif word.endswith('x') and len(word) > 3:
word = word[:-1]
processed_words.append(word)
return processed_words
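# Example of the preprocessing (lowercase, strip accents, trim trailing
# 's'/'x' plurals):
#
#     simple_keyword_preprocessing("Écoles primaires publiques")
#     # -> ['ecole', 'primaire', 'publique']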
def find_similar_dataset_bm25(query, df):
"""Find the most similar dataset using BM25 keyword matching"""
global bm25_model, precomputed_titles
# Translate query to French for better matching with French datasets
translated_query, original_lang = translate_query_llm(query, target_lang='fr')
# Combine original and translated queries for search
search_queries = [query, translated_query] if query != translated_query else [query]
# Get dataset titles
dataset_titles = df['title'].fillna('').tolist()
# Use pre-computed BM25 model if available and matches current dataset
if (bm25_model is not None and precomputed_titles is not None and
len(dataset_titles) == len(precomputed_titles) and dataset_titles == precomputed_titles):
print("πŸš€ Using pre-computed BM25 model for fast matching")
bm25 = bm25_model
else:
# Build BM25 model at runtime
print("⚠️ Computing BM25 model at runtime...")
# Preprocess all dataset titles into tokenized form
processed_titles = [simple_keyword_preprocessing(title) for title in dataset_titles]
bm25 = BM25Okapi(processed_titles)
best_score = -1
best_idx = 0
for search_query in search_queries:
try:
# Preprocess the search query
processed_query = simple_keyword_preprocessing(search_query)
# Get BM25 scores for all documents
scores = bm25.get_scores(processed_query)
max_score = scores.max()
max_idx = scores.argmax()
if max_score > best_score:
best_score = max_score
best_idx = max_idx
except Exception as e:
print(f"Error processing query '{search_query}': {e}")
continue
    # Show the top 5 matches for the original query (debug aid)
    processed_query = simple_keyword_preprocessing(search_queries[0])
    scores = bm25.get_scores(processed_query)
    for i in scores.argsort()[::-1][:5]:
        print(f"  BM25 {scores[i]:.2f} - {dataset_titles[i][:80]}")
return best_idx, best_score, translated_query, original_lang
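# Illustrative usage, assuming the 'filtered_dataset.csv' schema used
# elsewhere in this file ('title' and 'url' columns):
#
#     df = pd.read_csv('filtered_dataset.csv')
#     idx, score, translated, lang = find_similar_dataset_bm25("education", df)
#     page_url = df.iloc[idx]['url']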
def create_progress_callback():
"""Create a callback function for tracking agent progress"""
def progress_callback(memory_step, agent=None):
"""Callback function called at each agent step"""
step_number = memory_step.step_number
# Extract information about the current step
if hasattr(memory_step, 'action_input') and memory_step.action_input:
action_content = memory_step.action_input
elif hasattr(memory_step, 'action_output') and memory_step.action_output:
action_content = str(memory_step.action_output)
else:
action_content = ""
# Define progress based on step content and number
progress_val = min(0.1 + (step_number * 0.03), 0.95) # Progressive increase
# Analyze the step content to provide meaningful status
action_lower = action_content.lower() if action_content else ""
if "visit_webpage" in action_lower or "examining" in action_lower:
description = f"πŸ” Step {step_number}: Examining webpage..."
elif "get_all_links" in action_lower or "links" in action_lower:
description = f"πŸ”— Step {step_number}: Extracting data links..."
elif "read_file_from_url" in action_lower or "reading" in action_lower:
description = f"πŸ“Š Step {step_number}: Loading dataset..."
elif "get_dataset_description" in action_lower or "description" in action_lower:
description = f"πŸ“‹ Step {step_number}: Analyzing dataset structure..."
elif "department" in action_lower or "region" in action_lower:
description = f"πŸ—ΊοΈ Step {step_number}: Processing geographic data..."
elif "plot" in action_lower or "map" in action_lower or "france" in action_lower:
description = f"πŸ—ΊοΈ Step {step_number}: Creating France map..."
elif "visualization" in action_lower or "chart" in action_lower:
description = f"πŸ“ˆ Step {step_number}: Generating visualizations..."
elif "save" in action_lower or "png" in action_lower:
description = f"πŸ’Ύ Step {step_number}: Saving visualizations..."
elif "pdf" in action_lower or "report" in action_lower:
description = f"πŸ“„ Step {step_number}: Creating PDF report..."
elif hasattr(memory_step, 'error') and memory_step.error:
description = f"⚠️ Step {step_number}: Handling error..."
else:
description = f"πŸ€– Step {step_number}: Processing..."
# Check if this is the final step
if hasattr(memory_step, 'action_output') and memory_step.action_output and "final" in action_lower:
progress_val = 1.0
description = "βœ… Analysis complete!"
# Put the progress update in the queue
try:
progress_queue.put((progress_val, description))
        except Exception:
            # Progress reporting is best-effort; never break the agent run
            pass
return progress_callback
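# The returned callback targets smolagents' step_callbacks hook;
# create_web_agent (defined in agent.py, not shown here) is assumed to attach
# it roughly like:
#
#     agent = CodeAgent(tools=[...], model=model, step_callbacks=[step_callback])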
def run_agent_analysis_with_progress(query, progress_callback, df=None, page_url_callback=None, data_gouv_page=None, most_similar_idx=None):
"""
Run the agent analysis with progress tracking using smolagents callbacks.
"""
try:
# Clean up previous results
if os.path.exists('generated_data'):
for file in glob.glob('generated_data/*'):
try:
os.remove(file)
                except OSError:
                    # Skip files that cannot be removed
                    pass
else:
os.makedirs('generated_data', exist_ok=True)
# If dataset info not provided, find it (fallback)
if data_gouv_page is None or most_similar_idx is None:
            progress_callback(0.02, "🤖 Initializing LLM translator and BM25...")
            initialize_models()
            progress_callback(0.05, "🔍 Searching for relevant datasets (using BM25 keyword matching)...")
# Read the filtered dataset if not provided
if df is None:
df = pd.read_csv('filtered_dataset.csv')
# Find the most similar dataset using BM25 keyword matching
most_similar_idx, similarity_score, translated_query, original_lang = find_similar_dataset_bm25(query, df)
data_gouv_page = df.iloc[most_similar_idx]['url']
# Immediately show the page URL via callback
if page_url_callback:
page_url_callback(data_gouv_page)
            progress_callback(0.08, "🤖 Initializing agent...")
        else:
            # Dataset already found, continue from where we left off
            progress_callback(0.09, "🤖 Initializing agent...")
        step_callback = create_progress_callback()
        progress_callback(0.1, "🤖 Starting agent analysis...")
# Create the agent with progress callback
web_agent = create_web_agent(step_callback)
prompt = generate_prompt(data_gouv_page)
# Run the agent - the step_callbacks will automatically update progress
answer = web_agent.run(prompt)
# Check if the agent found no processable data
answer_lower = str(answer).lower() if answer else ""
if ("no processable data" in answer_lower or
"no csv nor json" in answer_lower or
"cannot find csv" in answer_lower or
"cannot find json" in answer_lower or
"no data to process" in answer_lower):
progress_callback(1.0, "❌ No CSV/JSON files found in the dataset")
return "❌ No CSV/JSON files found in the selected dataset. This dataset cannot be processed automatically.", [], data_gouv_page
# Check if files were generated
generated_files = glob.glob('generated_data/*')
if generated_files:
            progress_callback(1.0, "✅ Analysis completed successfully!")
return "Analysis completed successfully!", generated_files, data_gouv_page
else:
progress_callback(1.0, "⚠️ Analysis completed but no files were generated.")
return "Analysis completed but no files were generated.", [], data_gouv_page
except Exception as e:
progress_callback(1.0, f"❌ Error: {str(e)}")
return f"Error during analysis: {str(e)}", [], None
def search_and_analyze(query, progress=gr.Progress()):
"""
Main function called when search button is clicked.
Uses Gradio's progress bar for visual feedback.
"""
# Read the filtered dataset first
df = pd.read_csv('filtered_dataset.csv')
# If no query provided, randomly select one weighted by quality score
if not query.strip():
progress(0, desc="🎲 No query provided - selecting random high-quality dataset...")
# Use quality_score as weights for random selection
if 'quality_score' in df.columns:
# Ensure quality scores are positive for weighting
weights = df['quality_score'].fillna(0)
weights = weights - weights.min() + 0.1 # Shift to make all positive
else:
weights = None
# Randomly sample one dataset weighted by quality
selected_row = df.sample(n=1, weights=weights).iloc[0]
query = selected_row['title']
        progress(0.02, desc=f"🎯 Random selection: {query[:60]}...")
# Clear the progress queue
while not progress_queue.empty():
try:
progress_queue.get_nowait()
except queue.Empty:
break
# Initialize outputs
pdf_file = None
images_output = [gr.Image(visible=False)] * 4
status = "πŸš€ Starting analysis..."
# Initial progress
    progress(0.05, desc="🚀 Initializing...")
def progress_callback(progress_val, description):
"""Callback function to update progress - puts updates in queue"""
try:
progress_queue.put((progress_val, description))
        except Exception:
            # Progress reporting is best-effort
            pass
# Run analysis in a separate thread
result_queue = queue.Queue()
# Store the page URL to show immediately (kept for compatibility)
page_url_to_show = None
def page_url_callback(url):
nonlocal page_url_to_show
page_url_to_show = url
# Find and show the page URL immediately FIRST
initialize_models()
    progress(0.06, desc="🔍 Finding relevant dataset...")
most_similar_idx, similarity_score, translated_query, original_lang = find_similar_dataset_bm25(query, df)
data_gouv_page = df.iloc[most_similar_idx]['url']
dataset_title = df.iloc[most_similar_idx]['title']
    progress(0.07, desc=f"📋 Found dataset: {dataset_title[:50]}...")
# Now start the analysis thread with the found dataset info
def run_analysis():
try:
# Pass the already found dataset info to the analysis function
result = run_agent_analysis_with_progress(query, progress_callback, df, page_url_callback, data_gouv_page, most_similar_idx)
result_queue.put(result)
except Exception as e:
result_queue.put((f"Error: {str(e)}", [], data_gouv_page))
analysis_thread = threading.Thread(target=run_analysis)
analysis_thread.start()
    # Surface the found page right away via the status text; the actual
    # page_url_display component is populated through the returned outputs
    current_status = "🔗 Page found - starting analysis..."
# Initial update to show the page URL immediately
    progress(0.08, desc="🔗 Page found - starting analysis...")
# Monitor progress while analysis runs
last_progress = 0.08
while analysis_thread.is_alive() or not result_queue.empty():
try:
# Check for progress updates from queue
try:
progress_val, description = progress_queue.get(timeout=0.1)
if progress_val > last_progress:
last_progress = progress_val
current_status = description
progress(progress_val, desc=description)
except queue.Empty:
pass
# Check if analysis is complete
try:
final_status, files, page_url = result_queue.get(timeout=0.1)
# Check if this is a "no data" case
if "❌ No CSV/JSON files found" in final_status:
progress(1.0, desc="❌ No processable data found")
return (gr.Textbox(value=page_url if page_url else data_gouv_page, visible=True),
final_status,
gr.File(visible=False),
gr.Image(visible=False), gr.Image(visible=False),
gr.Image(visible=False), gr.Image(visible=False))
# Final progress update
                progress(1.0, desc="✅ Processing results...")
# Process results
pdf_file = None
png_files = []
for file in files:
if file.endswith('.pdf'):
pdf_file = file
elif file.endswith('.png'):
png_files.append(file)
# Prepare final outputs
download_button = gr.File(value=pdf_file, visible=True) if pdf_file else None
# Prepare images for display (up to 4 images)
images = []
for i in range(4):
if i < len(png_files):
images.append(gr.Image(value=png_files[i], visible=True))
else:
images.append(gr.Image(visible=False))
# final progress completion
                progress(1.0, desc="🎉 Complete!")
return gr.Textbox(value=page_url if page_url else data_gouv_page, visible=True), final_status, download_button, *images
except queue.Empty:
pass
time.sleep(0.5) # Small delay to prevent excessive updates
except Exception as e:
progress(1.0, desc=f"❌ Error: {str(e)}")
return gr.Textbox(value=data_gouv_page, visible=True), f"❌ Error: {str(e)}", None, *images_output
# Ensure thread completes
analysis_thread.join(timeout=1)
# Fallback return
progress(1.0, desc="🏁 Finished")
return gr.Textbox(value=data_gouv_page, visible=True), current_status, pdf_file, *images_output
# Create the Gradio interface
with gr.Blocks(title="🤖 French Public Data Analysis Agent", theme=gr.themes.Soft(), css="""
.gradio-container {
max-width: 1200px !important;
margin: auto;
width: 100% !important;
}
.main-header {
text-align: center;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 2rem;
border-radius: 15px;
margin-bottom: 2rem;
box-shadow: 0 8px 32px rgba(0,0,0,0.1);
}
.accordion-content {
overflow: hidden !important;
width: 100% !important;
}
.gr-accordion {
width: 100% !important;
max-width: 100% !important;
}
.gr-accordion .gr-row {
width: 100% !important;
max-width: 100% !important;
margin: 0 !important;
}
.gr-accordion .gr-column {
min-width: 0 !important;
flex: 1 !important;
max-width: 50% !important;
padding-right: 1rem !important;
}
.gr-accordion .gr-column:last-child {
padding-right: 0 !important;
padding-left: 1rem !important;
}
""") as demo:
# Main header with better styling
gr.HTML("""
<div class="main-header">
<h1 style="margin: 0; font-size: 2.5rem; font-weight: bold;">
            🤖 French Public Data Analysis Agent
</h1>
<p style="font-size: 1.2rem; opacity: 0.9;">
Intelligent analysis of French public datasets with AI-powered insights
</p>
</div>
""")
# What this agent does
gr.HTML("""
<div style="text-align: center; background: #f8fafc; padding: 1.5rem; border-radius: 10px; margin: 1rem 0;">
<p style="font-size: 1.1rem; color: #374151; margin: 0;">
            🌐 <strong>Search in French or English</strong> • 🗺️ <strong>Generate reports with visualizations from the data</strong>
</p>
</div>
""")
# Tips & Information accordion - moved to the top
    with gr.Accordion("💡 Tips & Information", open=False):
with gr.Row():
with gr.Column():
gr.Markdown("""
🎯 **How to Use:**
- Enter any search term related to French public data
- Leave empty to randomly select a high-quality dataset
- Results include visualizations and downloadable reports
⏱️ **Processing Time:**
- Report generation takes 5-10 minutes depending on dataset complexity
- Larger datasets may require additional processing time
""")
with gr.Column():
gr.Markdown("""
⚠️ **Important Notes:**
            - Still a work in progress; starting with the example queries may work best
- Some datasets may not contain processable CSV/JSON files
- All visualizations are automatically generated
- Maps focus on France when geographic data is available
🌐 **Language Support:**
- Search in French or English - queries are automatically translated
""")
with gr.Row():
query_input = gr.Textbox(
label="Search Query",
placeholder="e.g., road traffic accidents, education, housing (or leave empty for random selection)",
scale=4
)
search_button = gr.Button(
"πŸš€ Analyze Dataset",
variant="primary",
scale=1,
size="lg"
)
# Quick Start Examples row
with gr.Row():
gr.HTML("""
<div>
            <h3 style="color: #374151">🚀 Quick Start Examples</h3>
<p style="color: #6b7280">Click any example below to get started</p>
</div>
""")
with gr.Row():
examples = [
("πŸš— Road Traffic Accidents 2005 - 2023", "road traffic accidents 2005 - 2023"),
("πŸŽ“ Education Directory", "education directory"),
("🏠 French Vacant Housing Private Park", "French vacant housing private park"),
]
for emoji_text, query_text in examples:
gr.Button(
emoji_text,
variant="secondary",
size="sm"
).click(
lambda x=query_text: x,
outputs=query_input
)
# Page info and analysis status with progress bar
with gr.Group():
        page_url_display = gr.Textbox(label="🔗 Dataset Page", interactive=False, visible=False)
with gr.Row():
            status_output = gr.Textbox(label="📊 Analysis Status", interactive=False, scale=1)
# Download section
with gr.Row():
download_button = gr.File(
label="πŸ“„ Download PDF Report",
visible=False
)
gr.Markdown("---")
gr.HTML("""
<div style="text-align: center; margin: 2rem 0;">
        <h2 style="color: #374151; margin-bottom: 0.5rem;">📊 Generated Visualizations</h2>
<p style="color: #6b7280; margin: 0;">Automatically generated charts and maps will appear below</p>
</div>
""")
with gr.Row():
with gr.Column():
            image1 = gr.Image(label="📈 Chart 1", visible=False, height=400)
            image2 = gr.Image(label="📊 Chart 2", visible=False, height=400)
        with gr.Column():
            image3 = gr.Image(label="🗺️ Map/Chart 3", visible=False, height=400)
            image4 = gr.Image(label="📉 Chart 4", visible=False, height=400)
# Set up the search button click event with progress bar
search_button.click(
fn=search_and_analyze,
inputs=[query_input],
outputs=[page_url_display, status_output, download_button, image1, image2, image3, image4],
show_progress="full" # Show the built-in progress bar
)
if __name__ == "__main__":
demo.queue() # Enable queuing for real-time updates
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)