""" | |
This script defines the Naomi class, which utilizes the Llama model for chatbot interactions. | |
It includes methods for responding to user input while maintaining a chat history. | |
Keyword arguments: | |
- kwargs: Additional keyword arguments for candidate information. | |
Return: | |
- An instance of the Naomi class, capable of handling chatbot interactions. | |
""" | |
import time
from datetime import datetime
from uuid import uuid4

from llama_cpp import Llama
from llama_cpp.llama_tokenizer import LlamaHFTokenizer

# Default decoding and model configuration.
SEED = 42
MODEL_CARD = "bartowski/Meta-Llama-3.1-8B-Instruct-GGUF"
MODEL_PATH = "Meta-Llama-3.1-8B-Instruct-Q3_K_XL.gguf"
base_model_id = "meta-llama/Llama-3.1-8B-Instruct"
datetime_format = '%Y-%m-%d %H:%M:%S'
new_chat_template = """{{- bos_token }}
{%- if custom_tools is defined %}
{%- set tools = custom_tools %}
{%- endif %}
{%- if not tools_in_user_message is defined %}
{%- set tools_in_user_message = true %}
{%- endif %}
{%- if not date_string is defined %}
{%- set date_string = "26 Jul 2024" %}
{%- endif %}
{%- if not tools is defined %}
{%- set tools = none %}
{%- endif %}
{#- This block extracts the system message, so we can slot it into the right place. #}
{%- if messages[0]['role'] == 'system' %}
{%- set system_message = messages[0]['content']|trim %}
{%- set messages = messages[1:] %}
{#- System message + builtin tools #}
{{- "<|start_header_id|>system<|end_header_id|>\n\n" }}
{%- if builtin_tools is defined or tools is not none %}
{{- "Environment: ipython\n" }}
{%- endif %}
{%- if builtin_tools is defined %}
{{- "Tools: " + builtin_tools | reject('equalto', 'code_interpreter') | join(", ") + "\n\n"}}
{%- endif %}
{%- if tools is not none and not tools_in_user_message %}
{{- "You have access to the following functions. To call a function, please respond with JSON for a function call." }}
{{- 'Respond in the format {"name": function name, "parameters": dictionary of argument name and its value}.' }}
{{- "Do not use variables.\n\n" }}
{%- for t in tools %}
{{- t | tojson(indent=4) }}
{{- "\n\n" }}
{%- endfor %}
{%- endif %}
{{- system_message }}
{{- "<|eot_id|>" }}
{%- else %}
{%- set system_message = "" %}
{%- endif %}
{%- for message in messages %}
{%- if not (message.role == 'ipython' or message.role == 'tool' or 'tool_calls' in message) %}
{{- '<|start_header_id|>' + message['role'] + '<|end_header_id|>\n\n'+ message['content'] | trim + '<|eot_id|>' }}
{%- elif 'tool_calls' in message %}
{%- if not message.tool_calls|length == 1 %}
{{- raise_exception("This model only supports single tool-calls at once!") }}
{%- endif %}
{%- set tool_call = message.tool_calls[0].function %}
{%- if builtin_tools is defined and tool_call.name in builtin_tools %}
{{- '<|start_header_id|>assistant<|end_header_id|>\n\n' -}}
{{- "<|python_tag|>" + tool_call.name + ".call(" }}
{%- for arg_name, arg_val in tool_call.arguments | items %}
{{- arg_name + '="' + arg_val + '"' }}
{%- if not loop.last %}
{{- ", " }}
{%- endif %}
{%- endfor %}
{{- ")" }}
{%- else %}
{{- '<|start_header_id|>assistant<|end_header_id|>\n\n' -}}
{{- '{"name": "' + tool_call.name + '", ' }}
{{- '"parameters": ' }}
{{- tool_call.arguments | tojson }}
{{- "}" }}
{%- endif %}
{%- if builtin_tools is defined %}
{#- This means we're in ipython mode #}
{{- "<|eom_id|>" }}
{%- else %}
{{- "<|eot_id|>" }}
{%- endif %}
{%- elif message.role == "tool" or message.role == "ipython" %}
{{- "<|start_header_id|>ipython<|end_header_id|>\n\n" }}
{%- if message.content is mapping or message.content is iterable %}
{{- message.content | tojson }}
{%- else %}
{{- message.content }}
{%- endif %}
{{- "<|eot_id|>" }}
{%- endif %}
{%- endfor %}
{%- if add_generation_prompt %}
{{- '<|start_header_id|>assistant<|end_header_id|>\n\n' }}
{%- endif %}"""
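
# Quick sanity check of the template (an illustrative sketch, not part of the
# chatbot flow; it assumes network access to fetch the base tokenizer):
#
#   from transformers import AutoTokenizer
#   tok = AutoTokenizer.from_pretrained(base_model_id)
#   tok.chat_template = new_chat_template
#   print(tok.apply_chat_template(
#       [{"role": "system", "content": "You are Naomi."},
#        {"role": "user", "content": "Hi!"}],
#       tokenize=False, add_generation_prompt=True,
#   ))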
class Naomi:
    def __init__(self, **kwargs):
        self.session_id = uuid4().hex
        self.candidate = kwargs
        # Load the quantized model; the base model's HF tokenizer is attached
        # so the custom chat template above is used instead of the GGUF default.
        self.model = Llama.from_pretrained(
            repo_id=MODEL_CARD,
            filename=MODEL_PATH,
            tokenizer=LlamaHFTokenizer.from_pretrained(base_model_id),
        )
        self.model.tokenizer_.hf_tokenizer.chat_template = new_chat_template
        # Seed the history with the agent's system prompt. The original prompt
        # text is not in this file, so a placeholder built from the candidate
        # info stands in for it here (an assumption, not the real prompt).
        self.chat_history = self.model.tokenizer_.hf_tokenizer.apply_chat_template(
            [{'role': 'system',
              'content': f"You are Naomi. Candidate info: {self.candidate}"}],
            tokenize=False,
            add_generation_prompt=False,
        )
        self.timestamps = []
    def invoke(self, history, **kwargs):
        """Run one chat turn; extra kwargs (max_tokens, temperature, ...) go to the Llama call."""
        tokenizer = self.model.tokenizer_.hf_tokenizer
        # apply_chat_template expects a list of messages, so wrap the latest one.
        self.timestamps.append(datetime.now().strftime(datetime_format))
        format_user_input = tokenizer.apply_chat_template(
            [history[-1]], tokenize=False, add_generation_prompt=False)
        # The template prepends bos_token on every render; keep only the first one.
        if self.chat_history:
            format_user_input = format_user_input.removeprefix(tokenizer.bos_token)
        self.chat_history += format_user_input
        # Generate, then drop the assistant header the model emits before the
        # first blank line (no generation prompt was appended above).
        response = self.model(self.chat_history, **kwargs)
        output = "\n\n".join(response['choices'][0]['text'].split('\n\n')[1:])
        # Fold the assistant reply back into the running history.
        self.timestamps.append(datetime.now().strftime(datetime_format))
        self.chat_history += tokenizer.apply_chat_template(
            [{'role': 'assistant', 'content': output}],
            tokenize=False, add_generation_prompt=False,
        ).removeprefix(tokenizer.bos_token)
        return output
    def respond(self, history, **kwargs):
        """ Generator that yields responses in chat sessions. """
        response = self.invoke(history, **kwargs)
        for word in response.split():
            yield word + " "
            time.sleep(0.05)
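
# Example usage (a minimal sketch: the candidate fields and decoding kwargs
# below are illustrative placeholders, not a fixed API):
if __name__ == "__main__":
    naomi = Naomi(name="Jane Doe", role="Data Scientist")
    history = [{'role': 'user', 'content': 'Hi Naomi, can you introduce yourself?'}]
    for chunk in naomi.respond(history, max_tokens=256, temperature=0.7):
        print(chunk, end="", flush=True)
    print()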