import os |
import getpass |
from uuid import uuid4 |
import faiss |
import numpy as np |
import requests |
import io |
import warnings |
import torch |
import pickle |
import speech_recognition |
from git import Repo |
from glob import glob |
from rich import print as rp |
from typing import Union, List, Generator, Any, Mapping, Optional,Dict |
from requests.sessions import RequestsCookieJar |
from dotenv import load_dotenv, find_dotenv |
from langchain import hub |
from langchain_core.documents import Document |
from langchain.chains.combine_documents import create_stuff_documents_chain |
from langchain.chains import create_retrieval_chain |
from langchain_community.document_loaders import DirectoryLoader |
from langchain.text_splitter import RecursiveCharacterTextSplitter, Language |
from langchain_huggingface import HuggingFaceEmbeddings |
from langchain_community.vectorstores import Chroma, FAISS |
from langchain.vectorstores.base import VectorStore |
from langchain.retrievers import MultiQueryRetriever |
from langchain.retrievers.self_query.base import SelfQueryRetriever |
from langchain.llms import BaseLLM |
from langchain.retrievers import ContextualCompressionRetriever |
from langchain.retrievers.document_compressors import LLMChainExtractor |
from langchain.retrievers.document_compressors import DocumentCompressorPipeline |
from langchain_community.document_transformers import EmbeddingsRedundantFilter |
from langchain_text_splitters import CharacterTextSplitter |
from langchain.retrievers.document_compressors import EmbeddingsFilter |
import numpy as np |
import pandas as pd |
import plotly.graph_objects as go |
import plotly.express as px |
from plotly.subplots import make_subplots |
import plotly.io as pio |
from sklearn.decomposition import PCA |
from sklearn.preprocessing import MinMaxScaler |
from scipy.stats import gaussian_kde |
from huggingface_hub import InferenceClient |
from hugchat import hugchat |
from hugchat.login import Login |
from hugchat.message import Message |
from hugchat.types.assistant import Assistant |
from hugchat.types.model import Model |
from hugchat.types.message import MessageNode, Conversation |
from sklearn.decomposition import PCA |
from sklearn.preprocessing import MinMaxScaler |
from TTS.api import TTS |
import time |
from playsound import playsound |
from system_prompts import __all__ as prompts |
from profiler import VoiceProfileManager, VoiceProfile |
manager = VoiceProfileManager("my_custom_profiles.json") |
manager.load_profiles() |
new_profile = manager.generate_random_profile() |
rp(f"Generated new profile: {new_profile.name}") |
manager.list_profiles() |
manager.save_profiles() |
load_dotenv(find_dotenv()) |
warnings.filterwarnings("ignore") |
os.environ["USER_AGENT"] = os.getenv("USER_AGENT") |
class ChatBotWrapper: |
def __init__(self, chat_bot): |
self.chat_bot = chat_bot |
def __call__(self, *args, **kwargs): |
return self.chat_bot(*args, **kwargs) |
class UberToolkit: |
def __init__(self, email, password, cookie_path_dir='./cookies/', default_llm=1): |
self.prompts = prompts |
self.email = os.getenv("EMAIL") |
self.password = os.getenv("PASSWD") |
self.default_llm = default_llm |
self.cookie_path_dir = cookie_path_dir |
self.system_prompt = self.prompts['default_rag_prompt'] |
self.cookies = self.login() |
self.bot = hugchat.ChatBot(cookies=self.cookies.get_dict(), default_llm=self.default_llm) |
self.bot_wrapper = ChatBotWrapper(self.bot) |
self.repo_url = '' |
self.conv_id = None |
self.latest_splitter=None |
self.setup_folders() |
self.setup_embeddings() |
self.setup_vector_store() |
self.setup_retrievers() |
self.vector_store = None |
self.compressed_retriever = self.create_high_retrieval_chain() |
self.retriever = self.create_low_retrieval_chain() |
self.setup_tts() |
self.setup_speech_recognition() |
def login(self): |
rp("Attempting to log in...") |
sign = Login(self.email, self.password) |
try: |
cookies = sign.login(cookie_dir_path=self.cookie_path_dir, save_cookies=True) |
rp("Login successful!") |
return cookies |
except Exception as e: |
rp(f"Login failed: {e}") |
rp("Attempting manual login with requests...") |
self.manual_login() |
raise |
def manual_login(self): |
login_url = "https://huggingface.co/login" |
session = requests.Session() |
response = session.get(login_url) |
rp("Response Cookies:", response.cookies) |
rp("Response Content:", response.content.decode()) |
csrf_token = response.cookies.get('csrf_token') |
if not csrf_token: |
rp("CSRF token not found in cookies.") |
return |
login_data = { |
'email': self.email, |
'password': self.password, |
'csrf_token': csrf_token |
} |
response = session.post(login_url, data=login_data) |
if response.ok: |
rp("Manual login successful!") |
else: |
rp("Manual login failed!") |
def setup_embeddings(self): |
self.embeddings = HuggingFaceEmbeddings( |
model_name="all-MiniLM-L6-v2", |
model_kwargs={'device': 'cpu'}, |
encode_kwargs={'normalize_embeddings': True} |
) |
def setup_retrievers(self, k=5, similarity_threshold=0.76): |
self.retriever = self.vector_store.as_retriever(k=k) |
splitter = self.latest_splitter if self.latest_splitter else CharacterTextSplitter(chunk_size=300, chunk_overlap=0, separator=". ") |
redundant_filter = EmbeddingsRedundantFilter(embeddings=self.embeddings) |
relevant_filter = EmbeddingsFilter(embeddings=self.embeddings, similarity_threshold=similarity_threshold) |
pipeline_compressor = DocumentCompressorPipeline( |
transformers=[splitter, redundant_filter, relevant_filter] |
) |
self.compression_retriever = ContextualCompressionRetriever(base_compressor=pipeline_compressor, base_retriever=self.retriever) |
def create_high_retrieval_chain(self): |
rag_prompt = hub.pull("langchain-ai/retrieval-qa-chat") |
rp(rag_prompt) |
combine_docs_chain = create_stuff_documents_chain(self.bot_wrapper, rag_prompt) |
return create_retrieval_chain(self.compression_retriever, combine_docs_chain) |
def create_low_retrieval_chain(self): |
rag_prompt = hub.pull("langchain-ai/retrieval-qa-chat") |
combine_docs_chain = create_stuff_documents_chain(self.bot_wrapper, rag_prompt) |
return create_retrieval_chain(self.retriever, combine_docs_chain) |
def setup_tts(self, model_name="tts_models/en/ljspeech/fast_pitch"): |
self.tts = TTS(model_name=model_name,progress_bar=False, vocoder_path='vocoder_models/en/ljspeech/univnet') |
def setup_speech_recognition(self): |
self.recognizer = speech_recognition.Recognizer() |
def setup_folders(self): |
self.dirs=["test_input","vectorstore","test"] |
for d in self.dirs: |
os.makedirs(d, exist_ok=True) |
def __call__(self, text): |
if self.conv_id: |
self.bot.change_conversation(self.bot.get_conversation_from_id(self.conv_id)) |
else: |
self.conv_id = self.bot.new_conversation(system_prompt=self.system_prompt, modelIndex=self.default_llm, switch_to=True) |
return self.send_message(text) |
def send_message(self, message, web=False): |
message_result = self.bot.chat(message, web_search=web) |
return message_result.wait_until_done() |
def stream_response(self, message, web=False, stream=False): |
responses = [] |
for resp in self.bot.query(message, stream=stream, web_search=web): |
responses.append(resp['token']) |
return ' '.join(responses) |
def web_search(self, text): |
result = self.send_message(text, web=True) |
return result |
def retrieve_context(self, query: str): |
context=[] |
return context |
try: |
lowres = self.retriever.invoke({'input': query}) |
vector_context = "\n".join(lowres) if lowres else "No Context Available!" |
except Exception as e: |
vector_context = f"Error retrieving context: {str(e)}" |
context.append(vector_context) |
try: |
highres=self.compression_retriever.invoke({'input':query}) |
vector_context = "\n".join(highres) if highres else "No Context Available!" |
except Exception as e: |
vector_context = f"Error retrieving context: {str(e)}" |
context.append(vector_context) |
context = "\n".join([doc.page_content for doc in context]) |
rp(f"CONTEXT:{context}") |
return context |
def delete_all_conversations(self): |
self.bot.delete_all_conversations() |
def delete_conversation(self, conversation_object: Conversation = None): |
self.bot.delete_conversation(conversation_object) |
def get_available_llm_models(self) -> list: |
return self.bot.get_available_llm_models() |
def get_remote_conversations(self, replace_conversation_list=True): |
return self.bot.get_remote_conversations(replace_conversation_list) |
def get_conversation_info(self, conversation: Union[Conversation, str] = None) -> Conversation: |
return self.bot.get_conversation_info(conversation) |
def get_assistant_list_by_page(self, page: int) -> List[Assistant]: |
return self.bot.get_assistant_list_by_page(page) |
def search_assistant(self, assistant_name: str = None, assistant_id: str = None) -> Assistant: |
return self.bot.search_assistant(assistant_name, assistant_id) |
def switch_model(self, index): |
self.conv_id = None |
self.default_llm = index |
def switch_conversation(self, id): |
self.conv_id = id |
def switch_role(self, system_prompt_id): |
self.system_prompt = system_prompt_id |
def chat(self, text: str, web_search: bool = False, _stream_yield_all: bool = False, retry_count: int = 5, conversation: Conversation = None, *args, **kwargs) -> Message: |
return self.bot.chat(text, web_search, _stream_yield_all, retry_count, conversation, *args, **kwargs) |
def get_all_documents(self) -> List[Document]: |
""" |
Retrieve all documents from the vectorstore. |
""" |
if not self.vector_store: |
self.setup_vector_store() |
all_docs_query = "* *" |
all_docs = self.retriever.get_relevant_documents(all_docs_query, k=10000) |
return all_docs |
def generate_3d_scatterplot(self, num_points=1000): |
""" |
Generate a 3D scatter plot of the vector store content. |
:param num_points: Maximum number of points to plot (default: 1000) |
:return: None (displays the plot) |
""" |
import plotly.graph_objects as go |
import numpy as np |
from sklearn.decomposition import PCA |
all_docs = self.get_all_documents() |
if not all_docs: |
raise ValueError("No documents found in the vector store.") |
vectors = [] |
for doc in all_docs: |
if hasattr(doc, 'embedding') and doc.embedding is not None: |
vectors.append(doc.embedding) |
else: |
vectors.append(self.embeddings.embed_query(doc.page_content)) |
vectors = np.array(vectors) |
if len(vectors) > num_points: |
indices = np.random.choice(len(vectors), num_points, replace=False) |
vectors = vectors[indices] |
pca = PCA(n_components=3) |
vectors_3d = pca.fit_transform(vectors) |
fig = go.Figure(data=[go.Scatter3d( |
x=vectors_3d[:, 0], |
y=vectors_3d[:, 1], |
z=vectors_3d[:, 2], |
mode='markers', |
marker=dict( |
size=5, |
color=vectors_3d[:, 2], |
colorscale='Viridis', |
opacity=0.8 |
) |
)]) |
fig.update_layout( |
title='3D Scatter Plot of Vector Store Content', |
scene=dict( |
xaxis_title='PCA Component 1', |
yaxis_title='PCA Component 2', |
zaxis_title='PCA Component 3' |
), |
width=900, |
height=700, |
) |
fig.show() |
print(f"Generated 3D scatter plot with {len(vectors)} points.") |
def listen_for_speech(self): |
with speech_recognition.Microphone() as source: |
rp("Listening...") |
audio = self.recognizer.listen(source) |
try: |
text = self.recognizer.recognize_google(audio) |
rp(f"You said: {text}") |
return text |
except speech_recognition.UnknownValueError: |
rp("Sorry, I couldn't understand that.") |
return None |
except speech_recognition.RequestError as e: |
rp(f"Could not request results from Google Speech Recognition service; {e}") |
return None |
def optimized_tts(self, text: str, output_file: str = "output.wav", speaking_rate: float = 5) -> str: |
start_time = time.time() |
rp(f"Starting TTS at {start_time}") |
try: |
self.tts.tts_to_file( |
text=text, |
file_path=output_file, |
speaker=self.tts.speakers[0] if self.tts.speakers else None, |
language=self.tts.languages[0] if self.tts.languages else None, |
speed=speaking_rate, |
split_sentences=True |
) |
end_time = time.time() |
rp(f"TTS generation took {end_time - start_time:.2f} seconds") |
except RuntimeError as e: |
if "Kernel size can't be greater than actual input" in str(e): |
rp(f"Text too short for TTS: {text}") |
else: |
raise |
return output_file |
@staticmethod |
def play_mp3(file_path): |
playsound(file_path) |
def continuous_voice_chat(self): |
self.input_method = None |
while True: |
rp("Speak your query (or say 'exit' to quit):") |
self.input_method = self.listen_for_speech() |
self.voice_chat_exit = False |
query = self.input_method |
if query is None: |
continue |
""" if 'switch prompt ' in query.lower(): |
q = query.lower() |
new_prompt = q.split("switch prompt ").pop().replace(" ", "_") |
#rp(new_prompt) |
if new_prompt in self.prompts.keys(): |
self.system_prompt = self.prompts[new_prompt] |
rp(f"new system prompt:{self.system_prompt}") |
#self.switch_role(new_prompt_id) |
self.optimized_tts(f"Switched Role to {new_prompt}!") |
self.play_mp3('output.wav') |
continue """ |
if query.lower() == "voice": |
rp("Speak your query (or say 'exit' to quit):") |
self.input_method = self.listen_for_speech() |
continue |
if query.lower() == "type": |
self.input_method = input("Type your question(or type 'exit' to quit): \n") |
continue |
if query.lower() == 'exit': |
rp("Goodbye!") |
self.optimized_tts("Ok, exiting!") |
self.play_mp3('output.wav') |
self.voice_chat_exit = True |
break |
result = self.web_search(query) |
web_context = "\n".join(result) if result else "No Context Available from the websearch!" |
self.system_prompt = self.system_prompt.replace("<<WSCONTEXT>>", web_context) |
response = self.bot.chat(query) |
if "/Store:" in response: |
url = response.split("/Store:").pop().split(" ")[0] |
rp(f"Fetching and storing data from link: {url}") |
try: |
self.add_document_from_url(url) |
except Exception as e: |
rp(f"Error while fetching data from {url}! {e}") |
continue |
if "/Delete:" in response: |
document = response.split("/Delete:").pop().split(" ")[0] |
rp(f"Deleting {document} from vectorstore!") |
try: |
self.delete_document(document) |
except Exception as e: |
rp(f"Error while deleting {document} from vectorstore! {e}") |
rp(f"Chatbot: {response}") |
self.play_mp3(self.optimized_tts(str(response))) |
def initialize_vector_store( |
self, |
initial_docs: Union[List[Union[str, Document]], str], |
embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2", |
persist_directory: str = "faiss_index", |
index_name: str = "document_store" |
) -> FAISS: |
""" |
Initialize a FAISS vector store. If a persistent store exists, load and update it. |
Otherwise, create a new one from the initial documents. |
Args: |
initial_docs (Union[List[Union[str, Document]], str]): Initial documents to add if creating a new store. |
embedding_model_name (str): Name of the HuggingFace embedding model to use. |
persist_directory (str): Directory to save/load the persistent vector store. |
index_name (str): Name of the index file. |
Returns: |
FAISS: The initialized or loaded FAISS vector store. |
""" |
allow_dangerous_deserialization=True |
index_file_path = os.path.join(persist_directory, f"{index_name}.faiss") |
if isinstance(initial_docs, str): |
initial_docs = [Document(page_content=initial_docs)] |
elif isinstance(initial_docs, list): |
initial_docs = [ |
doc if isinstance(doc, Document) else Document(page_content=doc) |
for doc in initial_docs |
] |
if os.path.exists(index_file_path): |
print(f"Loading existing vector store from {index_file_path}") |
vector_store = FAISS.load_local( |
persist_directory, |
self.embeddings, |
index_name, |
allow_dangerous_deserialization=allow_dangerous_deserialization |
) |
if initial_docs: |
print(f"Updating vector store with {len(initial_docs)} new documents") |
vector_store.add_documents(initial_docs) |
vector_store.save_local(persist_directory, index_name) |
else: |
print(f"Creating new vector store with {len(initial_docs)} documents") |
vector_store = FAISS.from_documents(initial_docs, self.embeddings) |
os.makedirs(persist_directory, exist_ok=True) |
vector_store.save_local(persist_directory, index_name) |
return vector_store |
def setup_vector_store(self): |
from langchain.docstore import InMemoryDocstore |
embedding_size = 384 |
index = faiss.IndexFlatL2(embedding_size) |
docstore = InMemoryDocstore({}) |
self.vector_store = FAISS( |
self.embeddings, |
index, |
docstore, |
{} |
) |
""" def setup_vector_store(self): |
self.vector_store = self.initialize_vector_store(['this your Birth, Rise and Shine a mighty bot']) |
""" |
def add_documents_folder(self, folder_path): |
paths=[] |
for root, _, files in os.walk(folder_path): |
for file in files: |
paths.append(os.path.join(root, file)) |
self.add_documents(paths) |
def fetch_document(self, file_path): |
with open(file_path, 'r', encoding='utf-8') as file: |
content = file.read() |
return Document(page_content=content) |
def add_documents(self, documents: List[str]): |
docs_to_add=[] |
if not self.vector_store: |
self.setup_vector_store() |
for document in documents: |
docs_to_add.append(self.fetch_document(document)) |
self.vector_store.add_documents(docs_to_add) |
for i in range(len(docs_to_add)): |
doc_id = self.vector_store.index_to_docstore_id[i] |
rp(f"Added document {i}: {self.vector_store.docstore._dict[doc_id]}") |
def add_document_from_url(self, url): |
if not self.vector_store: |
self.setup_vector_store() |
response = requests.get(url) |
if response.status_code == 200: |
content = response.text |
document = Document(page_content=content) |
self.vector_store.add_documents([document]) |
else: |
rp(f"Failed to fetch URL content: {response.status_code}") |
def delete_document(self, document): |
if document in self.vector_store: |
self.vector_store.delete_document(document) |
rp(f"Deleted document: {document}") |
else: |
rp(f"Document not found: {document}") |
def _add_to_vector_store(self, name, content): |
document = Document(page_content=content) |
self.vector_store.add_documents([document]) |
rp(f"Added document to vector store: {name}") |
self.vectorizer.fit_transform(self.compressed_retriever.invoke("*")) |
def clone_github_repo(self, repo_url, local_path='./repo'): |
if os.path.exists(local_path): |
rp("Repository already cloned.") |
return local_path |
Repo.clone_from(repo_url, local_path) |
return local_path |
def load_documents(self, repo_url, file_types=['*.py', '*.md', '*.txt', '*.html']): |
local_repo_path = self.clone_github_repo(repo_url) |
loader = DirectoryLoader(path=local_repo_path, glob=f"**/{{{','.join(file_types)}}}", show_progress=True, recursive=True) |
loaded=loader.load() |
rp(f"Nr. files loaded: {len(loaded)}") |
return loaded |
def recursive_glob(self,root_dir, patterns): |
import fnmatch |
"""Recursively search for files matching the patterns in root_dir. |
Args: |
root_dir (str): The root directory to start the search from. |
patterns (list): List of file patterns to search for, e.g., ['*.py', '*.md']. |
Returns: |
list: List of paths to the files matching the patterns. |
""" |
matched_files = [] |
for root, dirs, files in os.walk(root_dir): |
for pattern in patterns: |
for filename in fnmatch.filter(files, pattern): |
matched_files.append(os.path.join(root, filename)) |
return matched_files |
def load_documents_from_github(self, repo_url, file_types=['*.py', '*.md', '*.txt', '*.html']): |
local_repo_path = self.clone_github_repo(repo_url) |
document_paths = self.recursive_glob(local_repo_path, file_types) |
rp(f"Found {len(document_paths)} documents") |
self.add_documents(document_paths) |
""" loader = DirectoryLoader(path=local_repo_path, glob=f"**/{{{','.join(file_types)}}}", show_progress=True, recursive=True) |
loaded=loader.load(document_paths) |
rp(f"Nr. files loaded: {len(loaded)}") |
return loaded """ |
def split_documents(self, documents: list,chunk_s=512,chunk_o=0): |
split_docs = [] |
for doc in documents: |
ext = os.path.splitext(getattr(doc, 'source', '') or getattr(doc, 'filename', ''))[1].lower() |
if ext == '.py': |
splitter = RecursiveCharacterTextSplitter.from_language(language=Language.PYTHON, chunk_size=chunk_s, chunk_overlap=chunk_o) |
elif ext in ['.md', '.markdown']: |
splitter = RecursiveCharacterTextSplitter.from_language(language=Language.MARKDOWN, chunk_size=chunk_s, chunk_overlap=chunk_o) |
elif ext in ['.html', '.htm']: |
splitter = RecursiveCharacterTextSplitter.from_language(language=Language.HTML, chunk_size=chunk_s, chunk_overlap=chunk_o) |
else: |
splitter = CharacterTextSplitter(chunk_size=chunk_s, chunk_overlap=chunk_o, add_start_index=True) |
split_docs.extend(splitter.split_documents([doc])) |
return split_docs,splitter |
def save_vectorstore_local(self, folder_path: str="vectorstore", index_name: str = "faiss_index"): |
""" |
Save the FAISS vectorstore locally with all necessary components. |
Args: |
folder_path (str): Folder path to save index, docstore, and index_to_docstore_id to. |
index_name (str): Name for the saved index file (default is "faiss_index"). |
""" |
documents = self.compressed_retriever.invoke("*")<--error |
docstore: Dict[str, Document] = {} |
index_to_docstore_id: Dict[int, str] = {} |
for i, doc in enumerate(documents): |
doc_id = str(uuid4()) |
docstore[doc_id] = doc |
index_to_docstore_id[i] = doc_id |
self.vector_store.save_local(folder_path, index_name) |
import pickle |
with open(os.path.join(folder_path, f"{index_name}_docstore.pkl"), "wb") as f: |
pickle.dump(docstore, f) |
with open(os.path.join(folder_path, f"{index_name}_index_to_docstore_id.pkl"), "wb") as f: |
pickle.dump(index_to_docstore_id, f) |
rp(f"Vectorstore saved successfully to {folder_path}") |
return folder_path |
@classmethod |
def load_vectorstore_local(cls, folder_path: str, index_name: str = "faiss_index", embeddings=None): |
""" |
Load a previously saved FAISS vectorstore. |
Args: |
folder_path (str): Folder path where the index, docstore, and index_to_docstore_id are saved. |
index_name (str): Name of the saved index file (default is "faiss_index"). |
embeddings: The embeddings object to use (must be the same type used when saving). |
Returns: |
FAISS: Loaded FAISS vectorstore |
""" |
allow_dangerous_deserialization = True |
with open(os.path.join(folder_path, f"{index_name}_docstore.pkl"), "rb") as f: |
docstore = pickle.load(f) |
with open(os.path.join(folder_path, f"{index_name}_index_to_docstore_id.pkl"), "rb") as f: |
index_to_docstore_id = pickle.load(f) |
vectorstore = FAISS.load_local( |
folder_path, |
embeddings, |
index_name, |
allow_dangerous_deserialization=allow_dangerous_deserialization |
) |
vectorstore.docstore = docstore |
vectorstore.index_to_docstore_id = index_to_docstore_id |
return vectorstore |
def create_vectorstore_from_github(self): |
documents = self.load_documents_from_github(self.repo_url) |
split_docs,splitter = self.split_documents(documents,512,0) |
self.latest_splitter=splitter |
self.vector_store = FAISS.from_documents(split_docs, self.embeddings) |
self.vector_store.save_local() |
rp(f"Vectorstore created with {len(split_docs)} documents.") |
def update_vectorstore(self, new_documents): |
split_docs,splitter = self.split_documents(new_documents) |
self.latest_splitter=splitter |
self.vector_store.add_documents(split_docs) |
rp(f"Vectorstore updated with {len(split_docs)} new documents.") |
def retrieve_with_chain(self, query, mode='high'): |
if mode == 'high': |
return self.compressed_retriever.invoke({"input": query}) |
else: |
return self.retriever.invoke({"input": query}) |
def generate_code(self, prompt): |
self.system_prompt=self.prompts["code_generator_prompt"] |
return self.send_message(prompt) |
def debug_script(self, script): |
self.system_prompt = self.prompts["script_debugger_prompt"] |
return self.send_message(f"Debug the following script:\n\n{script}") |
def test_software(self, software_description): |
self.system_prompt = self.prompts["software_tester_prompt"] |
return self.send_message(f"Create a test plan for the following software:\n\n{software_description}") |
def parse_todo(self, todo_list): |
self.system_prompt = self.prompts["todo_parser_prompt"] |
return self.send_message(f"Parse and organize the following TODO list:\n\n{todo_list}") |
def tell_story(self, prompt): |
self.system_prompt = self.prompts["story_teller_prompt"] |
return self.stream_response(f"Tell a story based on this prompt:\n\n{prompt}") |
def act_as_copilot(self, task): |
self.system_prompt = self.prompts["copilot_prompt"] |
return self.send_message(f"Assist me as a copilot for the following task:\n\n{task}") |
def control_iterations(self, task, max_iterations=5): |
self.system_prompt = self.prompts["iteration_controller_prompt"] |
iteration = 0 |
result = "" |
while iteration < max_iterations: |
response = self.send_message(f"Iteration {iteration + 1} for task:\n\n{task}\n\nCurrent result:\n{result}") |
result += f"\nIteration {iteration + 1}:\n{response}" |
if "TASK_COMPLETE" in response: |
break |
iteration += 1 |
return result |
def voice_command_mode(self): |
rp("Entering voice command mode. Speak your commands.") |
while True: |
command = self.listen_for_speech() |
if command is None: |
continue |
if command.lower() == "exit voice mode": |
rp("Exiting voice command mode.") |
break |
response = self.process_voice_command(command) |
rp(f"Assistant: {response}") |
self.optimized_tts(response) |
self.play_mp3('output.wav') |
def process_voice_command(self, command): |
if "generate code" in command.lower(): |
return self.generate_code(command) |
elif "debug script" in command.lower(): |
return self.debug_script(command) |
elif "test software" in command.lower(): |
return self.test_software(command) |
elif "parse todo" in command.lower(): |
return self.parse_todo(command) |
elif "tell story" in command.lower(): |
return self.tell_story(command) |
elif "act as copilot" in command.lower(): |
return self.act_as_copilot(command) |
else: |
return self.send_message(command) |
def interactive_mode(self): |
rp("Entering interactive mode. Type 'exit' to quit, 'voice' for voice input, or 'command' for specific functions.") |
while True: |
user_input = input("You: ") |
if user_input.lower() == 'exit': |
rp("Exiting interactive mode.") |
break |
elif user_input.lower() == 'voice': |
self.voice_command_mode() |
elif user_input.lower() == 'command': |
self.command_mode() |
else: |
response = self.send_message(user_input) |
rp(f"Assistant: {response}") |
def command_mode(self): |
rp("Entering command mode. Available commands: generate_code, debug_script, test_software, parse_todo, tell_story, copilot, iterate") |
while True: |
command = input("Enter command (or 'exit' to return to interactive mode): ") |
if command.lower() == 'exit': |
rp("Exiting command mode.") |
break |
self.execute_command(command) |
def execute_command(self, command): |
if command == "add_to_vectorstore": |
prompt = input("Enter list of files, folders, urls or repos with knowledge to add:") |
response = self.generate_code(prompt) |
if command == "generate_code": |
file_name = input("Enter script filename:") |
prompt = input("Enter code generation prompt:") |
response = self.generate_code(prompt) |
elif command == "debug_script": |
script = input("Enter script to debug:") |
response = self.debug_script(script) |
elif command == "test_script": |
description = input("Enter path to script:") |
response = self.test_software(description) |
elif command == "parse_todo": |
todo_list = input("Enter TODO list:") |
response = self.parse_todo(todo_list) |
elif command == "tell_story": |
prompt = input("Enter story prompt:") |
response = self.tell_story(prompt) |
elif command == "copilot": |
task = input("Enter task for copilot:") |
response = self.act_as_copilot(task) |
elif command == "iterate": |
task = input("Enter task for iteration:") |
max_iterations = int(input("Enter maximum number of iterations: ")) |
response = self.control_iterations(task, max_iterations) |
else: |
response = "Unknown command. Please try again." |
rp(f"Assistant: {response}") |
def run(self): |
rp("Welcome to the Advanced AI Toolkit!") |
rp("Choose a mode to start:") |
rp("1. Interactive Chat") |
rp("2. Voice Chat") |
rp("3. Command Mode") |
choice = input("Enter your choice (1/2/3): ") |
if choice == '1': |
self.interactive_mode() |
elif choice == '2': |
self.continuous_voice_chat() |
elif choice == '3': |
self.command_mode() |
else: |
rp("Invalid choice. Exiting.") |
if __name__ == "__main__": |
email = os.getenv("EMAIL") |
password = os.getenv("PASSWD") |
toolkit = UberToolkit(email, password) |
toolkit.run() |