|
|
|
|
|
|
|
import os |
|
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, pipeline |
|
from transformers import LlamaTokenizer |
|
import streamlit as st |
|
import torch |
|
|
|
|
|
# Hugging Face Hub repository that the tokenizer, generation config and model
# weights are loaded from.
REPO_NAME = 'schuler/experimental-JP47D20'

# Page chrome. page_icon has to be a single real emoji (or an image); keep this
# file UTF-8 encoded so the literal is not corrupted.
st.set_page_config(page_title="Experimental KPhi3 Model - Currently in Training", page_icon="🤗")
st.title("Experimental KPhi3 Model - Currently in Training")
|
|
|
|
|
@st.cache_resource(show_spinner="Loading model...")
def load_model(local_repo_name):
    """
    Loads the tokenizer, generation config and causal LM from one repository.

    The function is wrapped in ``st.cache_resource`` with a
    "Loading model..." spinner.

    Args:
        local_repo_name (str): Repository name or path handed to every
            ``from_pretrained`` call below.

    Returns:
        tuple: ``(tokenizer, generator_conf, model)`` in that order.
    """
    tok = LlamaTokenizer.from_pretrained(local_repo_name, trust_remote_code=True)
    gen_conf = GenerationConfig.from_pretrained(local_repo_name)

    # The model is loaded with remote code enabled and bfloat16 requested as dtype.
    model_kwargs = {"trust_remote_code": True, "torch_dtype": torch.bfloat16}
    lm = AutoModelForCausalLM.from_pretrained(local_repo_name, **model_kwargs)

    return tok, gen_conf, lm
|
|
|
tokenizer, generator_conf, model = load_model(REPO_NAME)

# Walk the parameters once and keep (element count, requires_grad) per tensor;
# both totals are derived from this list.
_param_stats = [(p.numel(), p.requires_grad) for p in model.parameters()]
total_params = sum(size for size, _ in _param_stats)
trainable_params = sum(size for size, needs_grad in _param_stats if needs_grad)

# The token-embedding parameter count is doubled before being subtracted.
# NOTE(review): presumably this accounts for an output projection of the same
# size as the input embedding; confirm the model does not tie those weights.
embed_params = 2 * sum(p.numel() for p in model.model.embed_tokens.parameters())

# Reported in millions of parameters.
non_embed_params = (trainable_params - embed_params) / 1e6
|
|
|
# Model summary shown under the title.
# Each st.markdown call is rendered as its own element, so an emphasis marker
# opened in one call cannot be closed in another; every line therefore carries
# its own opening and closing "*" (with no whitespace before the closing one).
st.markdown(f"*This chat uses the {REPO_NAME} model with {model.get_memory_footprint() / 1e6:.2f} MB memory footprint.*")
st.markdown(f"*Total number of non embedding trainable parameters: {non_embed_params:.2f} million.*")
st.markdown("*You may ask questions such as 'What is biology?' or 'What is the human body?'*")
|
|
|
# Build the text-generation pipeline used by get_response().
try:
    generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
except Exception as e:
    st.error(f"Failed to load model: {str(e)}")
    # Without the pipeline `generator` is undefined and every later use would
    # raise NameError; halt this script run after showing the error instead.
    st.stop()
|
|
|
|
|
# Default value for every session-state key the app relies on. A key is only
# written when it is not present yet, so existing values are left untouched.
_SESSION_DEFAULTS = {
    "avatars": {'user': None, 'assistant': None},
    'user_text': None,
    "max_response_length": 64,
    "system_message": "",
    "starter_message": "Hello, there! How can I help you today?",
    "can_continue": False,
    "last_response": '',
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Per-run flag (not stored in session state): set further down when the
# "Continue" button is pressed.
need_continue = False
|
|
|
|
|
# Sidebar: prompt settings, generation length, avatar pickers and the reset button.
with st.sidebar:
    st.header("System Settings")

    # Prompt settings, written straight back into session state.
    st.session_state.system_message = st.text_area(
        "System Message", value=st.session_state.system_message
    )
    st.session_state.starter_message = st.text_area(
        'First AI Message', value=st.session_state.starter_message
    )

    # Generation length. The value is forwarded as max_new_tokens, so it is
    # bounded below by 1; the stored value is clamped as well because
    # number_input rejects a value smaller than min_value.
    st.session_state.max_response_length = st.number_input(
        "Max Response Length",
        min_value=1,
        step=1,
        value=max(1, int(st.session_state.max_response_length)),
    )

    # Avatar pickers. The selected strings are later passed as the `avatar`
    # argument of st.chat_message, so each option must be a valid emoji.
    st.markdown("*Select Avatars:*")
    col1, col2 = st.columns(2)
    with col1:
        st.session_state.avatars['assistant'] = st.selectbox(
            "AI Avatar", options=["🤗", "💬", "🤖"], index=0
        )
    with col2:
        st.session_state.avatars['user'] = st.selectbox(
            "User Avatar", options=["👤", "👱‍♂️", "👨🏾", "👩", "👧🏾"], index=0
        )

    # True only on the run in which the button was clicked.
    reset_history = st.button("Reset Chat History")
|
|
|
|
|
# Start with an empty conversation on the first run or when a reset was requested.
if "chat_history" not in st.session_state or reset_history:
    st.session_state.chat_history = []
    # Clear the continuation state together with the history: otherwise the
    # "Continue" button stays visible and tries to extend a message that no
    # longer exists.
    st.session_state.can_continue = False
    st.session_state.last_response = ''
|
|
|
def get_response(system_message, chat_history, user_text, max_new_tokens=256, continue_last=False):
    """
    Generates a response from the chatbot model.

    In normal mode a prompt is assembled from the system message, the chat
    history and the new user text, and the user/assistant pair is appended to
    ``chat_history``. In continue mode the full text of the previous generation
    (``st.session_state.last_response``) is used as the prompt and the newly
    generated text is appended to the last assistant message.

    Args:
        system_message (str): The system message for the conversation. Empty
            or None means no system message.
        chat_history (list): The list of previous chat messages, each a dict
            with 'role' and 'content'. It is modified in place.
        user_text (str): The user's input text (ignored when continuing).
        max_new_tokens (int): The maximum number of new tokens to generate.
        continue_last (bool): Whether to continue the last assistant response.

    Returns:
        tuple: A tuple containing the generated response and the updated chat
        history. When there is nothing to continue, the response is an empty
        string and the history is returned unchanged.
    """
    if continue_last:
        previous_text = st.session_state.last_response
        has_assistant_tail = bool(chat_history) and chat_history[-1]['role'] == 'assistant'
        # Nothing to extend (e.g. the history was reset): return instead of
        # indexing an empty list or generating from an empty prompt.
        if not has_assistant_tail or not previous_text:
            return '', chat_history
        prompt = previous_text
    else:
        parts = []
        if system_message:
            # NOTE(review): the system message is emitted under the
            # <|assistant|> tag; confirm this is the format the model expects.
            parts.append("<|assistant|>" + system_message + "<|end|>")
        for message in chat_history:
            role = "<|assistant|>" if message['role'] == 'assistant' else "<|user|>"
            parts.append(f"{role}{message['content']}<|end|>")
        parts.append(f"<|user|>{user_text}<|end|><|assistant|>")
        prompt = "".join(parts)

    response_output = generator(
        prompt,
        generation_config=generator_conf,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=0.25,
        repetition_penalty=1.2
    )

    generated_text = response_output[0]['generated_text']

    # Keep the complete text (prompt + completion) so a later "continue" call
    # can resume from exactly where this generation stopped.
    st.session_state.last_response = generated_text

    # The slice below relies on the generated text starting with the prompt.
    assistant_response = generated_text[len(prompt):]

    if continue_last:
        # Extend the history that was passed in rather than reaching into
        # session state, so the function agrees with its own return value.
        chat_history[-1]['content'] += assistant_response
    else:
        chat_history.append({'role': 'user', 'content': user_text})
        chat_history.append({'role': 'assistant', 'content': assistant_response})

    return assistant_response, chat_history
|
|
|
|
|
# Container that holds the rendered conversation.
chat_interface = st.container()


def refresh_chat():
    """
    Draws every stored chat message inside the chat container.

    Messages whose role is 'system' are not displayed. Each remaining message
    is shown in a chat bubble for its role, using the avatar stored for that
    role in session state.
    """
    visible_messages = [
        entry for entry in st.session_state.chat_history if entry['role'] != 'system'
    ]
    with chat_interface:
        output_container = st.container()
        with output_container:
            for entry in visible_messages:
                speaker = entry['role']
                bubble = st.chat_message(speaker, avatar=st.session_state.avatars[speaker])
                with bubble:
                    st.markdown(entry['content'])


# Render the existing conversation before handling new input.
refresh_chat()
|
|
|
|
|
# Read the chat box and mirror the submitted text into session state.
submitted_text = st.chat_input(placeholder="Enter your text here.")
st.session_state.user_text = submitted_text

# A new message was submitted: echo it, generate the reply and display it.
if submitted_text:
    user_avatar = st.session_state.avatars['user']
    with st.chat_message("user", avatar=user_avatar):
        st.markdown(submitted_text)

    assistant_avatar = st.session_state.avatars['assistant']
    with st.chat_message("assistant", avatar=assistant_avatar), st.spinner("Thinking..."):
        reply, updated_history = get_response(
            system_message=st.session_state.system_message,
            user_text=submitted_text,
            chat_history=st.session_state.chat_history,
            max_new_tokens=st.session_state.max_response_length,
            continue_last=False
        )
        st.session_state.chat_history = updated_history
        st.markdown(reply)

    # From now on there is an assistant message that can be continued.
    st.session_state.can_continue = True

    # The submitted text has been consumed.
    st.session_state.user_text = None

# The "Continue" button is only created once a response exists; the
# short-circuit keeps st.button from being called otherwise.
need_continue = bool(st.session_state.can_continue) and st.button("Continue")

# Extend the last assistant message, then rerun so the updated history is redrawn.
if need_continue:
    assistant_avatar = st.session_state.avatars['assistant']
    with st.chat_message("assistant", avatar=assistant_avatar), st.spinner("Continuing..."):
        continuation, updated_history = get_response(
            system_message=st.session_state.system_message,
            user_text=None,
            chat_history=st.session_state.chat_history,
            max_new_tokens=st.session_state.max_response_length,
            continue_last=True
        )
        st.session_state.chat_history = updated_history
        st.markdown(continuation)
    st.rerun()