import glob
import os
import queue
import threading
import time

import gradio as gr
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from rank_bm25 import BM25Okapi
from smolagents import CodeAgent, LiteLLMModel
from unidecode import unidecode

from agent import create_web_agent, generate_prompt
from followup_agent import run_followup_analysis

load_dotenv()

# Queue used to pass progress updates from the analysis thread to the UI thread
progress_queue = queue.Queue()
current_status = ""

# Lazily initialized globals (see initialize_models)
llm_translator = None
bm25_model = None
precomputed_titles = None


def initialize_models():
    """Initialize the LLM translator and the BM25 model."""
    global llm_translator, bm25_model, precomputed_titles

    if llm_translator is None:
        try:
            model = LiteLLMModel(
                model_id="gemini/gemini-2.5-flash-preview-05-20",
                api_key=os.getenv("GEMINI_API_KEY"),
            )
            llm_translator = CodeAgent(tools=[], model=model, max_steps=1)
            print("✅ LLM translator initialized")
        except Exception as e:
            print(f"⚠️ Error initializing LLM translator: {e}")

    if bm25_model is None:
        try:
            import pickle

            with open('bm25_data.pkl', 'rb') as f:
                bm25_data = pickle.load(f)
            bm25_model = bm25_data['bm25_model']
            precomputed_titles = bm25_data['titles']
            print(f"✅ Loaded pre-computed BM25 model for {len(precomputed_titles)} datasets")
        except FileNotFoundError:
            print("⚠️ Pre-computed BM25 model not found. Will compute at runtime.")
        except Exception as e:
            print(f"⚠️ Error loading pre-computed BM25 model: {e}")
            print("Will compute BM25 at runtime.")


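# A minimal sketch of how `bm25_data.pkl` could be pre-computed offline. This is
# illustrative only: it assumes `filtered_dataset.csv` with a 'title' column (as
# used elsewhere in this module), access to simple_keyword_preprocessing below,
# and it mirrors the {'bm25_model', 'titles'} structure expected when unpickling:
#
#     import pickle
#     import pandas as pd
#     from rank_bm25 import BM25Okapi
#
#     df = pd.read_csv('filtered_dataset.csv')
#     titles = df['title'].fillna('').tolist()
#     bm25 = BM25Okapi([simple_keyword_preprocessing(t) for t in titles])
#     with open('bm25_data.pkl', 'wb') as f:
#         pickle.dump({'bm25_model': bm25, 'titles': titles}, f)

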
def translate_query_llm(query, target_lang='fr'):
    """Translate a query using the LLM. Returns (translated_text, detected_source_lang)."""
    global llm_translator

    if llm_translator is None:
        initialize_models()

    if llm_translator is None:
        print("⚠️ LLM translator not available, returning original query")
        return query, 'unknown'

    try:
        if target_lang == 'fr':
            target_language = "French"
        elif target_lang == 'en':
            target_language = "English"
        else:
            target_language = target_lang

        translation_prompt = f"""
Translate the following text to {target_language}.
If the text is already in {target_language}, return it as is.
Only return the translated text, nothing else.

Text to translate: "{query}"
"""

        response = llm_translator.run(translation_prompt)
        translated_text = str(response).strip().strip('"').strip("'")

        # Heuristic language detection: if the translation is unchanged, the
        # query was already in the target language; otherwise assume the other
        # of the two supported languages (French/English).
        if query.lower() == translated_text.lower():
            source_lang = target_lang
        else:
            source_lang = 'en' if target_lang == 'fr' else 'fr'

        return translated_text, source_lang

    except Exception as e:
        print(f"LLM translation error: {e}")
        return query, 'unknown'


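# Illustrative contract (hypothetical LLM outputs, not recorded runs):
#     translate_query_llm("road accidents", target_lang='fr')
#     -> ("accidents de la route", 'en')   # changed by translation => source was English
#     translate_query_llm("accidents de la route", target_lang='fr')
#     -> ("accidents de la route", 'fr')   # unchanged => already in French

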
def simple_keyword_preprocessing(text):
    """Simple preprocessing for keyword matching: lowercase, strip accents,
    and crude singularization of French/English plurals."""
    text = unidecode(str(text).lower())

    processed_words = []
    for word in text.split():
        # Strip common plural endings ('-s', '-x') from words longer than
        # three characters, but keep '-ss' endings intact.
        if word.endswith('s') and len(word) > 3 and not word.endswith('ss'):
            word = word[:-1]
        elif word.endswith('x') and len(word) > 3:
            word = word[:-1]
        processed_words.append(word)

    return processed_words


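# Deterministic examples, for illustration:
#     simple_keyword_preprocessing("Écoles primaires")  -> ['ecole', 'primaire']
#     simple_keyword_preprocessing("réseaux")           -> ['reseau']

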
def find_similar_dataset_bm25(query, df):
    """Find the most similar dataset using BM25 keyword matching."""
    global bm25_model, precomputed_titles

    # Search with both the original query and its French translation when they differ.
    translated_query, original_lang = translate_query_llm(query, target_lang='fr')
    search_queries = [query, translated_query] if query != translated_query else [query]

    dataset_titles = df['title'].fillna('').tolist()

    # Reuse the pre-computed BM25 model only if it was built on exactly these titles.
    if (bm25_model is not None and precomputed_titles is not None and
            len(dataset_titles) == len(precomputed_titles) and dataset_titles == precomputed_titles):
        print("🔍 Using pre-computed BM25 model for fast matching")
        bm25 = bm25_model
    else:
        print("⚠️ Computing BM25 model at runtime...")
        processed_titles = [simple_keyword_preprocessing(title) for title in dataset_titles]
        bm25 = BM25Okapi(processed_titles)

    # Keep the best-scoring title across all query variants.
    best_score = -1
    best_idx = 0

    for search_query in search_queries:
        try:
            processed_query = simple_keyword_preprocessing(search_query)
            scores = bm25.get_scores(processed_query)
            max_score = scores.max()
            max_idx = scores.argmax()
            if max_score > best_score:
                best_score = max_score
                best_idx = max_idx
        except Exception as e:
            print(f"Error processing query '{search_query}': {e}")
            continue

    return best_idx, best_score, translated_query, original_lang


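# Hypothetical usage, assuming a DataFrame with the 'title' and 'url' columns
# this module relies on elsewhere:
#     df = pd.read_csv('filtered_dataset.csv')
#     idx, score, translated, lang = find_similar_dataset_bm25("road accidents", df)
#     page_url = df.iloc[idx]['url']

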
def create_progress_callback():
    """Create a callback function for tracking agent progress."""

    def progress_callback(memory_step, agent=None):
        """Callback invoked at each agent step; pushes (progress, description) updates."""
        step_number = memory_step.step_number

        if hasattr(memory_step, 'action_input') and memory_step.action_input:
            action_content = str(memory_step.action_input)
        elif hasattr(memory_step, 'action_output') and memory_step.action_output:
            action_content = str(memory_step.action_output)
        else:
            action_content = ""

        # Rough progress estimate: ~3% per step, capped below completion.
        progress_val = min(0.1 + (step_number * 0.03), 0.95)

        # Map keywords in the step's action to a human-readable status message.
        action_lower = action_content.lower()

        if "visit_webpage" in action_lower or "examining" in action_lower:
            description = f"🔍 Step {step_number}: Examining webpage..."
        elif "get_all_links" in action_lower or "links" in action_lower:
            description = f"🔗 Step {step_number}: Extracting data links..."
        elif "read_file_from_url" in action_lower or "reading" in action_lower:
            description = f"📄 Step {step_number}: Loading dataset..."
        elif "get_dataset_description" in action_lower or "description" in action_lower:
            description = f"📊 Step {step_number}: Analyzing dataset structure..."
        elif "department" in action_lower or "region" in action_lower:
            description = f"🗺️ Step {step_number}: Processing geographic data..."
        elif "plot" in action_lower or "map" in action_lower or "france" in action_lower:
            description = f"🗺️ Step {step_number}: Creating France map..."
        elif "visualization" in action_lower or "chart" in action_lower:
            description = f"📊 Step {step_number}: Generating visualizations..."
        elif "save" in action_lower or "png" in action_lower:
            description = f"💾 Step {step_number}: Saving visualizations..."
        elif "docx" in action_lower or "report" in action_lower:
            description = f"📄 Step {step_number}: Creating DOCX report..."
        elif hasattr(memory_step, 'error') and memory_step.error:
            description = f"⚠️ Step {step_number}: Handling error..."
        else:
            description = f"🤖 Step {step_number}: Processing..."

        if hasattr(memory_step, 'action_output') and memory_step.action_output and "final" in action_lower:
            progress_val = 1.0
            description = "✅ Analysis complete!"

        try:
            progress_queue.put((progress_val, description))
        except Exception:
            pass

    return progress_callback


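# The returned callback is expected to be registered by create_web_agent (in
# agent.py), presumably via smolagents' step_callbacks parameter, along the
# lines of this sketch (the actual wiring lives in agent.py):
#     CodeAgent(tools=[...], model=model, step_callbacks=[step_callback])

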
def run_agent_analysis_with_progress(query, progress_callback, df=None, page_url_callback=None, data_gouv_page=None, most_similar_idx=None):
    """
    Run the agent analysis with progress tracking using smolagents callbacks.
    """
    try:
        # Start from a clean output directory.
        if os.path.exists('generated_data'):
            for file in glob.glob('generated_data/*'):
                try:
                    os.remove(file)
                except OSError:
                    pass
        else:
            os.makedirs('generated_data', exist_ok=True)

        # If no dataset was pre-selected, find one via BM25 matching.
        if data_gouv_page is None or most_similar_idx is None:
            progress_callback(0.02, "🤖 Initializing LLM translator and BM25...")
            initialize_models()

            progress_callback(0.05, "🔍 Searching for relevant datasets (using BM25 keyword matching)...")

            if df is None:
                df = pd.read_csv('filtered_dataset.csv')

            most_similar_idx, similarity_score, translated_query, original_lang = find_similar_dataset_bm25(query, df)
            data_gouv_page = df.iloc[most_similar_idx]['url']

            if page_url_callback:
                page_url_callback(data_gouv_page)

            progress_callback(0.08, "🤖 Initializing agent...")
        else:
            progress_callback(0.09, "🤖 Initializing agent...")

        step_callback = create_progress_callback()

        progress_callback(0.1, "🤖 Starting agent analysis...")

        web_agent = create_web_agent(step_callback)
        prompt = generate_prompt(data_gouv_page)

        answer = web_agent.run(prompt)

        # Detect the agent reporting that the dataset has no processable files.
        answer_lower = str(answer).lower() if answer else ""
        if ("no processable data" in answer_lower or
                "no csv nor json" in answer_lower or
                "cannot find csv" in answer_lower or
                "cannot find json" in answer_lower or
                "no data to process" in answer_lower):
            progress_callback(1.0, "❌ No CSV/JSON files found in the dataset")
            return "❌ No CSV/JSON files found in the selected dataset. This dataset cannot be processed automatically.", [], data_gouv_page

        generated_files = glob.glob('generated_data/*')

        if generated_files:
            progress_callback(1.0, "✅ Analysis completed successfully!")
            return "Analysis completed successfully!", generated_files, data_gouv_page
        else:
            progress_callback(1.0, "⚠️ Analysis completed but no files were generated.")
            return "Analysis completed but no files were generated.", [], data_gouv_page

    except Exception as e:
        progress_callback(1.0, f"❌ Error: {str(e)}")
        return f"Error during analysis: {str(e)}", [], None


def search_and_analyze(query, progress=gr.Progress()):
    """
    Unified function that runs an initial search, then lets the agent analyze with full autonomy.
    Uses Gradio's progress bar for visual feedback.
    """
    # Drain any stale progress updates from a previous run.
    while not progress_queue.empty():
        try:
            progress_queue.get_nowait()
        except queue.Empty:
            break

    docx_file = None
    images_output = [gr.Image(visible=False)] * 4

    progress(0.05, desc="🚀 Initializing agent...")

    def progress_callback(progress_val, description):
        """Callback to update progress: puts updates in the queue."""
        try:
            progress_queue.put((progress_val, description))
        except Exception:
            pass

    result_queue = queue.Queue()

    def run_analysis():
        try:
            # Start from a clean output directory.
            if os.path.exists('generated_data'):
                for file in glob.glob('generated_data/*'):
                    try:
                        os.remove(file)
                    except OSError:
                        pass
            else:
                os.makedirs('generated_data', exist_ok=True)

            # Run an initial dataset search to seed the agent when a query was given.
            initial_search_results = None
            if query.strip():
                progress_callback(0.06, f"🔍 Initial search for: {query[:50]}...")
                try:
                    from tools.retrieval_tools import search_datasets
                    initial_search_results = search_datasets(query, top_k=5)
                    progress_callback(0.08, "🤖 Starting agent with search results...")
                except Exception as e:
                    print(f"Initial search failed: {e}")
                    progress_callback(0.08, "🤖 Starting agent without initial results...")
            else:
                progress_callback(0.08, "🤖 Starting agent for random selection...")

            step_callback = create_progress_callback()
            web_agent = create_web_agent(step_callback)

            prompt = generate_prompt(user_query=query, initial_search_results=initial_search_results)
            progress_callback(0.1, "🤖 Agent analyzing datasets...")

            answer = web_agent.run(prompt)

            # Detect the agent reporting that the dataset has no processable files.
            answer_lower = str(answer).lower() if answer else ""
            if ("no processable data" in answer_lower or
                    "no csv nor json" in answer_lower or
                    "cannot find csv" in answer_lower or
                    "cannot find json" in answer_lower or
                    "no data to process" in answer_lower):
                progress_callback(1.0, "❌ No CSV/JSON files found in the dataset")
                result_queue.put(("❌ No CSV/JSON files found in the selected dataset. This dataset cannot be processed automatically.", [], None))
                return

            generated_files = glob.glob('generated_data/*')

            if generated_files:
                progress_callback(1.0, "✅ Analysis completed successfully!")
                result_queue.put(("Analysis completed successfully!", generated_files, "Agent-selected dataset"))
            else:
                progress_callback(1.0, "⚠️ Analysis completed but no files were generated.")
                result_queue.put(("Analysis completed but no files were generated.", [], None))

        except Exception as e:
            progress_callback(1.0, f"❌ Error: {str(e)}")
            result_queue.put((f"Error during analysis: {str(e)}", [], None))

    analysis_thread = threading.Thread(target=run_analysis)
    analysis_thread.start()

    current_status = "🤖 Agent is finding relevant datasets..."
    progress(0.08, desc=current_status)

    # Poll both queues while the analysis thread runs, relaying progress to the UI.
    last_progress = 0.08

    while analysis_thread.is_alive() or not result_queue.empty():
        try:
            # Relay the latest progress update, keeping the bar monotonic.
            try:
                progress_val, description = progress_queue.get(timeout=0.1)
                if progress_val > last_progress:
                    last_progress = progress_val
                    current_status = description
                    progress(progress_val, desc=description)
            except queue.Empty:
                pass

            # Check whether the analysis thread has produced a final result.
            try:
                final_status, files, page_url = result_queue.get(timeout=0.1)

                if "❌ No CSV/JSON files found" in final_status:
                    progress(1.0, desc="❌ No processable data found")
                    return (gr.Textbox(value="Agent-selected dataset", visible=True),
                            final_status,
                            gr.File(visible=False),
                            gr.Image(visible=False), gr.Image(visible=False),
                            gr.Image(visible=False), gr.Image(visible=False),
                            gr.Markdown(visible=False),
                            gr.HTML(visible=False),
                            gr.Row(visible=False),
                            gr.Row(visible=False),
                            gr.Row(visible=False),
                            gr.Row(visible=False),
                            gr.Row(visible=False))

                progress(1.0, desc="✅ Processing results...")

                # Split generated files into the DOCX report and PNG visualizations.
                docx_file = None
                png_files = []

                for file in files:
                    if file.endswith('.docx'):
                        docx_file = file
                    elif file.endswith('.png'):
                        png_files.append(file)

                download_button = gr.File(value=docx_file, visible=True) if docx_file else None

                # Fill up to four image slots; hide the unused ones.
                images = []
                for i in range(4):
                    if i < len(png_files):
                        images.append(gr.Image(value=png_files[i], visible=True))
                    else:
                        images.append(gr.Image(visible=False))

                progress(1.0, desc="🎉 Complete!")

                return (gr.Textbox(value=page_url if page_url else "Agent-selected dataset", visible=True),
                        final_status, download_button, *images,
                        gr.Markdown(visible=True),
                        gr.HTML(visible=True),
                        gr.Row(visible=True),
                        gr.Row(visible=True),
                        gr.Row(visible=True),
                        gr.Row(visible=True),
                        gr.Row(visible=True))

            except queue.Empty:
                pass

            time.sleep(0.5)

        except Exception as e:
            progress(1.0, desc=f"❌ Error: {str(e)}")
            return (gr.Textbox(value="Error", visible=True), f"❌ Error: {str(e)}", None, *images_output,
                    gr.Markdown(visible=False),
                    gr.HTML(visible=False),
                    gr.Row(visible=False),
                    gr.Row(visible=False),
                    gr.Row(visible=False),
                    gr.Row(visible=False),
                    gr.Row(visible=False))

    analysis_thread.join(timeout=1)

    # Fallback: the thread finished without posting a result.
    progress(1.0, desc="🏁 Finished")
    return (gr.Textbox(value="Completed", visible=True), current_status, docx_file, *images_output,
            gr.Markdown(visible=False),
            gr.HTML(visible=False),
            gr.Row(visible=False),
            gr.Row(visible=False),
            gr.Row(visible=False),
            gr.Row(visible=False),
            gr.Row(visible=False))


def run_followup_question(question, progress=gr.Progress()):
    """
    Run a follow-up analysis based on the user's question about the previous report.
    """
    if not question.strip():
        return "Please enter a follow-up question.", gr.Image(visible=False)

    progress(0.1, desc="🤖 Starting follow-up analysis...")

    try:
        if not os.path.exists('generated_data') or not os.listdir('generated_data'):
            return "No previous analysis found. Please run an analysis first.", gr.Image(visible=False)

        progress(0.3, desc="📊 Analyzing previous report and dataset...")

        result = run_followup_analysis(question)

        progress(0.9, desc="📊 Processing results...")

        # Collect images created by this follow-up run (less than two minutes old).
        all_images = glob.glob('generated_data/*.png')
        current_time = time.time()
        recent_images = []

        for img_path in all_images:
            img_time = os.path.getctime(img_path)
            if current_time - img_time < 120:
                recent_images.append(img_path)

        latest_image = None
        if recent_images:
            latest_image = max(recent_images, key=os.path.getctime)

        progress(1.0, desc="✅ Follow-up analysis complete!")

        final_result = result
        if latest_image:
            final_result += f"\n\n📊 **Visualization Created:** {os.path.basename(latest_image)}"
            if len(recent_images) > 1:
                final_result += f"\n📊 **Total new visualizations:** {len(recent_images)}"
            return final_result, gr.Image(value=latest_image, visible=True)
        else:
            return final_result, gr.Image(visible=False)

    except Exception as e:
        progress(1.0, desc="❌ Error in follow-up analysis")
        return f"Error: {str(e)}", gr.Image(visible=False)


with gr.Blocks(title="🤖 French Public Data Analysis Agent", theme=gr.themes.Soft(), css="""
    .gradio-container {
        max-width: 1200px !important;
        margin: auto;
        width: 100% !important;
    }
    .main-header {
        text-align: center;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 2rem;
        border-radius: 15px;
        margin-bottom: 2rem;
        box-shadow: 0 8px 32px rgba(0,0,0,0.1);
    }
    .accordion-content {
        overflow: hidden !important;
        width: 100% !important;
    }
    .gr-accordion {
        width: 100% !important;
        max-width: 100% !important;
    }
    .gr-accordion .gr-row {
        width: 100% !important;
        max-width: 100% !important;
        margin: 0 !important;
    }
    .gr-accordion .gr-column {
        min-width: 0 !important;
        flex: 1 !important;
        max-width: 50% !important;
        padding-right: 1rem !important;
    }
    .gr-accordion .gr-column:last-child {
        padding-right: 0 !important;
        padding-left: 1rem !important;
    }
""") as demo:

gr.HTML(""" |
|
<div class="main-header"> |
|
<h1 style="margin: 0; font-size: 2.5rem; font-weight: bold;"> |
|
π€ French Public Data Analysis Agent |
|
</h1> |
|
<p style="font-size: 1.2rem; opacity: 0.9;"> |
|
Intelligent analysis of French public datasets with AI-powered insights |
|
</p> |
|
</div> |
|
""") |
|
|
|
|
|
gr.HTML(""" |
|
<div style="text-align: center; background: #f8fafc; padding: 1.5rem; border-radius: 10px; margin: 1rem 0;"> |
|
<p style="font-size: 1.1rem; color: #374151; margin: 0;"> |
|
π <strong>Search in French or English</strong> β’ π€ <strong>AI Agent finds & analyzes datasets</strong> β’ πΊοΈ <strong>Generate Reports with visualizations</strong> |
|
</p> |
|
<p style="font-size: 0.9rem; color: #6b7280; margin-top: 0.5rem;"> |
|
Initial search results guide the agent, but it can search for different datasets if needed |
|
</p> |
|
</div> |
|
""") |
|
|
|
|
|
    with gr.Accordion("💡 Tips & Information", open=False):
        with gr.Row():
            with gr.Column():
                gr.Markdown("""
                🎯 **How to Use:**
                - Enter search terms related to French public data
                - Leave empty for random high-quality dataset selection
                - The system provides initial search results to guide the agent
                - The agent can use the provided results or search for different datasets
                - Results include visualizations and downloadable reports

                ⏱️ **Processing Time:**
                - Analysis takes 7-15 minutes depending on dataset complexity
                - The agent has full autonomy to find the best datasets
                """)
            with gr.Column():
                gr.Markdown("""
                ⚠️ **Important Notes:**
                - The agent gets initial search results but has full autonomy to make decisions
                - The agent can choose from the initial results or search for different datasets
                - Some datasets may not contain processable CSV/JSON files
                - All visualizations are generated automatically
                - Maps focus on France when geographic data is available

                🌐 **Language Support:**
                - Search in French or English; queries are translated automatically
                """)

    # Search input and main action button
    with gr.Row():
        query_input = gr.Textbox(
            label="Search Query",
            placeholder="e.g., road traffic accidents, education, housing (or leave empty for random selection)",
            scale=4
        )
        search_button = gr.Button(
            "🔍 Analyze Dataset",
            variant="primary",
            scale=1,
            size="lg"
        )

    with gr.Row():
        gr.HTML("""
        <div>
            <h3 style="color: #374151">🚀 Quick Start Examples</h3>
            <p style="color: #6b7280">Click any example below to get started</p>
        </div>
        """)

    with gr.Row():
        examples = [
            ("🚗 Road Traffic Accidents 2023", "road traffic accidents 2023"),
            ("🎓 Education Directory", "education directory"),
            ("🏠 French Vacant Housing Private Park", "French vacant housing private park"),
        ]

        # Each example button fills the query box; the default-argument lambda
        # binds the query text at definition time.
        for emoji_text, query_text in examples:
            gr.Button(
                emoji_text,
                variant="secondary",
                size="sm"
            ).click(
                lambda x=query_text: x,
                outputs=query_input
            )

    # Analysis status and report download
    with gr.Group():
        page_url_display = gr.Textbox(label="🔗 Page Started On", interactive=False, visible=False)
        with gr.Row():
            status_output = gr.Textbox(label="📊 Analysis Status", interactive=False, scale=1)

    with gr.Row():
        download_button = gr.File(
            label="📄 Download DOCX Report",
            visible=False
        )

    gr.Markdown("---")
    gr.HTML("""
    <div style="text-align: center; margin: 2rem 0;">
        <h2 style="color: #374151; margin-bottom: 0.5rem;">📊 Generated Visualizations</h2>
        <p style="color: #6b7280; margin: 0;">Automatically generated charts and maps will appear below</p>
    </div>
    """)

    with gr.Row():
        with gr.Column():
            image1 = gr.Image(label="📊 Chart 1", visible=False, height=400)
            image2 = gr.Image(label="📊 Chart 2", visible=False, height=400)
        with gr.Column():
            image3 = gr.Image(label="🗺️ Map/Chart 3", visible=False, height=400)
            image4 = gr.Image(label="📊 Chart 4", visible=False, height=400)

    # Follow-up section, hidden until the first analysis completes
    followup_section_divider = gr.Markdown("---", visible=False)
    followup_section_header = gr.HTML("""
    <div style="text-align: center; margin: 2rem 0;">
        <h2 style="color: #374151; margin-bottom: 0.5rem;">🤖 Follow-up Analysis</h2>
        <p style="color: #6b7280; margin: 0;">Ask about report findings, request data analysis, or get contextual information</p>
    </div>
    """, visible=False)

    with gr.Row(visible=False) as followup_input_row:
        followup_input = gr.Textbox(
            label="Follow-up Question",
            placeholder="e.g., What are the main findings?, Show me correlation between columns, What is road safety policy in France?",
            scale=4
        )
        followup_button = gr.Button(
            "🔍 Analyze",
            variant="secondary",
            scale=1,
            size="lg"
        )

    with gr.Row(visible=False) as followup_result_row:
        followup_result = gr.Textbox(
            label="📊 Follow-up Analysis Results",
            interactive=False,
            lines=10,
            visible=True
        )

    with gr.Row(visible=False) as followup_image_row:
        followup_image = gr.Image(
            label="📊 Follow-up Visualization",
            visible=False,
            height=500
        )

    with gr.Row(visible=False) as followup_examples_header_row:
        gr.HTML("""
        <div>
            <h4 style="color: #374151">💡 Example Follow-up Questions</h4>
            <p style="color: #6b7280">Click any example below to try it out</p>
        </div>
        """)

    with gr.Row(visible=False) as followup_examples_row:
        followup_examples = [
            ("📊 Report Summary", "What were the main findings from the analysis?"),
            ("🌐 Context Info", "What is the policy context for this data in France?"),
            ("📊 Create Chart", "Show me the correlation between two numerical columns with a scatter plot"),
            ("📈 Data Statistics", "Give me statistical summary for a specific column"),
            ("🎯 Filter Data", "Filter the data by specific criteria and show results"),
            ("❓ General Question", "Tell me more about this topic and its importance"),
        ]

        for emoji_text, query_text in followup_examples:
            gr.Button(
                emoji_text,
                variant="secondary",
                size="sm"
            ).click(
                lambda x=query_text: x,
                outputs=followup_input
            )

    # Wire the main search button to the full output set (URL, status, report,
    # four images, and the follow-up section's visibility toggles).
    search_button.click(
        fn=search_and_analyze,
        inputs=[query_input],
        outputs=[page_url_display, status_output, download_button, image1, image2, image3, image4,
                 followup_section_divider, followup_section_header, followup_input_row,
                 followup_result_row, followup_image_row, followup_examples_header_row, followup_examples_row],
        show_progress="full"
    )

    followup_button.click(
        fn=run_followup_question,
        inputs=[followup_input],
        outputs=[followup_result, followup_image],
        show_progress="full"
    )


if __name__ == "__main__":
    demo.queue()
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )