Spaces:
				
			
			
	
			
			
		Runtime error
		
	
	
	
			
			
	
	
	
	
		
		
		Runtime error
		
	| import asyncio | |
| import itertools | |
| import json | |
| import os | |
| import torch | |
| import openai | |
| class ChatService: | |
| def __init__(self, api="openai", model_id = "gpt-3.5-turbo"): | |
| self._api = api | |
| self._device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| openai.api_key = os.getenv("OPENAI_API_KEY") | |
| self._model_id = model_id | |
| def _should_we_send_to_voice(self, sentence): | |
| sentence_termination_characters = [".", "?", "!"] | |
| close_brackets = ['"', ')', ']'] | |
| temination_charicter_present = any(c in sentence for c in sentence_termination_characters) | |
| # early exit if we don't have a termination character | |
| if not temination_charicter_present: | |
| return None | |
| # early exit the last char is a termination character | |
| if sentence[-1] in sentence_termination_characters: | |
| return None | |
| # early exit the last char is a close bracket | |
| if sentence[-1] in close_brackets: | |
| return None | |
| termination_indices = [sentence.rfind(char) for char in sentence_termination_characters] | |
| # Filter out termination indices that are not followed by whitespace or end of string | |
| termination_indices = [i for i in termination_indices if sentence[i+1].isspace()] | |
| last_termination_index = max(termination_indices) | |
| # handle case of close bracket | |
| while last_termination_index+1 < len(sentence) and sentence[last_termination_index+1] in close_brackets: | |
| last_termination_index += 1 | |
| text_to_speak = sentence[:last_termination_index+1] | |
| return text_to_speak | |
| def ignore_sentence(self, text_to_speak): | |
| # exit if empty, white space or an single breaket | |
| if text_to_speak.isspace(): | |
| return True | |
| # exit if not letters or numbers | |
| has_letters = any(char.isalpha() for char in text_to_speak) | |
| has_numbers = any(char.isdigit() for char in text_to_speak) | |
| if not has_letters and not has_numbers: | |
| return True | |
| return False | |
| async def get_responses_as_sentances_async(self, messages, cancel_event=None): | |
| llm_response = "" | |
| current_sentence = "" | |
| delay = 0.1 | |
| while True: | |
| try: | |
| response = await openai.ChatCompletion.acreate( | |
| model=self._model_id, | |
| messages=messages, | |
| temperature=1.0, # use 0 for debugging/more deterministic results | |
| stream=True | |
| ) | |
| async for chunk in response: | |
| if cancel_event is not None and cancel_event.is_set(): | |
| return | |
| chunk_message = chunk['choices'][0]['delta'] | |
| if 'content' in chunk_message: | |
| chunk_text = chunk_message['content'] | |
| current_sentence += chunk_text | |
| llm_response += chunk_text | |
| text_to_speak = self._should_we_send_to_voice(current_sentence) | |
| if text_to_speak: | |
| current_sentence = current_sentence[len(text_to_speak):] | |
| yield text_to_speak, True | |
| else: | |
| yield current_sentence, False | |
| if cancel_event is not None and cancel_event.is_set(): | |
| return | |
| if len(current_sentence) > 0: | |
| yield current_sentence, True | |
| return | |
| except openai.error.APIError as e: | |
| print(f"OpenAI API returned an API Error: {e}") | |
| print(f"Retrying in {delay} seconds...") | |
| await asyncio.sleep(delay) | |
| delay *= 2 | |
| except openai.error.APIConnectionError as e: | |
| print(f"Failed to connect to OpenAI API: {e}") | |
| print(f"Retrying in {delay} seconds...") | |
| await asyncio.sleep(delay) | |
| delay *= 2 | |
| except openai.error.RateLimitError as e: | |
| print(f"OpenAI API request exceeded rate limit: {e}") | |
| print(f"Retrying in {delay} seconds...") | |
| await asyncio.sleep(delay) | |
| delay *= 2 | |
| except Exception as e: | |
| print(f"OpenAI API unknown error: {e}") | |
| print(f"Retrying in {delay} seconds...") | |
| await asyncio.sleep(delay) | |
| delay *= 2 | 
