aura-mind-glow / app.py
surfiniaburger's picture
cloud remedy
91538b6
# ==============================================================================
# Aura Mind Glow - Main Application (Refactored)
# ==============================================================================
"""
This script launches the Aura Mind Glow application, now with multiple modes
and user authentication.
"""
# --- Step 0: Essential Imports ---
import gradio as gr
from PIL import Image
import os
import warnings
import socket
import tempfile
import json
import re
import requests # Added for authentication
from bigquery_search import search_bigquery_for_remedy
# Suppress potential warnings for a cleaner console
warnings.filterwarnings("ignore")
os.environ["TORCH_COMPILE_DISABLE"] = "1" # Ensure torch compile is off
# --- Step 1: Import Core Components from Modules ---
from vision_model import load_vision_model
from knowledge_base import KnowledgeBase
from agent_setup import initialize_adk
from google.genai import types
from google.cloud import bigquery
from story_generator import create_story_prompt_from_pdf, generate_video_from_prompt
from langchain_huggingface import HuggingFaceEndpoint
from bigquery_uploader import upload_diagnosis_to_bigquery
from vector_store import embed_and_store_documents, search_documents
print("βœ… All libraries imported successfully.")
# --- Step 2: Global and Authentication Initialization ---
# Authentication Configuration
GCP_API_KEY = os.environ.get("GCP_API_KEY")
if not GCP_API_KEY:
print("⚠️ WARNING: GCP_API_KEY environment variable not set. Authentication will fail.")
# Define placeholder URLs to avoid crashing, but they won't work
SIGNUP_URL = "YOUR_SIGNUP_URL_HERE"
LOGIN_URL = "YOUR_LOGIN_URL_HERE"
else:
SIGNUP_URL = f"https://identitytoolkit.googleapis.com/v1/accounts:signUp?key={GCP_API_KEY}"
LOGIN_URL = f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={GCP_API_KEY}"
# This expensive setup runs only ONCE when the application starts.
print("Performing initial setup...")
VISION_MODEL, PROCESSOR = load_vision_model()
KB = KnowledgeBase()
RETRIEVER = KB # The retriever is now the KB itself
embed_and_store_documents() # Initialize and load the vector store
# Initialize ADK components for Connected Mode
adk_components = initialize_adk(VISION_MODEL, PROCESSOR, RETRIEVER)
ADK_RUNNER = adk_components["runner"] if adk_components else None
DIAGNOSIS_TOOL = adk_components["diagnosis_tool"] if adk_components else None
REMEDY_TOOL = adk_components["remedy_tool"] if adk_components else None
SESSION_SERVICE = adk_components["session_service"] if adk_components else None
# Initialize a separate LLM for the Story Generator
STORY_LLM = None
if os.environ.get("HF_TOKEN"):
try:
STORY_LLM = HuggingFaceEndpoint(
repo_id="HuggingFaceH4/zephyr-7b-beta",
huggingfacehub_api_token=os.environ.get("HF_TOKEN"),
max_new_tokens=150,
temperature=0.4,
)
print("βœ… Story Generator LLM initialized successfully.")
except Exception as e:
print(f"❌ Could not initialize Story Generator LLM: {e}")
else:
print("❌ HF_TOKEN not found. Story Generator Mode will be disabled.")
# --- Step 3: Authentication UI and Logic ---
def signup_user(email, password):
"""Signs up a new user using Google Identity Platform."""
payload = {
"email": email,
"password": password,
"returnSecureToken": True
}
try:
response = requests.post(SIGNUP_URL, json=payload)
response.raise_for_status() # Raise an exception for bad status codes
# No need to return anything on success, we'll just inform the user
return "βœ… Signup successful! You can now log in."
except requests.exceptions.HTTPError as err:
error_json = err.response.json()
error_message = error_json.get("error", {}).get("message", "Unknown error")
print(f"❌ Signup failed: {error_message}")
return f"❌ Signup failed: {error_message}"
except Exception as e:
print(f"❌ An unexpected error occurred during signup: {e}")
return "❌ An unexpected error occurred. See console for details."
def login_user(email, password):
"""Logs in a user and returns their session info."""
payload = {
"email": email,
"password": password,
"returnSecureToken": True
}
try:
response = requests.post(LOGIN_URL, json=payload)
response.raise_for_status()
user_data = response.json()
# Return a dictionary with user info, which will be stored in the state
return {
"uid": user_data["localId"],
"id_token": user_data["idToken"],
"email": user_data["email"]
}
except requests.exceptions.HTTPError as err:
# Don't raise an error, just return None to indicate login failure
print(f"Login failed for user: {email}")
return None
except Exception as e:
print(f"❌ An unexpected error occurred during login: {e}")
return None
# --- Step 4: Define Gradio UIs ---
def create_field_mode_ui(user_state):
"""Creates the Gradio UI for the offline Field Mode."""
def clean_diagnosis_text(diagnosis: str) -> str:
cleaned_text = re.sub(r'[^\w\s.\-,"]', '', diagnosis)
cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
return cleaned_text
def search_bigquery_for_remedy(search_query: str) -> str:
try:
client = bigquery.Client(project="gem-creation")
query = """
SELECT remedy_description FROM `gem-creation.maize_remedies.remedies`
WHERE SEARCH(remedy_description, @query)
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("query", "STRING", search_query)]
)
query_job = client.query(query, job_config=job_config)
results = list(query_job)
return results[0].remedy_description if results else "No remedy found."
except Exception as e:
return f"Error querying BigQuery: {e}"
def get_diagnosis_and_remedy(uploaded_image: Image.Image, feedback: str):
"""
Performs diagnosis on an uploaded plant image and provides a remedy.
This tool takes an image of a plant, diagnoses its condition using a vision
model, and then searches both a local knowledge base and a cloud database
for a suitable remedy. It also logs the diagnosis for future analysis.
Args:
uploaded_image (Image.Image): The PIL Image of the plant to be diagnosed.
feedback (str): Optional user feedback on the diagnosis or remedy.
Returns:
str: A formatted markdown string containing the diagnosis report
and suggested remedies from local and cloud sources.
"""
if uploaded_image is None:
return "Please upload an image."
# Handle different contexts for user_state (UI vs. API call)
farmer_id = "api_call_user" # Default user for API calls
if hasattr(user_state, 'get') and user_state.get("uid"):
# This block runs for UI users who are logged in
farmer_id = user_state.get("uid")
elif not hasattr(user_state, 'get'):
# This block runs for API calls where user_state is not a dict-like object
print("API call detected, proceeding with default farmer_id.")
else:
# This block runs for UI users who are not logged in
raise gr.Error("Authentication error. Please log out and log in again.")
temp_file_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
uploaded_image.save(temp_file.name)
temp_file_path = temp_file.name
diagnosis = DIAGNOSIS_TOOL(temp_file_path)
if "Could not parse" in diagnosis:
return f"Could not identify condition: {diagnosis}"
report_title = diagnosis
cleaned_diagnosis = clean_diagnosis_text(diagnosis)
# --- Hybrid Search ---
local_remedy_list = search_documents(cleaned_diagnosis)
local_remedy = local_remedy_list[0] if local_remedy_list else "No remedy found in local knowledge base."
search_query = "healthy maize" if "healthy" in cleaned_diagnosis.lower() else "phosphorus" if "phosphorus" in cleaned_diagnosis.lower() else "Wetin My Eye See So"
cloud_remedy = search_bigquery_for_remedy(search_query)
final_response = f"""
## Diagnosis Report
**Condition:**
### {report_title}
---
## Suggested Remedy (from Cloud)
{cloud_remedy}
"""
diagnosis_data = {
"ai_diagnosis": report_title,
"recommended_action": local_remedy,
"confidence_score": None,
"farmer_id": farmer_id, # Use the determined farmer_id
"gps_latitude": None,
"gps_longitude": None,
"crop_type": "Maize",
"crop_variety": None,
"farmer_feedback": feedback,
"treatment_applied": None,
"outcome_image_id": None,
}
upload_diagnosis_to_bigquery(diagnosis_data)
return final_response
finally:
if temp_file_path:
os.remove(temp_file_path)
with gr.Blocks() as field_mode_blocks:
gr.Markdown("### 🌽 Aura Mind Glow: Field Mode")
gr.Markdown("Upload an image of a maize plant for diagnosis and treatment.")
with gr.Row():
with gr.Column():
image_input = gr.Image(type="pil", label="Upload Maize Plant Image", sources=["upload", "webcam"])
feedback_input = gr.Textbox(label="Provide Feedback on the Remedy (Optional)", placeholder="e.g., This remedy worked well...")
submit_btn = gr.Button("Get Diagnosis")
with gr.Column():
output_markdown = gr.Markdown(label="Diagnosis and Remedy Report")
submit_btn.click(
fn=get_diagnosis_and_remedy,
inputs=[image_input, feedback_input],
outputs=output_markdown
)
return field_mode_blocks
# --- All other UI creation functions (create_connected_mode_ui, etc.) remain the same ---
# Note: For a full implementation, you would pass the user_state to other UIs
# and use the farmer_id there as well.
def create_connected_mode_ui(user_state):
"""Creates the Gradio UI for the online Connected Mode."""
with gr.Blocks(theme=gr.themes.Soft(primary_hue="green", secondary_hue="lime")) as demo:
gr.Markdown("# 🌽 Aura Mind Glow: Connected Mode πŸ€–")
gr.Markdown("I am an AI farming assistant. Upload an image and ask for a diagnosis and remedy.")
chatbot = gr.Chatbot(height=600)
msg = gr.MultimodalTextbox(file_types=["image"], label="Ask a question and/or upload an image...")
async def respond(chat_input, history, user_state):
if not user_state or not user_state.get("uid"):
history.append((chat_input.get("text", ""), "Authentication error. Please log out and log in again."))
yield history, gr.MultimodalTextbox(value=None)
return
user_id = user_state["uid"]
if not SESSION_SERVICE or not ADK_RUNNER:
history.append((chat_input.get("text", ""), "Connected mode is not available. Check logs."))
yield history, gr.MultimodalTextbox(value=None)
return
session = await SESSION_SERVICE.create_session(user_id=user_id, app_name="AuraMindGlow")
files = chat_input.get("files", [])
text = chat_input.get("text", "")
if not files and not text:
# If there is no input, do nothing
yield history, gr.MultimodalTextbox(value=None)
return
if not files:
history.append((text, "Please upload an image for diagnosis."))
yield history, gr.MultimodalTextbox(value=None)
return
# Create the prompt for the ADK agent
with open(files[0], 'rb') as f:
image_data = f.read()
image_part = types.Part(
inline_data=types.Blob(
mime_type='image/png',
data=image_data
)
)
text_part = types.Part(text=text or "Diagnose this plant and provide a remedy.")
prompt = types.Content(parts=[image_part, text_part], role="user")
# Stream the response from the agent
bot_message = ""
history.append([(files[0], text), bot_message])
try:
async for event in ADK_RUNNER.run_async(
session_id=session.id, user_id=user_id, new_message=prompt
):
if event.is_llm_response_chunk() and event.content.parts:
bot_message += event.content.parts[0].text
history[-1] = (((files[0], text), bot_message))
yield history, gr.MultimodalTextbox(value=None)
elif event.is_final_response() and event.content.parts:
bot_message = event.content.parts[0].text
history[-1] = (((files[0], text), bot_message))
yield history, gr.MultimodalTextbox(value=None)
except Exception as e:
print(f"Error during agent execution: {e}")
history[-1] = (((files[0], text), f"An error occurred: {e}"))
yield history, gr.MultimodalTextbox(value=None)
msg.submit(respond, [msg, chatbot, user_state], [chatbot, msg])
return demo
def create_document_analysis_ui():
"""Creates the Gradio UI for the Document Analysis Mode."""
with gr.Blocks(theme=gr.themes.Soft(primary_hue="purple", secondary_hue="pink")) as demo:
gr.Markdown("# 🌽 Aura Mind Glow: Document Analysis Mode πŸ“„")
gr.Markdown("Upload a PDF or a spreadsheet and ask questions about its content.")
with gr.Row():
with gr.Column(scale=1):
doc_input = gr.File(label="Upload Document", file_types=[".pdf", ".csv"])
query_input = gr.Textbox(label="Ask a question about the document")
submit_btn = gr.Button("Analyze and Query")
with gr.Column(scale=2):
answer_output = gr.Textbox(label="Answer", interactive=False, lines=10)
status_output = gr.Textbox(label="Status", interactive=False, lines=3)
def analysis_process(doc, query):
if doc is None:
yield "Please upload a document to begin.", ""
return
file_path = doc.name
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == ".pdf":
yield "Analyzing PDF...", ""
chain, vector_store = analyze_pdf(file_path)
if chain and vector_store:
yield "PDF analyzed successfully. Now querying...", ""
answer = query_pdf(chain, vector_store, query)
yield answer, "Query complete."
else:
yield "Failed to analyze PDF.", "Error"
elif file_ext == ".csv":
yield "Analyzing spreadsheet...", ""
agent = analyze_spreadsheet(file_path)
if agent:
yield "Spreadsheet analyzed successfully. Now querying...", ""
answer = query_spreadsheet(agent, query)
yield answer, "Query complete."
else:
yield "Failed to analyze spreadsheet.", "Error"
else:
yield "Unsupported file type. Please upload a PDF or a CSV file.", "Error"
submit_btn.click(
analysis_process,
inputs=[doc_input, query_input],
outputs=[answer_output, status_output]
)
return demo
def create_story_mode_ui():
"""Creates the Gradio UI for the Farmer's Story Mode."""
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="yellow")) as demo:
gr.Markdown("# 🌽 Aura Mind Glow: Farmer's Story Mode 🎬")
gr.Markdown("Create a short video story from your farm documents. Upload a PDF, describe the mood, and let the AI create a visual story.")
with gr.Row():
with gr.Column(scale=1):
pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"])
image_input = gr.Image(type="filepath", label="Optional: Upload a Starting Image")
user_prompt_input = gr.Textbox(label="Describe the video's tone or theme", placeholder="e.g., hopeful, a look back at a tough season, etc.")
submit_btn = gr.Button("Generate Video Story")
with gr.Column(scale=2):
video_output = gr.Video(label="Generated Video Story")
status_output = gr.Textbox(label="Status", interactive=False, lines=3)
def story_generation_process(pdf, image, user_prompt):
if pdf is None:
yield None, "Please upload a PDF document to begin."
return
yield None, "Step 1: Reading PDF and generating creative prompt..."
creative_prompt = create_story_prompt_from_pdf(pdf.name, user_prompt, STORY_LLM)
if "Error" in creative_prompt:
yield None, creative_prompt
return
yield None, f"Step 2: Generating video with prompt: '{creative_prompt[:100]}...' (This may take several minutes)"
video_path = generate_video_from_prompt(creative_prompt, image)
if "Error" in video_path:
yield None, video_path
return
yield video_path, "Video generation complete!"
submit_btn.click(
story_generation_process,
inputs=[pdf_input, image_input, user_prompt_input],
outputs=[video_output, status_output]
)
return demo
def create_settings_ui():
"""Creates the Gradio UI for the Settings tab."""
with gr.Blocks(theme=gr.themes.Soft(primary_hue="gray", secondary_hue="blue")) as demo:
gr.Markdown("# βš™οΈ Settings & Data Management")
gr.Markdown("Manage application settings and data synchronization.")
with gr.Row():
with gr.Column():
sync_btn = gr.Button("☁️ Sync Local Data to BigQuery Cloud")
status_output = gr.Textbox(label="Sync Status", interactive=False, lines=5)
def sync_data_to_cloud():
yield "Local data sync is no longer required as diagnoses are uploaded directly to BigQuery."
sync_btn.click(
sync_data_to_cloud,
inputs=[],
outputs=[status_output]
)
return demo
def create_kb_management_ui():
"""Creates the Gradio UI for managing the knowledge base."""
with gr.Blocks(theme=gr.themes.Soft(primary_hue="indigo", secondary_hue="purple")) as demo:
gr.Markdown("# πŸ“š Knowledge Base Management")
gr.Markdown("Manage the local, encrypted knowledge base.")
with gr.Row():
with gr.Column():
gr.Markdown("### Rebuild Knowledge Base")
rebuild_btn = gr.Button("Rebuild from Source Files")
rebuild_status = gr.Textbox(label="Status", interactive=False)
with gr.Column():
gr.Markdown("### Add PDF to Knowledge Base")
pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"])
ingest_btn = gr.Button("Ingest PDF")
ingest_status = gr.Textbox(label="Status", interactive=False)
def rebuild_kb():
yield "Rebuilding knowledge base..."
try:
KB.rebuild_from_default_files() # Call the new method to rebuild from default files
yield "Knowledge base rebuilt successfully."
except Exception as e:
yield f"Error rebuilding knowledge base: {e}"
def ingest_pdf(pdf):
if pdf is None:
return "Please upload a PDF file."
yield "Ingesting PDF..."
try:
KB.ingest_pdf(pdf.name, os.path.basename(pdf.name))
yield f"Successfully ingested {os.path.basename(pdf.name)}."
except Exception as e:
yield f"Error ingesting PDF: {e}"
rebuild_btn.click(rebuild_kb, outputs=[rebuild_status])
ingest_btn.click(ingest_pdf, inputs=[pdf_input], outputs=[ingest_status])
return demo
# --- Step 5: App Launcher ---
def check_internet_connection(host="8.8.8.8", port=53, timeout=3):
"""Check for internet connectivity."""
try:
socket.setdefaulttimeout(timeout)
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
return True
except socket.error:
return False
def _setup_gcp_credentials():
gcp_credentials_json = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if gcp_credentials_json and gcp_credentials_json.strip().startswith("{"):
try:
credentials_dict = json.loads(gcp_credentials_json)
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.json') as temp_file:
json.dump(credentials_dict, temp_file)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name
print(f"βœ… GCP credentials set from env var to: {os.environ['GOOGLE_APPLICATION_CREDENTIALS']}")
except Exception as e:
print(f"❌ Error setting up GCP credentials: {e}")
else:
print("ℹ️ GOOGLE_APPLICATION_CREDENTIALS not found as JSON string. Using other means.")
if __name__ == "__main__":
_setup_gcp_credentials()
with gr.Blocks(theme=gr.themes.Soft(), css="footer {visibility: hidden !important;}") as demo:
user_state = gr.State(None) # To hold user session info (uid, token, etc)
# --- Login UI ---
with gr.Column(visible=True) as login_view:
gr.Markdown("# Welcome to Aura Mind Glow", elem_id="login_title")
gr.Markdown("Please log in or sign up to continue.")
with gr.Row():
email_input = gr.Textbox(label="Email", placeholder="Enter your email")
password_input = gr.Textbox(label="Password", type="password", placeholder="Enter your password")
with gr.Row():
login_btn = gr.Button("Login")
signup_btn = gr.Button("Sign Up")
auth_feedback = gr.Markdown()
# --- Main Application UI (Initially Hidden) ---
with gr.Column(visible=False) as main_view:
gr.Markdown("## Aura Mind Glow Dashboard")
with gr.Row():
logged_in_user_display = gr.Markdown()
logout_btn = gr.Button("Logout")
# Build the tabbed interface
interface_list = []
tab_titles = []
# Field Mode is always available after login
field_mode_ui = create_field_mode_ui(user_state)
interface_list.append(field_mode_ui)
tab_titles.append("Field Mode")
if check_internet_connection():
if ADK_RUNNER: interface_list.append(create_connected_mode_ui(user_state)); tab_titles.append("Connected Mode")
if STORY_LLM: interface_list.append(create_story_mode_ui()); tab_titles.append("Farmer's Story")
interface_list.append(create_document_analysis_ui()); tab_titles.append("Doc Analysis")
interface_list.append(create_settings_ui()); tab_titles.append("Settings")
interface_list.append(create_kb_management_ui()); tab_titles.append("Knowledge Base")
else:
gr.Markdown("**Warning:** No internet connection. Some features are disabled.")
main_tabs = gr.TabbedInterface(interface_list, tab_titles)
# --- Event Handlers ---
def on_login_success(user_data):
"""Called when login is successful. Hides login UI, shows main UI."""
if user_data:
return (
gr.update(visible=False), # Hide login_view
gr.update(visible=True), # Show main_view
f"Logged in as: **{user_data['email']}**", # Update user display
""
)
return gr.update(), gr.update(), gr.update(), "❌ Invalid email or password."
def on_logout():
"""Called on logout. Hides main UI, shows login UI."""
return None, gr.update(visible=True), gr.update(visible=False), ""
# Button and state change listeners
login_btn.click(
fn=login_user,
inputs=[email_input, password_input],
outputs=[user_state]
).then(
fn=on_login_success,
inputs=[user_state],
outputs=[login_view, main_view, logged_in_user_display, auth_feedback]
)
signup_btn.click(
fn=signup_user,
inputs=[email_input, password_input],
outputs=[auth_feedback]
)
logout_btn.click(
fn=on_logout,
inputs=[],
outputs=[user_state, login_view, main_view, logged_in_user_display]
)
demo.launch(share=True, debug=True, mcp_server=True)