Spaces:
Sleeping
Sleeping
| import json | |
| import time | |
| from omagent_core.clients.devices.app.schemas import (CodeEnum, ContentStatus, | |
| InteractionType, | |
| MessageType) | |
| from omagent_core.clients.input_base import InputBase | |
| from omagent_core.engine.configuration.configuration import Configuration | |
| from omagent_core.engine.http.models.workflow_status import running_status | |
| from omagent_core.engine.orkes.orkes_workflow_client import ( | |
| OrkesWorkflowClient, workflow_client) | |
| from omagent_core.services.connectors.redis import RedisConnector | |
| from omagent_core.utils import registry | |
| from omagent_core.utils.container import container | |
| from omagent_core.utils.general import read_image | |
| from omagent_core.utils.logger import logging | |
| from omagent_core.utils.registry import registry | |
| import os | |
class AppInput(InputBase):
    """Input handler that exchanges messages with an app client over Redis streams.

    A prompt is pushed to the ``{agent_id}_output`` stream, and the user's reply
    is read back from the ``{workflow_instance_id}_input`` stream.
    """

    # Injected Redis connection used for both reading and writing streams.
    redis_stream_client: RedisConnector

    def read_input(self, workflow_instance_id: str, input_prompt=""):
        """Block until a valid user message arrives on the workflow's input stream.

        Args:
            workflow_instance_id: Workflow instance whose input stream is polled.
            input_prompt: Prompt sent to the client before listening. When
                ``None``, no prompt is sent and listening starts from "now".

        Returns:
            dict: Parsed payload of the first valid message, or ``{}`` when the
            workflow stops running before one arrives.
        """
        stream_name = f"{workflow_instance_id}_input"
        group_name = "omappagent"  # consumer group name
        consumer_name = f"{workflow_instance_id}_agent"  # consumer name
        poll_interval: int = 1
        if input_prompt is not None:
            # The prompt's message id doubles as the lower bound: only replies
            # newer than the prompt are considered.
            start_id = self._send_input_message(workflow_instance_id, input_prompt)
        else:
            current_timestamp = int(time.time() * 1000)
            start_id = f"{current_timestamp}-0"
        result = {}
        # Ensure the consumer group exists; BUSYGROUP errors are expected when
        # it was already created, hence debug-level logging only.
        try:
            self.redis_stream_client._client.xgroup_create(
                stream_name, group_name, id="0", mkstream=True
            )
        except Exception as e:
            logging.debug(f"Consumer group may already exist: {e}")
        if not os.getenv("OMAGENT_MODE") == "lite":
            logging.info(
                f"Listening to Redis stream: {stream_name} in group: {group_name} start_id: {start_id}"
            )
        data_flag = False
        while True:
            try:
                # Stop waiting once the workflow itself has terminated.
                workflow_status = workflow_client.get_workflow_status(
                    workflow_instance_id
                )
                if workflow_status.status not in running_status:
                    logging.info(
                        f"Workflow {workflow_instance_id} is not running, exiting..."
                    )
                    break
                # Fetch the newest entry at or after start_id.
                messages = self.redis_stream_client._client.xrevrange(
                    stream_name, max="+", min=start_id, count=1
                )
                # Redis returns bytes; decode keys and values to str.
                messages = [
                    (
                        message_id,
                        {
                            k.decode("utf-8"): v.decode("utf-8")
                            for k, v in message.items()
                        },
                    )
                    for message_id, message in messages
                ]
                for message_id, message in messages:
                    data_flag = self.process_message(message, result)
                    if data_flag:
                        break
                # BUGFIX: previously only the inner `for` loop was broken, so
                # the outer polling loop kept re-reading (and re-merging) the
                # same message until the workflow terminated. Exit once a
                # valid message has been processed.
                if data_flag:
                    break
                # Sleep before checking for new messages again.
                time.sleep(poll_interval)
            except Exception as e:
                logging.error(f"Error while listening to stream: {e}")
                time.sleep(poll_interval)  # Wait before retrying
        return result

    def process_message(self, message, result):
        """Validate one decoded stream entry and merge its payload into ``result``.

        Expected payload schema::

            {
                "agent_id": "string",
                "messages": [
                    {
                        "role": "string",
                        "content": [{"type": "string", "data": "string"}]
                    }
                ],
                "kwargs": {}
            }

        Args:
            message: Decoded stream entry; the JSON document is under "payload".
            result: Dict updated in place with the parsed payload on success.

        Returns:
            bool: ``True`` when the payload is valid and merged, ``False``
            otherwise (the error is logged, never raised).
        """
        logging.info(f"Received message: {message}")
        try:
            payload = message.get("payload")
            # check payload data
            if not payload:
                logging.error("Payload is empty")
                return False
            try:
                payload_data = json.loads(payload)
            except json.JSONDecodeError as e:
                logging.error(f"Payload is not a valid JSON: {e}")
                return False
            if "agent_id" not in payload_data:
                logging.error("Payload does not contain 'agent_id' key")
                return False
            if "messages" not in payload_data:
                logging.error("Payload does not contain 'messages' key")
                return False
            if not isinstance(payload_data["messages"], list):
                logging.error("'messages' should be a list")
                return False
            # Renamed from `message` to avoid shadowing the parameter.
            for msg_item in payload_data["messages"]:
                if not isinstance(msg_item, dict):
                    logging.error("Each item in 'messages' should be a dictionary")
                    return False
                if "role" not in msg_item or "content" not in msg_item:
                    logging.error(
                        "Each item in 'messages' should contain 'role' and 'content' keys"
                    )
                    return False
                if not isinstance(msg_item["content"], list):
                    logging.error("'content' should be a list")
                    return False
                for content in msg_item["content"]:
                    if not isinstance(content, dict):
                        logging.error("Each item in 'content' should be a dictionary")
                        return False
                    if "type" not in content or "data" not in content:
                        logging.error(
                            "Each item in 'content' should contain 'type' and 'data' keys"
                        )
                        return False
            # Reuse the already-parsed document instead of calling
            # json.loads(payload) a second time.
            result.update(payload_data)
        except Exception as e:
            logging.error(f"Error processing message: {e}")
            return False
        return True

    def _send_input_message(self, agent_id, msg):
        """Send *msg* to the client as an input-type prompt.

        Returns:
            The Redis stream id of the published message.
        """
        message_id = self._send_base_message(
            agent_id,
            CodeEnum.SUCCESS.value,
            "",
            0,
            MessageType.TEXT.value,
            msg,
            ContentStatus.END_BLOCK.value,
            InteractionType.INPUT.value,
            0,
            0,
        )
        return message_id

    def _create_message_data(
        self,
        agent_id,
        code,
        error_info,
        took,
        msg_type,
        msg,
        content_status,
        interaction_type,
        prompt_tokens,
        output_tokens,
    ):
        """Assemble the wire-format message envelope.

        Returns:
            dict: ``{"payload": <json string>}`` ready for ``xadd``.
        """
        message = {"role": "assistant", "type": msg_type, "content": msg}
        usage = {"prompt_tokens": prompt_tokens, "output_tokens": output_tokens}
        data = {
            "agent_id": agent_id,
            "code": code,
            "error_info": error_info,
            "took": took,
            "content_status": content_status,
            "interaction_type": int(interaction_type),
            "message": message,
            "usage": usage,
        }
        # ensure_ascii=False keeps non-ASCII content readable on the client.
        return {"payload": json.dumps(data, ensure_ascii=False)}

    def _send_to_group(self, stream_name, group_name, data):
        """Append *data* to *stream_name*, ensuring the consumer group exists.

        Returns:
            The Redis stream id of the appended entry.
        """
        logging.info(f"Stream: {stream_name}, Group: {group_name}, Data: {data}")
        message_id = self.redis_stream_client._client.xadd(stream_name, data)
        try:
            self.redis_stream_client._client.xgroup_create(
                stream_name, group_name, id="0"
            )
        except Exception as e:
            logging.debug(f"Consumer group may already exist: {e}")
        return message_id

    def _send_base_message(
        self,
        agent_id,
        code,
        error_info,
        took,
        msg_type,
        msg,
        content_status,
        interaction_type,
        prompt_tokens,
        output_tokens,
    ):
        """Build and publish a message to the agent's output stream.

        Returns:
            The Redis stream id of the published message.
        """
        stream_name = f"{agent_id}_output"
        group_name = "omappagent"  # replace with your consumer group name
        data = self._create_message_data(
            agent_id,
            code,
            error_info,
            took,
            msg_type,
            msg,
            content_status,
            interaction_type,
            prompt_tokens,
            output_tokens,
        )
        message_id = self._send_to_group(stream_name, group_name, data)
        return message_id