Spaces:
Runtime error
Runtime error
| """ | |
| Advanced Agentic System Interface | |
| ------------------------------- | |
| Provides a chat interface to interact with the autonomous agent teams: | |
| - Team A: Coders (App/Software Developers) | |
| - Team B: Business (Entrepreneurs) | |
| - Team C: Research (Deep Online Research) | |
| - Team D: Crypto & Sports Trading | |
| """ | |
| import gradio as gr | |
| import asyncio | |
| from typing import Dict, Any, List | |
| import json | |
| from datetime import datetime | |
| from agentic_system import AgenticSystem | |
| from team_management import TeamManager, TeamType, TeamObjective | |
| from orchestrator import AgentOrchestrator | |
| from reasoning import ReasoningEngine | |
class ChatInterface:
    """Chat front end that routes user messages to the agent teams.

    Each incoming message is classified by the orchestrator's reasoning
    engine and dispatched to one of four handlers (query, objective,
    status, general chat).  The instance keeps an in-memory transcript in
    ``chat_history`` and a registry of objectives it created in
    ``active_objectives``.
    """

    def __init__(self):
        """Create the core components and initialize the team agents."""
        # Initialize core components
        self.orchestrator = AgentOrchestrator()
        self.agentic_system = AgenticSystem()
        self.team_manager = TeamManager(self.orchestrator)
        self.chat_history = []
        self.active_objectives = {}
        # Initialize teams.
        # NOTE: asyncio.run() raises RuntimeError when an event loop is
        # already running in this thread, so construct this object from
        # synchronous code.
        asyncio.run(self.team_manager.initialize_team_agents())

    async def process_message(
        self,
        message: str,
        history: List[List[str]]
    ) -> str:
        """Process one incoming chat message and return the reply text.

        Args:
            message: The user's message.
            history: Chat history supplied by the UI; it is accepted for
                interface compatibility but not read here (the instance
                keeps its own ``chat_history``).

        Returns:
            The handler's response, or an ``"Error processing message: ..."``
            string if anything raised while handling the message.
        """
        try:
            # Analyze message intent
            intent = await self._analyze_intent(message)
            intent_type = intent["type"]

            if intent_type == "query":
                response = await self._handle_query(message)
            elif intent_type == "objective":
                response = await self._handle_objective(message)
            elif intent_type == "status":
                response = await self._handle_status_request(message)
            else:
                response = await self._handle_general_chat(message)

            # Update chat history (only reached when handling succeeded).
            self.chat_history.append({
                "role": "user",
                "content": message,
                "timestamp": datetime.now()
            })
            self.chat_history.append({
                "role": "assistant",
                "content": response,
                "timestamp": datetime.now()
            })
            return response
        except Exception as e:
            # UI boundary: report the failure as chat text instead of
            # letting the exception propagate to the front end.
            return f"Error processing message: {str(e)}"

    async def _analyze_intent(self, message: str) -> Dict[str, Any]:
        """Classify *message* using the orchestrator's reasoning engine.

        The engine receives the transcript and the active objectives as
        context.  Its result is read with ``.get``, so missing keys fall
        back to the defaults below.

        Returns:
            A dict with keys ``type`` (default ``"general"``),
            ``confidence`` (default ``0.5``), ``entities`` (default ``[]``)
            and ``action_required`` (default ``False``).
        """
        analysis = await self.orchestrator.reasoning_engine.reason(
            query=message,
            context={
                "chat_history": self.chat_history,
                "active_objectives": self.active_objectives
            }
        )
        return {
            "type": analysis.get("intent_type", "general"),
            "confidence": analysis.get("confidence", 0.5),
            "entities": analysis.get("entities", []),
            "action_required": analysis.get("action_required", False)
        }

    async def _handle_query(self, message: str) -> str:
        """Answer an information query by asking every recommended team."""
        # Get relevant teams for the query
        recommended_teams = await self.team_manager.get_team_recommendations(message)
        # Teams are queried one after another, in recommendation order.
        responses = []
        for team_type in recommended_teams:
            responses.append(await self._get_team_response(team_type, message))
        # Combine and format responses
        return self._format_team_responses(responses)

    async def _handle_objective(self, message: str) -> str:
        """Create a cross-team objective from *message* and describe it.

        The reasoning engine is asked which teams are required; each
        returned name is upper-cased and looked up in ``TeamType``.  A name
        that ``TeamType`` does not know raises from that lookup (surfaced
        to the user by ``process_message``).
        """
        # Analyze objective requirements
        analysis = await self.orchestrator.reasoning_engine.reason(
            query=f"Analyze objective requirements: {message}",
            context={"teams": self.team_manager.teams}
        )
        # Determine required teams
        required_teams = [
            TeamType[team.upper()]
            for team in analysis.get("required_teams", [])
        ]
        # Create cross-team objective
        objective_id = await self.team_manager.create_cross_team_objective(
            objective=message,
            required_teams=required_teams
        )
        self.active_objectives[objective_id] = {
            "description": message,
            "teams": required_teams,
            "status": "initiated",
            "created_at": datetime.now()
        }
        return self._format_objective_creation(objective_id)

    async def _handle_status_request(self, message: str) -> str:
        """Collect system, team and objective status and format a report."""
        # Get system status
        system_status = await self.agentic_system.get_system_status()
        # Get team status
        # NOTE(review): monitor_objective_progress() is called with a team
        # id here and with an objective id below -- confirm that the team
        # manager really accepts both kinds of id.
        team_status = {}
        for team_id, team in self.team_manager.teams.items():
            team_status[team.name] = await self.team_manager.monitor_objective_progress(team_id)
        # Get objective status (only the ids are needed for the lookup).
        objective_status = {}
        for obj_id in self.active_objectives:
            objective_status[obj_id] = await self.team_manager.monitor_objective_progress(obj_id)
        return self._format_status_response(system_status, team_status, objective_status)

    async def _handle_general_chat(self, message: str) -> str:
        """Produce a free-form reply via the reasoning engine.

        Falls back to a fixed "not sure" sentence when the engine's result
        has no ``response`` key.
        """
        response = await self.orchestrator.reasoning_engine.reason(
            query=message,
            context={
                "chat_history": self.chat_history,
                "system_state": await self.agentic_system.get_system_status()
            }
        )
        return response.get("response", "I'm not sure how to respond to that.")

    async def _get_team_response(self, team_type: "TeamType", query: str) -> Dict[str, Any]:
        """Ask every agent of the team with type *team_type* to answer *query*.

        Returns:
            A dict with ``team`` (``team_type.value``), ``response`` (the
            best agent answer, see ``_combine_agent_responses``) and
            ``confidence`` (mean of the agents' ``confidence`` values, or
            ``0.0`` when the team is unknown or has no agents).
        """
        team_id = next(
            (tid for tid, team in self.team_manager.teams.items()
             if team.type == team_type),
            None
        )
        # Compare against None explicitly: a falsy id such as 0 or "" is
        # still a valid match and must not be reported as unavailable.
        if team_id is None:
            return {
                "team": team_type.value,
                "response": "Team not available",
                "confidence": 0.0
            }
        # Get team agents
        team_agents = self.team_manager.agents[team_id]
        # Aggregate responses from team agents
        responses = []
        for agent in team_agents.values():
            responses.append(await agent.process_query(query))
        # A team without agents yields no responses; avoid dividing by zero.
        if responses:
            confidence = sum(r.get("confidence", 0) for r in responses) / len(responses)
        else:
            confidence = 0.0
        return {
            "team": team_type.value,
            "response": self._combine_agent_responses(responses),
            "confidence": confidence
        }

    def _combine_agent_responses(self, responses: List[Dict[str, Any]]) -> str:
        """Return the text of the most confident successful agent response.

        A response counts only if it has a truthy ``success`` flag and a
        non-empty ``response``.  Ties on confidence go to the earliest
        response.  Returns ``"No valid response available"`` when nothing
        qualifies.
        """
        valid_responses = [
            r for r in responses
            if r.get("success", False) and r.get("response")
        ]
        if not valid_responses:
            return "No valid response available"
        # max() returns the first maximal element, matching a stable
        # descending sort followed by taking the head.
        best_response = max(valid_responses, key=lambda r: r.get("confidence", 0))
        return best_response.get("response", "No response available")

    def _format_team_responses(self, responses: List[Dict[str, Any]]) -> str:
        """Render team responses whose confidence is above 0.3 as text."""
        formatted = [
            f"Team {response['team'].title()}:\n"
            f"{response['response']}\n"
            for response in responses
            if response.get("confidence", 0) > 0.3  # Confidence threshold
        ]
        if not formatted:
            return "No team was able to provide a confident response."
        return "\n".join(formatted)

    def _format_objective_creation(self, objective_id: str) -> str:
        """Describe the stored objective *objective_id* as a confirmation.

        Raises:
            KeyError: If *objective_id* is not in ``active_objectives``.
        """
        objective = self.active_objectives[objective_id]
        return (
            f"Objective created successfully!\n\n"
            f"Objective ID: {objective_id}\n"
            f"Description: {objective['description']}\n"
            f"Assigned Teams: {', '.join(t.value for t in objective['teams'])}\n"
            f"Status: {objective['status']}\n"
            f"Created: {objective['created_at'].strftime('%Y-%m-%d %H:%M:%S')}"
        )

    def _format_status_response(
        self,
        system_status: Dict[str, Any],
        team_status: Dict[str, Any],
        objective_status: Dict[str, Any]
    ) -> str:
        """Build the multi-section plain-text status report.

        Args:
            system_status: Must provide ``state``, ``agent_count`` and
                ``active_tasks``.
            team_status: Maps team name to a dict with ``active_agents``,
                ``completion_rate`` and ``collaboration_score``.
            objective_status: Maps objective id to a mapping whose values
                each carry a ``completion_rate``; the reported progress is
                their mean, or 0 when the mapping is empty.
        """
        # Format system status
        status = [
            "System Status:",
            f"- State: {system_status['state']}",
            f"- Active Agents: {system_status['agent_count']}",
            f"- Active Tasks: {system_status['active_tasks']}",
            "\nTeam Status:"
        ]
        # Add team status
        for team_name, team_info in team_status.items():
            status.extend([
                f"\n{team_name}:",
                f"- Active Agents: {team_info['active_agents']}",
                f"- Completion Rate: {team_info['completion_rate']:.2%}",
                f"- Collaboration Score: {team_info['collaboration_score']:.2f}"
            ])
        # Add objective status
        if objective_status:
            status.append("\nActive Objectives:")
            for obj_id, obj_info in objective_status.items():
                obj = self.active_objectives[obj_id]
                rates = [t['completion_rate'] for t in obj_info.values()]
                # No per-team entries yet means no measurable progress;
                # report 0 instead of dividing by zero.
                progress = sum(rates) / len(rates) if rates else 0.0
                status.extend([
                    f"\n{obj['description']}:",
                    f"- Status: {obj['status']}",
                    f"- Teams: {', '.join(t.value for t in obj['teams'])}",
                    f"- Progress: {progress:.2%}"
                ])
        return "\n".join(status)
class VentureUI:
    """Builds the Gradio interface around a chat callable.

    ``app`` is called as ``app(message, history)`` and may be either a
    plain function or a coroutine function; it must return the reply text.
    """

    def __init__(self, app):
        """Store the chat callable used to answer messages."""
        self.app = app

    def create_interface(self):
        """Return a ``gr.Interface`` with a message box and a reply box.

        Gradio requires a State input to be paired with a State output, so
        the chat callable is wrapped to return ``(reply, history)`` and a
        matching ``gr.State`` output is declared.  The history handed back
        is the incoming history plus the new ``[message, reply]`` pair.
        """
        # Local import keeps this block self-contained; only needed here.
        import inspect

        chat_fn = self.app

        async def respond(message, history):
            result = chat_fn(message, history)
            # Support both sync and async chat callables.
            if inspect.isawaitable(result):
                result = await result
            updated_history = list(history or []) + [[message, result]]
            return result, updated_history

        return gr.Interface(
            fn=respond,
            inputs=[
                gr.Textbox(
                    label="Message",
                    placeholder="Chat with the Agentic System...",
                    lines=2
                ),
                gr.State([])  # For chat history
            ],
            outputs=[
                gr.Textbox(
                    label="Response",
                    lines=10
                ),
                gr.State()  # Pairs with the State input above
            ],
            title="Advanced Agentic System Chat Interface",
            description="""
Chat with our autonomous agent teams:
- Team A: Coders (App/Software Developers)
- Team B: Business (Entrepreneurs)
- Team C: Research (Deep Online Research)
- Team D: Crypto & Sports Trading
You can:
1. Ask questions
2. Create new objectives
3. Check status of teams and objectives
4. Get insights and recommendations
""",
            theme="default",
            allow_flagging="never"
        )
def create_chat_interface() -> gr.Interface:
    """Build the Gradio interface backed by a fresh ``ChatInterface``.

    A new ``ChatInterface`` is constructed and its ``process_message``
    method is handed to ``VentureUI``, which produces the interface.
    """
    message_handler = ChatInterface().process_message
    return VentureUI(message_handler).create_interface()
# Built at import time, so the module-level name `interface` exists even
# when this file is imported rather than executed as a script.
interface = create_chat_interface()

if __name__ == "__main__":
    # Launch settings used when the file is run directly.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }
    interface.launch(**launch_options)