Spaces:
Sleeping
Sleeping
# ============================================================================== | |
# Aura Mind Glow - Main Application (Refactored) | |
# ============================================================================== | |
""" | |
This script launches the Aura Mind Glow application, now with multiple modes | |
and user authentication. | |
""" | |
# --- Step 0: Essential Imports --- | |
import gradio as gr | |
from PIL import Image | |
import os | |
import warnings | |
import socket | |
import tempfile | |
import json | |
import re | |
import requests # Added for authentication | |
from bigquery_search import search_bigquery_for_remedy | |
# Suppress potential warnings for a cleaner console | |
warnings.filterwarnings("ignore") | |
os.environ["TORCH_COMPILE_DISABLE"] = "1" # Ensure torch compile is off | |
# --- Step 1: Import Core Components from Modules --- | |
from vision_model import load_vision_model | |
from knowledge_base import KnowledgeBase | |
from agent_setup import initialize_adk | |
from google.genai import types | |
from google.cloud import bigquery | |
from story_generator import create_story_prompt_from_pdf, generate_video_from_prompt | |
from langchain_huggingface import HuggingFaceEndpoint | |
from bigquery_uploader import upload_diagnosis_to_bigquery | |
from vector_store import embed_and_store_documents, search_documents | |
print("β All libraries imported successfully.") | |
# --- Step 2: Global and Authentication Initialization --- | |
# Authentication Configuration | |
GCP_API_KEY = os.environ.get("GCP_API_KEY") | |
if not GCP_API_KEY: | |
print("β οΈ WARNING: GCP_API_KEY environment variable not set. Authentication will fail.") | |
# Define placeholder URLs to avoid crashing, but they won't work | |
SIGNUP_URL = "YOUR_SIGNUP_URL_HERE" | |
LOGIN_URL = "YOUR_LOGIN_URL_HERE" | |
else: | |
SIGNUP_URL = f"https://identitytoolkit.googleapis.com/v1/accounts:signUp?key={GCP_API_KEY}" | |
LOGIN_URL = f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={GCP_API_KEY}" | |
# This expensive setup runs only ONCE when the application starts. | |
print("Performing initial setup...") | |
VISION_MODEL, PROCESSOR = load_vision_model() | |
KB = KnowledgeBase() | |
RETRIEVER = KB # The retriever is now the KB itself | |
embed_and_store_documents() # Initialize and load the vector store | |
# Initialize ADK components for Connected Mode | |
adk_components = initialize_adk(VISION_MODEL, PROCESSOR, RETRIEVER) | |
ADK_RUNNER = adk_components["runner"] if adk_components else None | |
DIAGNOSIS_TOOL = adk_components["diagnosis_tool"] if adk_components else None | |
REMEDY_TOOL = adk_components["remedy_tool"] if adk_components else None | |
SESSION_SERVICE = adk_components["session_service"] if adk_components else None | |
# Initialize a separate LLM for the Story Generator | |
STORY_LLM = None | |
if os.environ.get("HF_TOKEN"): | |
try: | |
STORY_LLM = HuggingFaceEndpoint( | |
repo_id="HuggingFaceH4/zephyr-7b-beta", | |
huggingfacehub_api_token=os.environ.get("HF_TOKEN"), | |
max_new_tokens=150, | |
temperature=0.4, | |
) | |
print("β Story Generator LLM initialized successfully.") | |
except Exception as e: | |
print(f"β Could not initialize Story Generator LLM: {e}") | |
else: | |
print("β HF_TOKEN not found. Story Generator Mode will be disabled.") | |
# --- Step 3: Authentication UI and Logic --- | |
def signup_user(email, password): | |
"""Signs up a new user using Google Identity Platform.""" | |
payload = { | |
"email": email, | |
"password": password, | |
"returnSecureToken": True | |
} | |
try: | |
response = requests.post(SIGNUP_URL, json=payload) | |
response.raise_for_status() # Raise an exception for bad status codes | |
# No need to return anything on success, we'll just inform the user | |
return "β Signup successful! You can now log in." | |
except requests.exceptions.HTTPError as err: | |
error_json = err.response.json() | |
error_message = error_json.get("error", {}).get("message", "Unknown error") | |
print(f"β Signup failed: {error_message}") | |
return f"β Signup failed: {error_message}" | |
except Exception as e: | |
print(f"β An unexpected error occurred during signup: {e}") | |
return "β An unexpected error occurred. See console for details." | |
def login_user(email, password): | |
"""Logs in a user and returns their session info.""" | |
payload = { | |
"email": email, | |
"password": password, | |
"returnSecureToken": True | |
} | |
try: | |
response = requests.post(LOGIN_URL, json=payload) | |
response.raise_for_status() | |
user_data = response.json() | |
# Return a dictionary with user info, which will be stored in the state | |
return { | |
"uid": user_data["localId"], | |
"id_token": user_data["idToken"], | |
"email": user_data["email"] | |
} | |
except requests.exceptions.HTTPError as err: | |
# Don't raise an error, just return None to indicate login failure | |
print(f"Login failed for user: {email}") | |
return None | |
except Exception as e: | |
print(f"β An unexpected error occurred during login: {e}") | |
return None | |
# --- Step 4: Define Gradio UIs --- | |
def create_field_mode_ui(user_state): | |
"""Creates the Gradio UI for the offline Field Mode.""" | |
def clean_diagnosis_text(diagnosis: str) -> str: | |
cleaned_text = re.sub(r'[^\w\s.\-,"]', '', diagnosis) | |
cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip() | |
return cleaned_text | |
def search_bigquery_for_remedy(search_query: str) -> str: | |
try: | |
client = bigquery.Client(project="gem-creation") | |
query = """ | |
SELECT remedy_description FROM `gem-creation.maize_remedies.remedies` | |
WHERE SEARCH(remedy_description, @query) | |
""" | |
job_config = bigquery.QueryJobConfig( | |
query_parameters=[bigquery.ScalarQueryParameter("query", "STRING", search_query)] | |
) | |
query_job = client.query(query, job_config=job_config) | |
results = list(query_job) | |
return results[0].remedy_description if results else "No remedy found." | |
except Exception as e: | |
return f"Error querying BigQuery: {e}" | |
def get_diagnosis_and_remedy(uploaded_image: Image.Image, feedback: str): | |
""" | |
Performs diagnosis on an uploaded plant image and provides a remedy. | |
This tool takes an image of a plant, diagnoses its condition using a vision | |
model, and then searches both a local knowledge base and a cloud database | |
for a suitable remedy. It also logs the diagnosis for future analysis. | |
Args: | |
uploaded_image (Image.Image): The PIL Image of the plant to be diagnosed. | |
feedback (str): Optional user feedback on the diagnosis or remedy. | |
Returns: | |
str: A formatted markdown string containing the diagnosis report | |
and suggested remedies from local and cloud sources. | |
""" | |
if uploaded_image is None: | |
return "Please upload an image." | |
# Handle different contexts for user_state (UI vs. API call) | |
farmer_id = "api_call_user" # Default user for API calls | |
if hasattr(user_state, 'get') and user_state.get("uid"): | |
# This block runs for UI users who are logged in | |
farmer_id = user_state.get("uid") | |
elif not hasattr(user_state, 'get'): | |
# This block runs for API calls where user_state is not a dict-like object | |
print("API call detected, proceeding with default farmer_id.") | |
else: | |
# This block runs for UI users who are not logged in | |
raise gr.Error("Authentication error. Please log out and log in again.") | |
temp_file_path = None | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file: | |
uploaded_image.save(temp_file.name) | |
temp_file_path = temp_file.name | |
diagnosis = DIAGNOSIS_TOOL(temp_file_path) | |
if "Could not parse" in diagnosis: | |
return f"Could not identify condition: {diagnosis}" | |
report_title = diagnosis | |
cleaned_diagnosis = clean_diagnosis_text(diagnosis) | |
# --- Hybrid Search --- | |
local_remedy_list = search_documents(cleaned_diagnosis) | |
local_remedy = local_remedy_list[0] if local_remedy_list else "No remedy found in local knowledge base." | |
search_query = "healthy maize" if "healthy" in cleaned_diagnosis.lower() else "phosphorus" if "phosphorus" in cleaned_diagnosis.lower() else "Wetin My Eye See So" | |
cloud_remedy = search_bigquery_for_remedy(search_query) | |
final_response = f""" | |
## Diagnosis Report | |
**Condition:** | |
### {report_title} | |
--- | |
## Suggested Remedy (from Cloud) | |
{cloud_remedy} | |
""" | |
diagnosis_data = { | |
"ai_diagnosis": report_title, | |
"recommended_action": local_remedy, | |
"confidence_score": None, | |
"farmer_id": farmer_id, # Use the determined farmer_id | |
"gps_latitude": None, | |
"gps_longitude": None, | |
"crop_type": "Maize", | |
"crop_variety": None, | |
"farmer_feedback": feedback, | |
"treatment_applied": None, | |
"outcome_image_id": None, | |
} | |
upload_diagnosis_to_bigquery(diagnosis_data) | |
return final_response | |
finally: | |
if temp_file_path: | |
os.remove(temp_file_path) | |
with gr.Blocks() as field_mode_blocks: | |
gr.Markdown("### π½ Aura Mind Glow: Field Mode") | |
gr.Markdown("Upload an image of a maize plant for diagnosis and treatment.") | |
with gr.Row(): | |
with gr.Column(): | |
image_input = gr.Image(type="pil", label="Upload Maize Plant Image", sources=["upload", "webcam"]) | |
feedback_input = gr.Textbox(label="Provide Feedback on the Remedy (Optional)", placeholder="e.g., This remedy worked well...") | |
submit_btn = gr.Button("Get Diagnosis") | |
with gr.Column(): | |
output_markdown = gr.Markdown(label="Diagnosis and Remedy Report") | |
submit_btn.click( | |
fn=get_diagnosis_and_remedy, | |
inputs=[image_input, feedback_input], | |
outputs=output_markdown | |
) | |
return field_mode_blocks | |
# --- All other UI creation functions (create_connected_mode_ui, etc.) remain the same --- | |
# Note: For a full implementation, you would pass the user_state to other UIs | |
# and use the farmer_id there as well. | |
def create_connected_mode_ui(user_state): | |
"""Creates the Gradio UI for the online Connected Mode.""" | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="green", secondary_hue="lime")) as demo: | |
gr.Markdown("# π½ Aura Mind Glow: Connected Mode π€") | |
gr.Markdown("I am an AI farming assistant. Upload an image and ask for a diagnosis and remedy.") | |
chatbot = gr.Chatbot(height=600) | |
msg = gr.MultimodalTextbox(file_types=["image"], label="Ask a question and/or upload an image...") | |
async def respond(chat_input, history, user_state): | |
if not user_state or not user_state.get("uid"): | |
history.append((chat_input.get("text", ""), "Authentication error. Please log out and log in again.")) | |
yield history, gr.MultimodalTextbox(value=None) | |
return | |
user_id = user_state["uid"] | |
if not SESSION_SERVICE or not ADK_RUNNER: | |
history.append((chat_input.get("text", ""), "Connected mode is not available. Check logs.")) | |
yield history, gr.MultimodalTextbox(value=None) | |
return | |
session = await SESSION_SERVICE.create_session(user_id=user_id, app_name="AuraMindGlow") | |
files = chat_input.get("files", []) | |
text = chat_input.get("text", "") | |
if not files and not text: | |
# If there is no input, do nothing | |
yield history, gr.MultimodalTextbox(value=None) | |
return | |
if not files: | |
history.append((text, "Please upload an image for diagnosis.")) | |
yield history, gr.MultimodalTextbox(value=None) | |
return | |
# Create the prompt for the ADK agent | |
with open(files[0], 'rb') as f: | |
image_data = f.read() | |
image_part = types.Part( | |
inline_data=types.Blob( | |
mime_type='image/png', | |
data=image_data | |
) | |
) | |
text_part = types.Part(text=text or "Diagnose this plant and provide a remedy.") | |
prompt = types.Content(parts=[image_part, text_part], role="user") | |
# Stream the response from the agent | |
bot_message = "" | |
history.append([(files[0], text), bot_message]) | |
try: | |
async for event in ADK_RUNNER.run_async( | |
session_id=session.id, user_id=user_id, new_message=prompt | |
): | |
if event.is_llm_response_chunk() and event.content.parts: | |
bot_message += event.content.parts[0].text | |
history[-1] = (((files[0], text), bot_message)) | |
yield history, gr.MultimodalTextbox(value=None) | |
elif event.is_final_response() and event.content.parts: | |
bot_message = event.content.parts[0].text | |
history[-1] = (((files[0], text), bot_message)) | |
yield history, gr.MultimodalTextbox(value=None) | |
except Exception as e: | |
print(f"Error during agent execution: {e}") | |
history[-1] = (((files[0], text), f"An error occurred: {e}")) | |
yield history, gr.MultimodalTextbox(value=None) | |
msg.submit(respond, [msg, chatbot, user_state], [chatbot, msg]) | |
return demo | |
def create_document_analysis_ui(): | |
"""Creates the Gradio UI for the Document Analysis Mode.""" | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="purple", secondary_hue="pink")) as demo: | |
gr.Markdown("# π½ Aura Mind Glow: Document Analysis Mode π") | |
gr.Markdown("Upload a PDF or a spreadsheet and ask questions about its content.") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
doc_input = gr.File(label="Upload Document", file_types=[".pdf", ".csv"]) | |
query_input = gr.Textbox(label="Ask a question about the document") | |
submit_btn = gr.Button("Analyze and Query") | |
with gr.Column(scale=2): | |
answer_output = gr.Textbox(label="Answer", interactive=False, lines=10) | |
status_output = gr.Textbox(label="Status", interactive=False, lines=3) | |
def analysis_process(doc, query): | |
if doc is None: | |
yield "Please upload a document to begin.", "" | |
return | |
file_path = doc.name | |
file_ext = os.path.splitext(file_path)[1].lower() | |
if file_ext == ".pdf": | |
yield "Analyzing PDF...", "" | |
chain, vector_store = analyze_pdf(file_path) | |
if chain and vector_store: | |
yield "PDF analyzed successfully. Now querying...", "" | |
answer = query_pdf(chain, vector_store, query) | |
yield answer, "Query complete." | |
else: | |
yield "Failed to analyze PDF.", "Error" | |
elif file_ext == ".csv": | |
yield "Analyzing spreadsheet...", "" | |
agent = analyze_spreadsheet(file_path) | |
if agent: | |
yield "Spreadsheet analyzed successfully. Now querying...", "" | |
answer = query_spreadsheet(agent, query) | |
yield answer, "Query complete." | |
else: | |
yield "Failed to analyze spreadsheet.", "Error" | |
else: | |
yield "Unsupported file type. Please upload a PDF or a CSV file.", "Error" | |
submit_btn.click( | |
analysis_process, | |
inputs=[doc_input, query_input], | |
outputs=[answer_output, status_output] | |
) | |
return demo | |
def create_story_mode_ui(): | |
"""Creates the Gradio UI for the Farmer's Story Mode.""" | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="yellow")) as demo: | |
gr.Markdown("# π½ Aura Mind Glow: Farmer's Story Mode π¬") | |
gr.Markdown("Create a short video story from your farm documents. Upload a PDF, describe the mood, and let the AI create a visual story.") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"]) | |
image_input = gr.Image(type="filepath", label="Optional: Upload a Starting Image") | |
user_prompt_input = gr.Textbox(label="Describe the video's tone or theme", placeholder="e.g., hopeful, a look back at a tough season, etc.") | |
submit_btn = gr.Button("Generate Video Story") | |
with gr.Column(scale=2): | |
video_output = gr.Video(label="Generated Video Story") | |
status_output = gr.Textbox(label="Status", interactive=False, lines=3) | |
def story_generation_process(pdf, image, user_prompt): | |
if pdf is None: | |
yield None, "Please upload a PDF document to begin." | |
return | |
yield None, "Step 1: Reading PDF and generating creative prompt..." | |
creative_prompt = create_story_prompt_from_pdf(pdf.name, user_prompt, STORY_LLM) | |
if "Error" in creative_prompt: | |
yield None, creative_prompt | |
return | |
yield None, f"Step 2: Generating video with prompt: '{creative_prompt[:100]}...' (This may take several minutes)" | |
video_path = generate_video_from_prompt(creative_prompt, image) | |
if "Error" in video_path: | |
yield None, video_path | |
return | |
yield video_path, "Video generation complete!" | |
submit_btn.click( | |
story_generation_process, | |
inputs=[pdf_input, image_input, user_prompt_input], | |
outputs=[video_output, status_output] | |
) | |
return demo | |
def create_settings_ui(): | |
"""Creates the Gradio UI for the Settings tab.""" | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="gray", secondary_hue="blue")) as demo: | |
gr.Markdown("# βοΈ Settings & Data Management") | |
gr.Markdown("Manage application settings and data synchronization.") | |
with gr.Row(): | |
with gr.Column(): | |
sync_btn = gr.Button("βοΈ Sync Local Data to BigQuery Cloud") | |
status_output = gr.Textbox(label="Sync Status", interactive=False, lines=5) | |
def sync_data_to_cloud(): | |
yield "Local data sync is no longer required as diagnoses are uploaded directly to BigQuery." | |
sync_btn.click( | |
sync_data_to_cloud, | |
inputs=[], | |
outputs=[status_output] | |
) | |
return demo | |
def create_kb_management_ui(): | |
"""Creates the Gradio UI for managing the knowledge base.""" | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="indigo", secondary_hue="purple")) as demo: | |
gr.Markdown("# π Knowledge Base Management") | |
gr.Markdown("Manage the local, encrypted knowledge base.") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("### Rebuild Knowledge Base") | |
rebuild_btn = gr.Button("Rebuild from Source Files") | |
rebuild_status = gr.Textbox(label="Status", interactive=False) | |
with gr.Column(): | |
gr.Markdown("### Add PDF to Knowledge Base") | |
pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"]) | |
ingest_btn = gr.Button("Ingest PDF") | |
ingest_status = gr.Textbox(label="Status", interactive=False) | |
def rebuild_kb(): | |
yield "Rebuilding knowledge base..." | |
try: | |
KB.rebuild_from_default_files() # Call the new method to rebuild from default files | |
yield "Knowledge base rebuilt successfully." | |
except Exception as e: | |
yield f"Error rebuilding knowledge base: {e}" | |
def ingest_pdf(pdf): | |
if pdf is None: | |
return "Please upload a PDF file." | |
yield "Ingesting PDF..." | |
try: | |
KB.ingest_pdf(pdf.name, os.path.basename(pdf.name)) | |
yield f"Successfully ingested {os.path.basename(pdf.name)}." | |
except Exception as e: | |
yield f"Error ingesting PDF: {e}" | |
rebuild_btn.click(rebuild_kb, outputs=[rebuild_status]) | |
ingest_btn.click(ingest_pdf, inputs=[pdf_input], outputs=[ingest_status]) | |
return demo | |
# --- Step 5: App Launcher --- | |
def check_internet_connection(host="8.8.8.8", port=53, timeout=3): | |
"""Check for internet connectivity.""" | |
try: | |
socket.setdefaulttimeout(timeout) | |
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port)) | |
return True | |
except socket.error: | |
return False | |
def _setup_gcp_credentials(): | |
gcp_credentials_json = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") | |
if gcp_credentials_json and gcp_credentials_json.strip().startswith("{"): | |
try: | |
credentials_dict = json.loads(gcp_credentials_json) | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.json') as temp_file: | |
json.dump(credentials_dict, temp_file) | |
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name | |
print(f"β GCP credentials set from env var to: {os.environ['GOOGLE_APPLICATION_CREDENTIALS']}") | |
except Exception as e: | |
print(f"β Error setting up GCP credentials: {e}") | |
else: | |
print("βΉοΈ GOOGLE_APPLICATION_CREDENTIALS not found as JSON string. Using other means.") | |
if __name__ == "__main__": | |
_setup_gcp_credentials() | |
with gr.Blocks(theme=gr.themes.Soft(), css="footer {visibility: hidden !important;}") as demo: | |
user_state = gr.State(None) # To hold user session info (uid, token, etc) | |
# --- Login UI --- | |
with gr.Column(visible=True) as login_view: | |
gr.Markdown("# Welcome to Aura Mind Glow", elem_id="login_title") | |
gr.Markdown("Please log in or sign up to continue.") | |
with gr.Row(): | |
email_input = gr.Textbox(label="Email", placeholder="Enter your email") | |
password_input = gr.Textbox(label="Password", type="password", placeholder="Enter your password") | |
with gr.Row(): | |
login_btn = gr.Button("Login") | |
signup_btn = gr.Button("Sign Up") | |
auth_feedback = gr.Markdown() | |
# --- Main Application UI (Initially Hidden) --- | |
with gr.Column(visible=False) as main_view: | |
gr.Markdown("## Aura Mind Glow Dashboard") | |
with gr.Row(): | |
logged_in_user_display = gr.Markdown() | |
logout_btn = gr.Button("Logout") | |
# Build the tabbed interface | |
interface_list = [] | |
tab_titles = [] | |
# Field Mode is always available after login | |
field_mode_ui = create_field_mode_ui(user_state) | |
interface_list.append(field_mode_ui) | |
tab_titles.append("Field Mode") | |
if check_internet_connection(): | |
if ADK_RUNNER: interface_list.append(create_connected_mode_ui(user_state)); tab_titles.append("Connected Mode") | |
if STORY_LLM: interface_list.append(create_story_mode_ui()); tab_titles.append("Farmer's Story") | |
interface_list.append(create_document_analysis_ui()); tab_titles.append("Doc Analysis") | |
interface_list.append(create_settings_ui()); tab_titles.append("Settings") | |
interface_list.append(create_kb_management_ui()); tab_titles.append("Knowledge Base") | |
else: | |
gr.Markdown("**Warning:** No internet connection. Some features are disabled.") | |
main_tabs = gr.TabbedInterface(interface_list, tab_titles) | |
# --- Event Handlers --- | |
def on_login_success(user_data): | |
"""Called when login is successful. Hides login UI, shows main UI.""" | |
if user_data: | |
return ( | |
gr.update(visible=False), # Hide login_view | |
gr.update(visible=True), # Show main_view | |
f"Logged in as: **{user_data['email']}**", # Update user display | |
"" | |
) | |
return gr.update(), gr.update(), gr.update(), "β Invalid email or password." | |
def on_logout(): | |
"""Called on logout. Hides main UI, shows login UI.""" | |
return None, gr.update(visible=True), gr.update(visible=False), "" | |
# Button and state change listeners | |
login_btn.click( | |
fn=login_user, | |
inputs=[email_input, password_input], | |
outputs=[user_state] | |
).then( | |
fn=on_login_success, | |
inputs=[user_state], | |
outputs=[login_view, main_view, logged_in_user_display, auth_feedback] | |
) | |
signup_btn.click( | |
fn=signup_user, | |
inputs=[email_input, password_input], | |
outputs=[auth_feedback] | |
) | |
logout_btn.click( | |
fn=on_logout, | |
inputs=[], | |
outputs=[user_state, login_view, main_view, logged_in_user_display] | |
) | |
demo.launch(share=True, debug=True, mcp_server=True) |