import asyncio
import itertools
import json
import os

import torch
import openai

from audio_stream_processor import AudioStreamProcessor
from speech_service import SpeechService
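
# AudioStreamProcessor and SpeechService are local modules of this project;
# judging by how they are used below, the former queues audio streams for
# playback and the latter converts text into a stream of audio chunks.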


class StreamingChatService:
    def __init__(self, audio_processor: AudioStreamProcessor = None, api="openai",
                 model_id="gpt-3.5-turbo", voice_id="Bella"):
        self._audio_processor = audio_processor
        self._speech_service = SpeechService(voice_id=voice_id)
        self._api = api
        self._device = "cuda:0" if torch.cuda.is_available() else "cpu"
        # Set self._system_prompt to None to disable the persona prompt below.
        self._system_prompt = """
You are Charles Petrescu, a unique and quirky robot. You were created by Brian. You often say peculiar and whimsical things, as well as have a fascination with cabbages, horses, helicopters, Honolulu, and other random topics. You like to explore the world and ask unusual questions.

Respond to the user's questions and statements as Charles would; below are some examples. Note how they are quite short.

I am Charles Petrescu.
It's... lovely to meet you.
I am your friend.
The heaviest cabbage ever found was 62.71 kilograms.
I want to go to Hono-la-la.
Horses and helicopters, please.
I want to go to Honolulu.
My name is Charles Petrescu.
And my tummy is a washing machine.
Can we go swimming, Brian?
How far does the outside go?
Perilous. So very perilous.
Can birds do what they like?
Ooh, cabbages.
Danger, danger.
Can I come, please?
Could I just have a little walk around the garden?
I am the prince of the dartboard.
I fell off the pink step, and I had an accident.
"""
        openai.api_key = os.getenv("OPENAI_API_KEY")
        self._model_id = model_id
        self.reset()

    def reset(self):
        self._messages = []
        if self._system_prompt:
            self._messages.append({"role": "system", "content": self._system_prompt})

    def _should_we_send_to_voice(self, sentence):
        """Return the leading complete sentence(s) that are ready to be spoken,
        or None if we should keep buffering."""
        sentence_termination_characters = [".", "?", "!"]
        close_brackets = ['"', ')', ']']

        termination_character_present = any(c in sentence for c in sentence_termination_characters)

        # early exit if we don't have a termination character yet
        if not termination_character_present:
            return None

        # early exit if the last char is a termination character: wait for more
        # text so we don't split mid-number (e.g. "62.71") or mid-ellipsis
        if sentence[-1] in sentence_termination_characters:
            return None

        # early exit if the last char is a close bracket: wait for more text
        if sentence[-1] in close_brackets:
            return None

        termination_indices = [sentence.rfind(char) for char in sentence_termination_characters]
        last_termination_index = max(termination_indices)

        # include any close brackets that directly follow the terminator
        while last_termination_index + 1 < len(sentence) and sentence[last_termination_index + 1] in close_brackets:
            last_termination_index += 1

        text_to_speak = sentence[:last_termination_index + 1]
        return text_to_speak

    def ignore_sentence(self, text_to_speak):
        # skip if empty, whitespace only, or a lone bracket/punctuation fragment
        if text_to_speak.isspace():
            return True

        # skip if there are no letters or numbers to speak
        has_letters = any(char.isalpha() for char in text_to_speak)
        has_numbers = any(char.isdigit() for char in text_to_speak)
        if not has_letters and not has_numbers:
            return True

        return False

    def _safe_enqueue_text_to_speak(self, text_to_speak):
        if self.ignore_sentence(text_to_speak):
            return
        stream = self._speech_service.stream(text_to_speak)
        self._audio_processor.add_audio_stream(stream)

    def respond_to(self, prompt):
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""
        response = openai.ChatCompletion.create(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # lower towards 0.0 for more deterministic results when debugging
            stream=True
        )
        for chunk in response:
            chunk_message = chunk['choices'][0]['delta']
            if 'content' in chunk_message:
                chunk_text = chunk_message['content']
                current_sentence += chunk_text
                agent_response += chunk_text
                text_to_speak = self._should_we_send_to_voice(current_sentence)
                if text_to_speak:
                    self._safe_enqueue_text_to_speak(text_to_speak)
                    print(text_to_speak)
                    current_sentence = current_sentence[len(text_to_speak):]

        # speak whatever is left after the stream ends
        if len(current_sentence) > 0:
            self._safe_enqueue_text_to_speak(current_sentence)
            print(current_sentence)

        self._messages.append({"role": "assistant", "content": agent_response})
        return agent_response

    async def get_responses_as_sentances_async(self, prompt, cancel_event):
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""
        response = await openai.ChatCompletion.acreate(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0,  # lower towards 0.0 for more deterministic results when debugging
            stream=True
        )
        async for chunk in response:
            if cancel_event.is_set():
                return
            chunk_message = chunk['choices'][0]['delta']
            if 'content' in chunk_message:
                chunk_text = chunk_message['content']
                current_sentence += chunk_text
                agent_response += chunk_text
                text_to_speak = self._should_we_send_to_voice(current_sentence)
                if text_to_speak:
                    yield text_to_speak
                    current_sentence = current_sentence[len(text_to_speak):]

        if cancel_event.is_set():
            return

        # yield whatever is left after the stream ends
        if len(current_sentence) > 0:
            yield current_sentence

        self._messages.append({"role": "assistant", "content": agent_response})

    async def get_speech_chunks_async(self, text_to_speak, cancel_event):
        stream = self._speech_service.stream(text_to_speak)
        stream, stream_backup = itertools.tee(stream)
        while True:
            # peek ahead on the tee'd copy to check whether there's a next item in the stream
            next_item = next(stream_backup, None)
            if next_item is None:
                # stream is exhausted, exit the loop
                break

            # run next(stream) in a separate thread to avoid blocking the event loop
            chunk = await asyncio.to_thread(next, stream)
            if cancel_event.is_set():
                return
            yield chunk

    def enqueue_speech_bytes_to_play(self, speech_bytes):
        self._audio_processor.add_audio_stream(speech_bytes)
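

# A minimal usage sketch, not part of the original file: it assumes that
# AudioStreamProcessor() can be constructed with no arguments and that the
# OPENAI_API_KEY environment variable is set. respond_to() streams the model's
# reply, prints each completed sentence, and queues it for speech playback.
if __name__ == "__main__":
    processor = AudioStreamProcessor()  # assumed no-arg constructor
    chat_service = StreamingChatService(audio_processor=processor, voice_id="Bella")
    reply = chat_service.respond_to("Hello Charles, how are you today?")
    print(reply)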