# ==============================================================================
# Aura Mind Glow - Main Application (Refactored)
# ==============================================================================
"""
This script launches the Aura Mind Glow application, now with multiple modes.
"""

# --- Step 0: Essential Imports ---
import gradio as gr
from PIL import Image
import os
import warnings
import socket
import tempfile
import json  # New import
import mimetypes
import re
from google.cloud import bigquery

# Suppress potential warnings for a cleaner console
warnings.filterwarnings("ignore")
os.environ["TORCH_COMPILE_DISABLE"] = "1"  # Ensure torch compile is off

# --- Step 1: Import Core Components from Modules ---
from vision_model import load_vision_model
from knowledge_base import KnowledgeBase
from agent_setup import initialize_adk
from google.genai import types
from story_generator import create_story_prompt_from_pdf, generate_video_from_prompt
from langchain_huggingface import HuggingFaceEndpoint
from bigquery_uploader import upload_diagnosis_to_bigquery

# NOTE(review): create_document_analysis_ui() calls analyze_pdf, query_pdf,
# analyze_spreadsheet and query_spreadsheet, but none of them is imported in
# this file. Confirm which module provides them and import them here, otherwise
# the Document Analysis tab fails with NameError on first use.

print("✅ All libraries imported successfully.")

# --- Step 2: Global Initialization ---
# This expensive setup runs only ONCE when the application starts.
print("Performing initial setup...")
VISION_MODEL, PROCESSOR = load_vision_model()
KB = KnowledgeBase()
RETRIEVER = KB  # The retriever is now the KB itself

# Initialize ADK components for Connected Mode
adk_components = initialize_adk(VISION_MODEL, PROCESSOR, RETRIEVER)
ADK_RUNNER = adk_components["runner"] if adk_components else None
DIAGNOSIS_TOOL = adk_components["diagnosis_tool"] if adk_components else None
REMEDY_TOOL = adk_components["remedy_tool"] if adk_components else None
SESSION_SERVICE = adk_components["session_service"] if adk_components else None

# Initialize a separate LLM for the Story Generator
STORY_LLM = None
if os.environ.get("HF_TOKEN"):
    try:
        STORY_LLM = HuggingFaceEndpoint(
            repo_id="HuggingFaceH4/zephyr-7b-beta",
            huggingfacehub_api_token=os.environ.get("HF_TOKEN"),
            max_new_tokens=150,
            temperature=0.4,
        )
        print("✅ Story Generator LLM initialized successfully.")
    except Exception as e:
        print(f"❌ Could not initialize Story Generator LLM: {e}")
else:
    print("❌ HF_TOKEN not found. Story Generator Mode will be disabled.")


# --- Step 3: Define Gradio UIs ---
def create_field_mode_ui():
    """Creates the Gradio UI for the offline Field Mode.

    Returns a ``gr.Interface`` whose handler diagnoses an uploaded image with
    ``DIAGNOSIS_TOOL``, looks up a remedy in BigQuery, and records the
    diagnosis via ``upload_diagnosis_to_bigquery``.
    """

    def clean_diagnosis_text(diagnosis: str) -> str:
        """
        Cleans the diagnosis text by removing special characters and extra whitespace.
        This function is designed to prepare the text for a BigQuery SEARCH query.
        """
        # Remove special characters, keeping only letters, numbers, and basic punctuation.
        # This will remove characters like *, #, etc.
        cleaned_text = re.sub(r'[^\w\s.\-,"]', '', diagnosis)
        # Replace multiple whitespace characters with a single space
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
        return cleaned_text

    def search_bigquery_for_remedy(search_query: str) -> str:
        """
        Searches the BigQuery table for a remedy using the SEARCH function.

        Returns the ``remedy_description`` of the first matching row, a fixed
        "no remedy found" message when nothing matches, or an error message
        string when the query raises (errors are reported, not propagated).
        """
        try:
            # Initialize the BigQuery client. Your project ID is used here.
            client = bigquery.Client(project="gem-creation")

            # Define the SQL query with the SEARCH function and a parameter for safety.
            query = """
                SELECT
                    remedy_description
                FROM
                    `gem-creation.maize_remedies.remedies`
                WHERE
                    SEARCH(remedy_description, @query)
            """

            # Set up the query parameters.
            job_config = bigquery.QueryJobConfig(
                query_parameters=[
                    bigquery.ScalarQueryParameter("query", "STRING", search_query),
                ]
            )

            # Execute the query.
            print(f"Executing BigQuery search for: '{search_query}'")
            query_job = client.query(query, job_config=job_config)

            # Process the results.
            results = list(query_job)  # Get all results
            if not results:
                print("No remedy found in BigQuery.")
                return "No remedy found in the cloud knowledge base for this condition."
            else:
                # The result is a Row object; access the column by its name.
                remedy = results[0].remedy_description
                print("Remedy found in BigQuery.")
                return remedy
        except Exception as e:
            error_message = f"An error occurred while querying the BigQuery database: {e}"
            print(f"❌ {error_message}")
            return error_message

    def get_diagnosis_and_remedy(uploaded_image: Image.Image) -> str:
        """Diagnose the uploaded image and return a Markdown report.

        Returns a prompt string when no image was supplied. Raises ``gr.Error``
        when the knowledge base or diagnosis tool is unavailable, or when any
        step of the workflow fails.
        """
        if uploaded_image is None:
            return "Please upload an image of a maize plant first."
        if RETRIEVER is None:
            raise gr.Error("Knowledge base is not loaded. Cannot find remedy. Check logs.")
        # DIAGNOSIS_TOOL is None when initialize_adk() returned nothing; fail
        # with a clear message instead of "'NoneType' object is not callable".
        if DIAGNOSIS_TOOL is None:
            raise gr.Error("Diagnosis tool is not initialized. Cannot analyze image. Check logs.")

        temp_file_path = None
        try:
            # Record the path before writing so the `finally` clause can clean
            # up even if saving the image fails, and close the handle before
            # saving so the file is not opened twice.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                temp_file_path = temp_file.name
            uploaded_image.save(temp_file_path)

            diagnosis = DIAGNOSIS_TOOL(temp_file_path)
            print(f"Diagnosis received: {diagnosis}")

            if "Could not parse" in diagnosis:
                return f"Sorry, I couldn't identify the condition from the image. Raw output: {diagnosis}"

            # Clean the diagnosis text before using it as a search query
            report_title = diagnosis  # Keep the original diagnosis for the report
            cleaned_diagnosis = clean_diagnosis_text(diagnosis)

            # Determine the search query based on keywords
            if "healthy" in cleaned_diagnosis.lower():
                search_query = "healthy maize"
            elif "phosphorus" in cleaned_diagnosis.lower():
                search_query = "phosphorus"
            else:
                # NOTE(review): hard-coded fallback query; confirm this is
                # intended rather than searching with cleaned_diagnosis.
                search_query = "Wetin My Eye See So"

            results = search_bigquery_for_remedy(search_query)

            if not results:
                remedy = "No remedy found in the local knowledge base."
            else:
                remedy = results

            final_response = f"""
## Diagnosis Report

**Condition Identified:**
### {report_title}

---

## Suggested Remedy

{remedy}
"""
            print("Workflow complete. Returning response.")

            # Prepare data for BigQuery upload
            diagnosis_data = {
                "ai_diagnosis": report_title,
                "recommended_action": remedy,
                "confidence_score": None,  # Placeholder, as confidence score is not calculated here
                "farmer_id": "unknown",  # Placeholder
                "gps_latitude": None,  # Placeholder
                "gps_longitude": None,  # Placeholder
                "crop_type": "Maize",  # Assuming maize for this app
                "crop_variety": None,  # Placeholder
                "farmer_feedback": None,  # Placeholder
                "treatment_applied": None,  # Placeholder
                "outcome_image_id": None,  # Placeholder
            }
            upload_diagnosis_to_bigquery(diagnosis_data)

            return final_response

        except Exception as e:
            print(f"An error occurred during the analysis workflow: {e}")
            raise gr.Error(f"An unexpected error occurred: {e}")
        finally:
            if temp_file_path and os.path.exists(temp_file_path):
                os.remove(temp_file_path)

    # ... (the rest of your Gradio Interface definition remains the same)
    css = """
    footer {visibility: hidden !important;}
    .gradio-container {font-family: 'IBM Plex Sans', sans-serif;}
    """

    return gr.Interface(
        fn=get_diagnosis_and_remedy,
        inputs=gr.Image(type="pil", label="Upload Maize Plant Image", sources=["upload", "webcam"]),
        outputs=gr.Markdown(label="Diagnosis and Remedy Report", value="The report will appear here..."),
        title="🌽 Aura Mind Glow: Field Mode (Offline)",
        description="**A 100% Offline-Capable Farming Assistant.** Upload an image of a maize plant. The AI will diagnose its condition and retrieve a treatment plan from its local knowledge base.",
        article="\n\nBuilt with Unsloth, LangChain, and Gradio. Version 2.1\n\n",
        allow_flagging="never",
        theme=gr.themes.Soft(primary_hue="teal", secondary_hue="orange"),
        css=css
    )


def create_connected_mode_ui():
    """Creates the Gradio UI for the online Connected Mode."""
    # ... (This function remains unchanged) ...
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="green", secondary_hue="lime")) as demo:
        gr.Markdown("# 🌽 Aura Mind Glow: Connected Mode 🤖")
        gr.Markdown("I am an AI farming assistant. Upload an image and ask for a diagnosis and remedy.")

        chatbot = gr.Chatbot(height=600)
        msg = gr.MultimodalTextbox(file_types=["image"], label="Ask a question and/or upload an image...")

        async def respond(chat_input, history):
            """Stream the agent's answer for one chat turn.

            Yields ``(history, cleared_textbox)`` pairs; the last entry of
            ``history`` is updated in place as response text arrives.
            """
            user_id = "default_user"  # In a real app, this would be unique per user
            history = history if history is not None else []

            if not SESSION_SERVICE or not ADK_RUNNER:
                history.append((chat_input["text"], "Connected mode is not available. Check logs."))
                yield history, gr.MultimodalTextbox(value=None)
                return

            session = await SESSION_SERVICE.create_session(user_id=user_id, app_name="AuraMindGlow")

            files = chat_input.get("files", [])
            text = chat_input.get("text", "")

            if not files:
                history.append([text, "Please upload an image for diagnosis."])
                yield history, gr.MultimodalTextbox(value=None)
                return

            # Create the prompt for the ADK agent
            with open(files[0], 'rb') as f:
                image_data = f.read()

            # Label the bytes with their real image type (JPEG, WebP, ...)
            # instead of always claiming PNG; fall back to PNG when the type
            # cannot be determined from the file name.
            mime_type = mimetypes.guess_type(files[0])[0]
            if not mime_type or not mime_type.startswith("image/"):
                mime_type = 'image/png'

            image_part = types.Part(
                inline_data=types.Blob(
                    mime_type=mime_type,
                    data=image_data
                )
            )
            text_part = types.Part(text=text or "Diagnose this plant and provide a remedy.")
            prompt = types.Content(parts=[image_part, text_part], role="user")

            # Stream the response from the agent
            bot_message = ""
            history.append([(files[0],), bot_message])

            try:
                async for event in ADK_RUNNER.run_async(
                    session_id=session.id, user_id=user_id, new_message=prompt
                ):
                    if event.is_llm_response_chunk() and event.content.parts:
                        # Partial chunk: accumulate onto the running message.
                        bot_message += event.content.parts[0].text
                        history[-1] = ((files[0],), bot_message)
                        yield history, gr.MultimodalTextbox(value=None)
                    elif event.is_final_response() and event.content.parts:
                        # Final response replaces whatever was accumulated.
                        bot_message = event.content.parts[0].text
                        history[-1] = ((files[0],), bot_message)
                        yield history, gr.MultimodalTextbox(value=None)
            except Exception as e:
                print(f"Error during agent execution: {e}")
                history[-1] = ((files[0],), f"An error occurred: {e}")
                yield history, gr.MultimodalTextbox(value=None)

        msg.submit(respond, [msg, chatbot], [chatbot, msg])
    return demo


def create_document_analysis_ui():
    """Creates the Gradio UI for the Document Analysis Mode."""
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="purple", secondary_hue="pink")) as demo:
        gr.Markdown("# 🌽 Aura Mind Glow: Document Analysis Mode 📄")
        gr.Markdown("Upload a PDF or a spreadsheet and ask questions about its content.")

        with gr.Row():
            with gr.Column(scale=1):
                doc_input = gr.File(label="Upload Document", file_types=[".pdf", ".csv"])
                query_input = gr.Textbox(label="Ask a question about the document")
                submit_btn = gr.Button("Analyze and Query")
            with gr.Column(scale=2):
                answer_output = gr.Textbox(label="Answer", interactive=False, lines=10)
                status_output = gr.Textbox(label="Status", interactive=False, lines=3)

        def analysis_process(doc, query):
            """Yield ``(answer, status)`` updates while analyzing ``doc``.

            Dispatches on the file extension: ``.pdf`` and ``.csv`` are
            handled; anything else yields an "unsupported" message.
            """
            if doc is None:
                yield "Please upload a document to begin.", ""
                return

            file_path = doc.name
            file_ext = os.path.splitext(file_path)[1].lower()

            # NOTE(review): analyze_pdf / query_pdf / analyze_spreadsheet /
            # query_spreadsheet are not defined or imported in this file.
            if file_ext == ".pdf":
                yield "Analyzing PDF...", ""
                chain, vector_store = analyze_pdf(file_path)
                if chain and vector_store:
                    yield "PDF analyzed successfully. Now querying...", ""
                    answer = query_pdf(chain, vector_store, query)
                    yield answer, "Query complete."
                else:
                    yield "Failed to analyze PDF.", "Error"
            elif file_ext == ".csv":
                yield "Analyzing spreadsheet...", ""
                agent = analyze_spreadsheet(file_path)
                if agent:
                    yield "Spreadsheet analyzed successfully. Now querying...", ""
                    answer = query_spreadsheet(agent, query)
                    yield answer, "Query complete."
                else:
                    yield "Failed to analyze spreadsheet.", "Error"
            else:
                yield "Unsupported file type. Please upload a PDF or a CSV file.", "Error"

        submit_btn.click(
            analysis_process,
            inputs=[doc_input, query_input],
            outputs=[answer_output, status_output]
        )
    return demo


def create_story_mode_ui():
    """Creates the Gradio UI for the Farmer's Story Mode."""
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="yellow")) as demo:
        gr.Markdown("# 🌽 Aura Mind Glow: Farmer's Story Mode 🎬")
        gr.Markdown("Create a short video story from your farm documents. Upload a PDF, describe the mood, and let the AI create a visual story.")

        with gr.Row():
            with gr.Column(scale=1):
                pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"])
                image_input = gr.Image(type="filepath", label="Optional: Upload a Starting Image")
                user_prompt_input = gr.Textbox(label="Describe the video's tone or theme", placeholder="e.g., hopeful, a look back at a tough season, etc.")
                submit_btn = gr.Button("Generate Video Story")
            with gr.Column(scale=2):
                video_output = gr.Video(label="Generated Video Story")
                status_output = gr.Textbox(label="Status", interactive=False, lines=3)

        def story_generation_process(pdf, image, user_prompt):
            """Yield ``(video_path, status)`` updates for the two-step pipeline.

            A result containing the substring "Error" from either helper is
            treated as a failure message and shown in the status box.
            """
            if pdf is None:
                yield None, "Please upload a PDF document to begin."
                return

            yield None, "Step 1: Reading PDF and generating creative prompt..."

            creative_prompt = create_story_prompt_from_pdf(pdf.name, user_prompt, STORY_LLM)

            if "Error" in creative_prompt:
                yield None, creative_prompt
                return

            yield None, f"Step 2: Generating video with prompt: '{creative_prompt[:100]}...' (This may take several minutes)"

            video_path = generate_video_from_prompt(creative_prompt, image)

            if "Error" in video_path:
                yield None, video_path
                return

            yield video_path, "Video generation complete!"

        submit_btn.click(
            story_generation_process,
            inputs=[pdf_input, image_input, user_prompt_input],
            outputs=[video_output, status_output]
        )
    return demo


def create_settings_ui():
    """Creates the Gradio UI for the Settings tab."""
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="gray", secondary_hue="blue")) as demo:
        gr.Markdown("# ⚙️ Settings & Data Management")
        gr.Markdown("Manage application settings and data synchronization.")

        with gr.Row():
            with gr.Column():
                sync_btn = gr.Button("☁️ Sync Local Data to BigQuery Cloud")
                status_output = gr.Textbox(label="Sync Status", interactive=False, lines=5)

        def sync_data_to_cloud():
            """Report that manual sync is obsolete; performs no work."""
            yield "Local data sync is no longer required as diagnoses are uploaded directly to BigQuery."

        sync_btn.click(
            sync_data_to_cloud,
            inputs=[],
            outputs=[status_output]
        )
    return demo


def create_kb_management_ui():
    """Creates the Gradio UI for managing the knowledge base."""
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="indigo", secondary_hue="purple")) as demo:
        gr.Markdown("# 📚 Knowledge Base Management")
        gr.Markdown("Manage the local, encrypted knowledge base.")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### Rebuild Knowledge Base")
                rebuild_btn = gr.Button("Rebuild from Source Files")
                rebuild_status = gr.Textbox(label="Status", interactive=False)
            with gr.Column():
                gr.Markdown("### Add PDF to Knowledge Base")
                pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"])
                ingest_btn = gr.Button("Ingest PDF")
                ingest_status = gr.Textbox(label="Status", interactive=False)

        def rebuild_kb():
            """Yield status messages while rebuilding the knowledge base."""
            yield "Rebuilding knowledge base..."
            try:
                KB.rebuild_from_default_files()  # Call the new method to rebuild from default files
                yield "Knowledge base rebuilt successfully."
            except Exception as e:
                yield f"Error rebuilding knowledge base: {e}"

        def ingest_pdf(pdf):
            """Yield status messages while ingesting ``pdf`` into the knowledge base."""
            if pdf is None:
                # This function is a generator, so the message must be yielded;
                # a plain `return "..."` would never reach the status box.
                yield "Please upload a PDF file."
                return

            yield "Ingesting PDF..."
            try:
                KB.ingest_pdf(pdf.name, os.path.basename(pdf.name))
                yield f"Successfully ingested {os.path.basename(pdf.name)}."
            except Exception as e:
                yield f"Error ingesting PDF: {e}"

        rebuild_btn.click(rebuild_kb, outputs=[rebuild_status])
        ingest_btn.click(ingest_pdf, inputs=[pdf_input], outputs=[ingest_status])
    return demo


# --- Step 4: App Launcher ---
def check_internet_connection(host="8.8.8.8", port=53, timeout=3):
    """Check for internet connectivity.

    Attempts a TCP connection to ``host:port`` and returns True on success,
    False on any socket-level failure. The timeout applies to this attempt
    only (the process-wide default socket timeout is left untouched) and the
    probe socket is always closed.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # socket.error is an alias of OSError
        return False


def _setup_gcp_credentials():
    """
    Sets up Google Cloud credentials from an environment variable.

    If GOOGLE_APPLICATION_CREDENTIALS contains a JSON string, it writes it to a
    temporary file and updates the environment variable to point to that file.
    This is useful for environments like Hugging Face Spaces where secrets are
    provided as string values.
    """
    gcp_credentials_json = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if gcp_credentials_json and gcp_credentials_json.strip().startswith("{"):
        try:
            # Validate JSON
            credentials_dict = json.loads(gcp_credentials_json)

            # Create a temporary file to store the credentials. mkstemp
            # creates a uniquely named file readable only by the current
            # user, so the service-account key is not exposed at a
            # predictable, world-readable path in the shared temp directory.
            fd, temp_file_path = tempfile.mkstemp(prefix="gcp_credentials_", suffix=".json")
            with os.fdopen(fd, "w") as f:
                json.dump(credentials_dict, f)

            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file_path
            print(f"✅ Google Cloud credentials successfully set from environment variable to: {temp_file_path}")
        except json.JSONDecodeError as e:
            print(f"❌ Error decoding GOOGLE_APPLICATION_CREDENTIALS JSON: {e}")
        except Exception as e:
            print(f"❌ An unexpected error occurred while setting up GCP credentials: {e}")
    else:
        print("ℹ️ GOOGLE_APPLICATION_CREDENTIALS environment variable not found or not a JSON string. Assuming credentials are set via other means (e.g., gcloud CLI, default ADC).")


if __name__ == "__main__":
    _setup_gcp_credentials()
    # No local database initialization needed

    field_mode_ui = create_field_mode_ui()
    interface_list = [field_mode_ui]
    tab_titles = ["Field Mode (Offline)"]

    # Conditionally add modes that require an internet connection
    if check_internet_connection():
        if ADK_RUNNER:
            connected_mode_ui = create_connected_mode_ui()
            interface_list.append(connected_mode_ui)
            tab_titles.append("Connected Mode")
        else:
            print("⚠️ Connected Mode disabled: ADK components not initialized.")

        if STORY_LLM:
            story_mode_ui = create_story_mode_ui()
            interface_list.append(story_mode_ui)
            tab_titles.append("Farmer's Story Mode")
        else:
            print("⚠️ Farmer's Story Mode disabled: Story LLM not initialized.")

        # Add the new Document Analysis UI
        document_analysis_ui = create_document_analysis_ui()
        interface_list.append(document_analysis_ui)
        tab_titles.append("Document Analysis")

        # Add the Settings UI
        settings_ui = create_settings_ui()
        interface_list.append(settings_ui)
        tab_titles.append("Settings")

        # Add the Knowledge Base Management UI
        kb_management_ui = create_kb_management_ui()
        interface_list.append(kb_management_ui)
        tab_titles.append("Knowledge Base")
    else:
        print("❌ No internet connection. Launching in Offline Mode only.")

    # Launch the appropriate UI
    if len(interface_list) > 1:
        ui = gr.TabbedInterface(interface_list, tab_titles)
    else:
        ui = field_mode_ui

    ui.launch(share=True, debug=True)