|
import re
|
|
from pprint import pprint
|
|
|
|
from transformers import AutoTokenizer
|
|
|
|
from constants.models import AVAILABLE_MODELS, MODEL_MAP
|
|
from tclogger import logger
|
|
|
|
|
|
class MessageComposer:
    """Compose chat messages into a single prompt string for a chosen model.

    A message is a dict with at least a ``"role"`` and a ``"content"`` key.
    The prompt layout depends on ``self.model``; see :meth:`merge`.
    """

    def __init__(self, model: str = None):
        """Pick the target model and define how raw roles are grouped.

        Args:
            model: Short model name. Any value that is not in
                ``AVAILABLE_MODELS`` (including ``None``) falls back to
                ``"nous-mixtral-8x7b"``.
        """
        if model in AVAILABLE_MODELS:
            self.model = model
        else:
            self.model = "nous-mixtral-8x7b"
        self.model_fullname = MODEL_MAP[self.model]
        self.system_roles = ["system"]
        # "system" is also listed as an instruction role, so system text is
        # folded into the adjacent instruction turn when turns are merged.
        self.inst_roles = ["user", "system", "inst"]
        self.answer_roles = ["assistant", "bot", "answer", "model"]
        self.default_role = "user"

    def _normalize_role(self, role):
        """Map a raw role name onto ``"inst"`` or ``"answer"``.

        Instruction roles take precedence; any role that is neither an
        instruction role nor an answer role is treated as an instruction.
        """
        if role not in self.inst_roles and role in self.answer_roles:
            return "answer"
        return "inst"

    def concat_messages_by_role(self, messages):
        """Collapse consecutive same-side messages into single turns.

        Every role is first normalised to ``"inst"`` or ``"answer"``; runs of
        messages that end up on the same side are joined with a newline.

        The input list and its dicts are left untouched: the result is built
        from shallow copies, so callers can safely reuse ``messages``.

        Args:
            messages: Iterable of dicts with ``"role"`` and ``"content"``.

        Returns:
            A new list of message dicts whose ``"role"`` is either ``"inst"``
            or ``"answer"``. Extra keys of the first message of each run are
            carried over.
        """
        turns = []
        for message in messages:
            # Compare normalised roles so that unknown roles (treated as
            # instructions) merge with a neighbouring instruction turn instead
            # of producing two consecutive "inst" turns.
            role = self._normalize_role(message["role"])
            content = message["content"]
            if turns and turns[-1]["role"] == role:
                turns[-1]["content"] += "\n" + content
            else:
                # Copy instead of mutating the caller's dict.
                turns.append({**message, "role": role})
        return turns

    def _merge_inst_blocks(self, turns):
        """Render turns as ``<s> [INST] ... [/INST] answer </s>`` pairs."""
        merged = ""
        pending_inst = ""
        for turn in turns:
            content = turn["content"]
            if self._normalize_role(turn["role"]) == "answer":
                merged += f"<s> {pending_inst} {content} </s>\n"
                pending_inst = ""
            else:
                pending_inst = f"[INST] {content} [/INST]"
        # A trailing instruction without an answer is left open for the model.
        if pending_inst:
            merged += pending_inst
        return merged

    def _merge_im_blocks(self, messages):
        """Render messages as ``<|im_start|>role ... <|im_end|>`` blocks."""
        lines = []
        for message in messages:
            role = message["role"]
            if role not in ("system", "user", "assistant"):
                role = self.default_role
            lines.append(f"<|im_start|>{role}\n{message['content']}<|im_end|>")
        # Open an assistant block for the model to complete.
        lines.append("<|im_start|>assistant")
        return "\n".join(lines)

    def _merge_headed_turns(self, turns, inst_header, answer_header, end_of_turn):
        """Render turns as ``header + content + end_of_turn`` lines.

        The answer header is appended once more at the end so the model
        continues with its own turn.
        """
        lines = []
        for turn in turns:
            if self._normalize_role(turn["role"]) == "answer":
                header = answer_header
            else:
                header = inst_header
            lines.append(f"{header}{turn['content']}{end_of_turn}")
        lines.append(answer_header)
        return "\n".join(lines)

    def merge(self, messages) -> str:
        """Build the prompt string for ``self.model`` from ``messages``.

        Layout by model:

        * ``mixtral-8x7b`` / ``mistral-7b``: ``[INST] ... [/INST]`` pairs.
        * ``nous-mixtral-8x7b``: ``<|im_start|>`` / ``<|im_end|>`` blocks.
        * ``openchat-3.5``: ``GPT4 Correct User:`` / ``GPT4 Correct Assistant:``.
        * ``gemma-7b``: ``<start_of_turn>`` / ``<end_of_turn>`` with ``<bos>``.
        * ``command-r-plus`` / ``yi-1.5-34b``: the chat template of the
          tokenizer loaded from ``self.model_fullname``.
        * anything else: ``"role: content"`` paragraphs.

        Side effects: ``self.messages`` holds the (possibly merged) messages
        and ``self.merged_str`` holds the returned prompt. ``messages`` itself
        is not modified.

        Args:
            messages: List of dicts with ``"role"`` and ``"content"``.

        Returns:
            The composed prompt string.
        """
        self.messages = messages
        self.merged_str = ""

        if self.model in ("mixtral-8x7b", "mistral-7b"):
            self.messages = self.concat_messages_by_role(messages)
            self.merged_str = self._merge_inst_blocks(self.messages)
        elif self.model == "nous-mixtral-8x7b":
            self.merged_str = self._merge_im_blocks(self.messages)
        elif self.model == "openchat-3.5":
            self.messages = self.concat_messages_by_role(messages)
            self.merged_str = self._merge_headed_turns(
                self.messages,
                inst_header="GPT4 Correct User:\n",
                answer_header="GPT4 Correct Assistant:\n",
                end_of_turn="<|end_of_turn|>",
            )
        elif self.model == "gemma-7b":
            self.messages = self.concat_messages_by_role(messages)
            self.merged_str = "<bos>" + self._merge_headed_turns(
                self.messages,
                inst_header="<start_of_turn>user\n",
                answer_header="<start_of_turn>model\n",
                end_of_turn="<end_of_turn>",
            )
        # "openchat-3.5" and "gemma-7b" are served by the hand-written layouts
        # above, so only the remaining models reach the tokenizer template.
        elif self.model in ("command-r-plus", "yi-1.5-34b"):
            tokenizer = AutoTokenizer.from_pretrained(
                self.model_fullname, use_fast=False
            )
            self.merged_str = tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
        else:
            self.merged_str = "\n\n".join(
                f"{message['role']}: {message['content']}" for message in messages
            )

        return self.merged_str

    def decompose_to_system_and_input_prompt(
        self, messages: list[dict], append_assistant=True
    ):
        """Split a conversation into a system prompt and a dialogue prompt.

        Messages whose role is in ``self.system_roles`` are joined with
        newlines into the system prompt. The remaining messages are merged
        into alternating turns and rendered as `` `user`: `` / `` `assistant`: ``
        sections separated by blank lines.

        Args:
            messages: List of dicts with ``"role"`` and ``"content"``.
                The list and its dicts are not modified.
            append_assistant: When true, end the dialogue prompt with an open
                `` `assistant`: `` header.

        Returns:
            A ``(system_prompt, input_prompt)`` tuple of strings.
        """
        system_parts = []
        dialogue = []
        for message in messages:
            if message["role"] in self.system_roles:
                system_parts.append(message["content"])
            else:
                dialogue.append(message)
        system_prompt = "\n".join(system_parts)

        sections = []
        for turn in self.concat_messages_by_role(dialogue):
            speaker = "assistant" if turn["role"] in self.answer_roles else "user"
            sections.append(f"`{speaker}`:\n{turn['content']}")
        input_prompt = "\n\n".join(sections)

        if append_assistant:
            input_prompt += "\n\n`assistant`:"

        return system_prompt, input_prompt
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: split a short sample conversation into its system
    # prompt and its dialogue prompt, then log both parts.
    demo_composer = MessageComposer("gemma-7b")
    sample_dialogue = [
        dict(
            role="system",
            content="You are a LLM developed by OpenAI.\nYour name is GPT-4.",
        ),
        dict(role="user", content="Hello, who are you?"),
        dict(role="assistant", content="I am a bot."),
        dict(role="user", content="What is your name?"),
    ]

    prompts = demo_composer.decompose_to_system_and_input_prompt(sample_dialogue)
    # Log each prompt under its heading, system prompt first.
    for heading, prompt_text in zip(("system_prompt:", "input_prompt:"), prompts):
        logger.note(heading)
        logger.mesg(prompt_text)
|
|
|
|
|
|
|