Spaces:
Runtime error
Runtime error
| import json | |
| import ray | |
| import time | |
| import asyncio | |
| import os | |
| from clip_transform import CLIPTransform | |
| from environment_state_actor import EnvironmentStateActor, EnvironmentState | |
| from respond_to_prompt_async import RespondToPromptAsync | |
| import asyncio | |
| import subprocess | |
class CharlesActor:
    """Drives the main perception / conversation loop of the "Charles" agent.

    The actor pulls audio and video frames from the app interface actor,
    forwards audio to a speech-to-text actor, embeds the latest video frame,
    starts a ``RespondToPromptAsync`` task whenever a finished prompt is
    recognised, renders a chat-history debug view and pushes animator frames
    back to the app interface.

    NOTE(review): the "π€" / "π¨" marker strings look like mis-decoded emoji;
    they are kept exactly as found - confirm against the original file.
    """

    # Short utterances that are never treated as a prompt on their own.
    _PROMPTS_TO_IGNORE = frozenset(
        ["um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm"]
    )
    # Maximum number of entries kept in / rendered from the chat history.
    _MAX_DEBUG_LINES = 10

    def __init__(self):
        self._needs_init = True
        self._charles_actor_debug_output = ""
        # Placeholder state until the first step is fetched in start().
        self._environment_state: "EnvironmentState" = EnvironmentState(episode=0, step=0)
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()

    def get_state(self):
        """Return the most recent status string passed to ``set_state``."""
        return self._state

    def get_environment_state(self) -> "EnvironmentState":
        """Return the environment state fetched at the start of the latest step."""
        return self._environment_state

    def set_state(self, state, skip_print=False):
        """Record a status string, optionally print it, and forward it to the UI.

        Args:
            state: Human-readable status text.
            skip_print: When true the status is not echoed to stdout.
        """
        self._state = state
        if not skip_print:
            print(state)
        # The app interface actor only exists once _initalize_resources has run.
        if hasattr(self, '_app_interface_actor'):
            self._app_interface_actor.set_state.remote(self._state)

    @staticmethod
    def _format_response_lines(responses, chunk_counts):
        """Render responses as "[<chunk count>] <response>" lines with one leading marker.

        Returns an empty string when there are no responses.
        """
        if not responses:
            return ""
        body = "".join(
            f"[{count}] {response} \n"
            for response, count in zip(responses, chunk_counts)
        )
        return "π€ " + body

    async def _initalize_resources(self):
        """Create the actors and helpers used by ``start`` and mark init as done."""
        self.set_state("001 - creating AppInterfaceActor")
        from app_interface_actor import AppInterfaceActor
        self._app_interface_actor = AppInterfaceActor.get_singleton()
        self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()

        self.set_state("002 - creating EnvironmentStateActor")
        self._environment_state_actor = EnvironmentStateActor.remote()

        self.set_state("003 - creating PromptManager")
        from prompt_manager import PromptManager
        self._prompt_manager = PromptManager()

        # The responder itself is created lazily, once the first prompt arrives.
        self.set_state("004 - creating RespondToPromptAsync")
        self._respond_to_prompt = None
        self._respond_to_prompt_task = None

        self.set_state("005 - create SpeechToTextVoskActor")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")

        self.set_state("006 - create Prototypes")
        from prototypes import Prototypes
        self._prototypes = Prototypes()

        self.set_state("007 - create animator")
        from charles_animator import CharlesAnimator
        self._animator = CharlesAnimator()

        # Initialisation is complete; start() must not run it again.
        self._needs_init = False
        self.set_state("010 - Initialized")

    async def start(self):
        """Run the agent loop; this coroutine only ends by raising."""
        if self._needs_init:
            await self._initalize_resources()

        debug_output_history = []

        async def render_debug_output(list_of_strings):
            # Newest entry first.
            table_content = "##### Chat history\n" + "".join(
                f"\n{item}\n" for item in reversed(list_of_strings)
            )
            self._charles_actor_debug_output = table_content
            await self._app_interface_actor.set_debug_output.remote(self._charles_actor_debug_output)

        async def add_debug_output(output):
            debug_output_history.append(output)
            if len(debug_output_history) > self._MAX_DEBUG_LINES:
                debug_output_history.pop(0)
            await render_debug_output(debug_output_history)

        self.set_state("Waiting for input")
        total_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"

        process_speech_to_text_future = []
        current_responses = []
        speech_chunks_per_response = []
        human_preview_text = ""
        robot_preview_text = ""
        additional_prompt = None
        previous_prompt = ""
        has_spoken_for_this_prompt = False

        while True:
            env_state = await self._environment_state_actor.begin_next_step.remote()
            self._environment_state = env_state
            audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()
            video_frames = await self._app_interface_actor.dequeue_video_input_frames_async.remote()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)

            if len(video_frames) > 0:
                # Only the newest frame is embedded; older queued frames are dropped.
                total_video_frames += 1
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                _, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            if len(process_speech_to_text_future) > 0:
                # Non-blocking poll of the oldest pending transcription.
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, _ = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]
                    usable_prompt = len(prompt) > 0 and prompt not in self._PROMPTS_TO_IGNORE

                    if speaker_finished and usable_prompt:
                        print(f"Prompt: {prompt}")
                        # Archive the responses to the previous prompt in the history.
                        line = self._format_response_lines(current_responses, speech_chunks_per_response)
                        if len(line) > 0:
                            await add_debug_output(line)
                        current_responses = []
                        speech_chunks_per_response = []
                        env_state.llm_preview = ""
                        env_state.llm_responses = []
                        env_state.tts_raw_chunk_ids = []
                        human_preview_text = ""
                        robot_preview_text = ""
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        await add_debug_output(f"π¨ {prompt}")
                        self._prompt_manager.append_user_message(prompt)
                        # Stop any response still in flight before starting a new one.
                        if self._respond_to_prompt_task is not None:
                            await self._respond_to_prompt.terminate()
                            self._respond_to_prompt_task.cancel()
                        self._respond_to_prompt = RespondToPromptAsync(self._environment_state_actor, self._audio_output_queue)
                        self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run(prompt, self._prompt_manager.messages))
                        additional_prompt = None
                        previous_prompt = prompt
                        has_spoken_for_this_prompt = False
                    elif usable_prompt:
                        # sometimes we get a false signal of speaker_finished
                        # in which case we get new prompts before we have spoken
                        if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
                            additional_prompt = previous_prompt
                            has_spoken_for_this_prompt = True
                            # NOTE(review): indentation was lost in the pasted
                            # source; both resets are assumed to sit inside this guard.
                            if self._respond_to_prompt_task is not None:
                                await self._respond_to_prompt.terminate()
                                self._respond_to_prompt_task.cancel()
                                self._respond_to_prompt_task = None
                                self._respond_to_prompt = None
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        human_preview_text = f"π¨β {prompt}"

            for new_response in env_state.llm_responses:
                self._prompt_manager.append_assistant_message(new_response)
                current_responses.append(new_response)
                speech_chunks_per_response.append(0)
                # NOTE(review): indentation was lost in the pasted source; this
                # reset is assumed to belong to the loop body - confirm.
                robot_preview_text = ""
            if len(env_state.llm_preview):
                robot_preview_text = f"π€β {env_state.llm_preview}"

            for chunk in env_state.tts_raw_chunk_ids:
                chunk = json.loads(chunk)
                response_id = chunk['llm_sentence_id']
                # The counters are reset whenever a new prompt starts, so an id
                # may point past the current list; skip it instead of crashing.
                if 0 <= response_id < len(speech_chunks_per_response):
                    speech_chunks_per_response[response_id] += 1

            # Live view: archived history plus in-progress robot / human text.
            list_of_strings = debug_output_history.copy()
            line = self._format_response_lines(current_responses, speech_chunks_per_response)
            if len(robot_preview_text) > 0:
                line += robot_preview_text+" \n"
            list_of_strings.append(line)
            if len(human_preview_text) > 0:
                list_of_strings.append(human_preview_text)
            if len(list_of_strings) > self._MAX_DEBUG_LINES:
                list_of_strings.pop(0)
            await render_debug_output(list_of_strings)

            # Yield to the event loop so the response task can make progress.
            await asyncio.sleep(0.001)

            # add observations to the environment state
            count = len(self._audio_output_queue)
            is_talking = bool(count > 0)
            has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
            frame = self._animator.update(is_talking)
            frame_ref = ray.put(frame)
            await self._app_interface_actor.enqueue_video_output_frame.remote(frame_ref)

            loops+=1
            self.set_state(
                f"Processed {total_video_frames} video frames "
                f"and {total_audio_frames} audio frames, "
                f"loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. "
                f"Is speaking: {is_talking}({count}). "
                f"{vector_debug}",
                skip_print=True)
def init_ray():
    """Try to launch a local Ray head node, then connect this process to Ray.

    A failure of the ``ray start`` command is only printed, so the connect
    step below still runs. The connection uses the address in the
    ``RAY_ADDRESS`` environment variable when it is set and non-empty, and
    Ray's default otherwise; both use the "project_charles" namespace.
    """
    head_command = ["ray", "start", "--include-dashboard=True", "--head"]
    try:
        subprocess.check_output(head_command)
    except Exception as e:
        print (f"charles_actor.py init_ray: {e}")

    # Connect to a running Ray cluster
    while True:
        if ray.is_initialized():
            break
        time.sleep(0.1)
        ray_address = os.getenv('RAY_ADDRESS')
        if not ray_address:
            ray.init(namespace="project_charles")
            continue
        ray.init(ray_address, namespace="project_charles")
async def main():
    """Make sure Ray is initialised, then run a CharlesActor in this process.

    ``CharlesActor`` is used as a plain in-process object here, so its
    ``start`` coroutine is awaited directly. ``start`` loops until it raises,
    so nothing placed after the await would ever run. The former polling loop
    (which referenced an undefined ``future`` and called ``.remote()`` on a
    non-actor object) has therefore been removed.

    Raises:
        KeyboardInterrupt: re-raised after printing a short notice when the
            script is interrupted while the actor is running.
    """
    if not ray.is_initialized():
        init_ray()

    charles_actor = CharlesActor()
    try:
        await charles_actor.start()
    except KeyboardInterrupt:
        print("Script was manually terminated")
        # Bare raise keeps the original traceback.
        raise
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop; asyncio.get_event_loop() without a running loop is
    # deprecated in current Python versions.
    asyncio.run(main())