import gradio as gr import openai import sqlite3 import pandas as pd import matplotlib.pyplot as plt import seaborn as sns import os from typing import Optional, Tuple import re # OpenRouter API Key (Replace with yours) OPENROUTER_API_KEY = "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" OPENROUTER_MODEL = "sophosympatheia/rogue-rose-103b-v0.2:free" # Hugging Face Space path DB_PATH = "ecommerce.db" # Ensure dataset exists if not os.path.exists(DB_PATH): os.system("wget https://your-dataset-link.com/ecommerce.db -O ecommerce.db") # Replace with actual dataset link # Initialize OpenAI client openai_client = openai.OpenAI(api_key=OPENROUTER_API_KEY, base_url="https://openrouter.ai/api/v1") # Function: Fetch database schema def fetch_schema(db_path: str) -> str: conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = cursor.fetchall() schema = "" for table in tables: table_name = table[0] cursor.execute(f"PRAGMA table_info({table_name});") columns = cursor.fetchall() schema += f"Table: {table_name}\n" for column in columns: schema += f" Column: {column[1]}, Type: {column[2]}\n" conn.close() return schema # Function: Extract SQL query from LLM response def extract_sql_query(response: str) -> str: # Use regex to find content between ```sql and ``` match = re.search(r"```sql(.*?)```", response, re.DOTALL) if match: return match.group(1).strip() # Extract and return the SQL query return response # Fallback: return the entire response if no SQL block is found # Function: Convert text to SQL def text_to_sql(query: str, schema: str) -> str: prompt = ( "You are an SQL expert. Given the following database schema:\n\n" f"{schema}\n\n" "Convert the following query into SQL:\n\n" f"Query: {query}\n" "SQL:" ) try: response = openai_client.chat.completions.create( model=OPENROUTER_MODEL, messages=[{"role": "system", "content": "You are an SQL expert."}, {"role": "user", "content": prompt}] ) sql_response = response.choices[0].message.content.strip() return extract_sql_query(sql_response) # Extract SQL query from the response except Exception as e: return f"Error: {e}" def preprocess_sql_for_sqlite(sql_query: str) -> str: """ Replace non-SQLite functions with SQLite-compatible equivalents. """ sql_query = re.sub(r"\bMONTH\s*\(\s*([\w.]+)\s*\)", r"strftime('%m', \1)", sql_query) sql_query = re.sub(r"\bYEAR\s*\(\s*([\w.]+)\s*\)", r"strftime('%Y', \1)", sql_query) return sql_query def execute_sql(sql_query: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]: try: conn = sqlite3.connect(DB_PATH) sql_query = preprocess_sql_for_sqlite(sql_query) # Convert to SQLite-compatible SQL df = pd.read_sql_query(sql_query, conn) conn.close() return df, None except Exception as e: return None, f"SQL Execution Error: {e}" # Function: Generate Dynamic Visualization def visualize_data(df: pd.DataFrame) -> Optional[str]: if df.empty or df.shape[1] < 2: return None plt.figure(figsize=(6, 4)) sns.set_theme(style="darkgrid") # Detect numeric columns numeric_cols = df.select_dtypes(include=['number']).columns if len(numeric_cols) < 1: return None # Choose visualization type dynamically if len(numeric_cols) == 1: # Single numeric column, assume it's a count metric sns.histplot(df[numeric_cols[0]], bins=10, kde=True, color="teal") plt.title(f"Distribution of {numeric_cols[0]}") elif len(numeric_cols) == 2: # Two numeric columns, assume X-Y plot sns.scatterplot(x=df[numeric_cols[0]], y=df[numeric_cols[1]], color="blue") plt.title(f"{numeric_cols[0]} vs {numeric_cols[1]}") elif df.shape[0] < 10: # If rows are few, prefer pie chart plt.pie(df[numeric_cols[0]], labels=df.iloc[:, 0], autopct='%1.1f%%', colors=sns.color_palette("pastel")) plt.title(f"Proportion of {numeric_cols[0]}") else: # Default: Bar chart for categories + values sns.barplot(x=df.iloc[:, 0], y=df[numeric_cols[0]], palette="coolwarm") plt.xticks(rotation=45) plt.title(f"{df.columns[0]} vs {numeric_cols[0]}") plt.tight_layout() plt.savefig("chart.png") return "chart.png" # Gradio UI def gradio_ui(query: str) -> Tuple[str, str, Optional[str]]: schema = fetch_schema(DB_PATH) sql_query = text_to_sql(query, schema) df, error = execute_sql(sql_query) if error: return sql_query, error, None visualization = visualize_data(df) if df is not None else None return sql_query, df.to_string(index=False), visualization # Launch Gradio App with gr.Blocks() as demo: gr.Markdown("## SQL Explorer: Text-to-SQL with Real Execution & Visualization") query_input = gr.Textbox(label="Enter your query", placeholder="e.g., Show all products sold in 2018.") submit_btn = gr.Button("Convert & Execute") sql_output = gr.Textbox(label="Generated SQL Query") table_output = gr.Textbox(label="Query Results") chart_output = gr.Image(label="Data Visualization") submit_btn.click(gradio_ui, inputs=[query_input], outputs=[sql_output, table_output, chart_output]) demo.launch()