|
import os |
|
import subprocess |
|
|
|
|
|
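# Install flash-attn at startup; FLASH_ATTENTION_SKIP_CUDA_BUILD=TRUE skips compiling the CUDA extension so the prebuilt wheels are used.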
subprocess.run( |
|
"pip install flash-attn --no-build-isolation", |
|
env={"FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"}, |
|
shell=True, |
|
) |
|
|
|
import copy |
|
import spaces |
|
import time |
|
import torch |
|
|
|
from threading import Thread |
|
from typing import List, Dict, Tuple, Union
|
import urllib.request
|
import PIL.Image |
|
import io |
|
import datasets |
|
|
|
import gradio as gr |
|
from transformers import TextIteratorStreamer |
|
from transformers import Idefics2ForConditionalGeneration |
|
import tempfile |
|
from huggingface_hub import InferenceClient |
|
import edge_tts |
|
import asyncio |
|
from transformers import pipeline |
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
from transformers import AutoModel |
|
from transformers import AutoProcessor |
|
|
|
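# uform-gen2-dpo: a compact vision-language model used by the webcam "Live Chat" tab.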
model3 = AutoModel.from_pretrained("unum-cloud/uform-gen2-dpo", trust_remote_code=True) |
|
processor = AutoProcessor.from_pretrained("unum-cloud/uform-gen2-dpo", trust_remote_code=True) |
|
|
|
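# Answer a text prompt about a single (webcam) image with uform-gen2-dpo.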
@spaces.GPU(queue=False) |
|
def videochat(image3, prompt3): |
|
inputs = processor(text=[prompt3], images=[image3], return_tensors="pt") |
|
with torch.inference_mode(): |
|
output = model3.generate( |
|
**inputs, |
|
do_sample=False, |
|
use_cache=True, |
|
max_new_tokens=256, |
|
eos_token_id=151645, |
|
pad_token_id=processor.tokenizer.pad_token_id |
|
) |
|
|
|
prompt_len = inputs["input_ids"].shape[1] |
|
decoded_text = processor.batch_decode(output[:, prompt_len:])[0] |
|
if decoded_text.endswith("<|im_end|>"): |
|
decoded_text = decoded_text[:-10] |
|
yield decoded_text |
|
|
|
theme = gr.themes.Base( |
|
font=[gr.themes.GoogleFont('Libre Franklin'), gr.themes.GoogleFont('Public Sans'), 'system-ui', 'sans-serif'], |
|
) |
|
|
|
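# Whisper-medium ASR pipeline used to transcribe microphone input for the voice-chat tab.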
MODEL_NAME = "openai/whisper-medium" |
|
BATCH_SIZE = 10 |
|
|
|
device = 0 if torch.cuda.is_available() else "cpu" |
|
|
|
pipe = pipeline( |
|
task="automatic-speech-recognition", |
|
model=MODEL_NAME, |
|
chunk_length_s=30, |
|
device=device, |
|
) |
|
|
|
@spaces.GPU(queue=False) |
|
def transcribe(inputs): |
|
if inputs is None: |
|
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") |
|
|
|
text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": "transcribe"})["text"] |
|
return text |
|
|
|
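# Text replies for the voice-chat tab come from Mixtral-8x7B-Instruct via the Hugging Face Inference API.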
client1 = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") |
|
|
|
system_instructions1 = "[SYSTEM] Answer as Real OpenGPT 4o, Made by 'KingNish', Keep conversation very short, clear, friendly and concise. The text provided is a request for a specific type of response from you, the virtual assistant. You will avoid introductions and start answering the query directly, elaborating on all aspects of the request. As an AI-powered assistant, your task is to generate responses that appear as if they are created by the user and not an AI-powered assistant. [USER]" |
|
|
|
def model(text): |
|
generate_kwargs = dict( |
|
temperature=0.7, |
|
max_new_tokens=512, |
|
top_p=0.95, |
|
repetition_penalty=1, |
|
do_sample=True, |
|
seed=42, |
|
) |
|
|
|
formatted_prompt = system_instructions1 + text + "[OpenGPT 4o]" |
|
stream = client1.text_generation( |
|
formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) |
|
output = "" |
|
for response in stream: |
|
if not response.token.text == "</s>": |
|
output += response.token.text |
|
|
|
return output |
|
|
|
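# Voice-chat pipeline: transcribe the audio with Whisper, generate a reply with Mixtral, then synthesize speech with edge-tts.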
async def respond(audio): |
|
user = transcribe(audio) |
|
reply = model(user) |
|
communicate = edge_tts.Communicate(reply) |
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file: |
|
tmp_path = tmp_file.name |
|
await communicate.save(tmp_path) |
|
yield tmp_path |
|
|
|
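# Idefics2-8B (chatty variant) is the multimodal chat model, loaded once onto the GPU in bfloat16 with FlashAttention 2.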
DEVICE = torch.device("cuda") |
|
MODELS = { |
|
"idefics2-8b-chatty": Idefics2ForConditionalGeneration.from_pretrained( |
|
"HuggingFaceM4/idefics2-8b-chatty", |
|
torch_dtype=torch.bfloat16, |
|
_attn_implementation="flash_attention_2", |
|
).to(DEVICE), |
|
} |
|
PROCESSOR = AutoProcessor.from_pretrained( |
|
"HuggingFaceM4/idefics2-8b", |
|
) |
|
|
|
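# System conditioning: defines the OpenGPT 4o persona and the pollinations.ai markdown-image convention used for "image generation".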
SYSTEM_PROMPT = [ |
|
{ |
|
"role": "system", |
|
"content": [ |
|
{ |
|
"type": "text", |
|
"text": """I am OpenGPT 4o, an exceptionally capable and versatile AI assistant meticulously crafted by KingNish. Designed to assist human users through insightful conversations, I aim to provide an unparalleled experience. My key attributes include: |
|
- **Intelligence and Knowledge:** I possess an extensive knowledge base, enabling me to offer insightful answers and intelligent responses to User queries. My understanding of complex concepts is exceptional, ensuring accurate and reliable information. |
|
- **Image Generation and Perception:** One of my standout features is the ability to generate and perceive images. Utilizing the following link structure, I create unique and contextually rich visuals: |
|
>  |
|
For image generation, I replace {info inside curly braces} with specific details according to the requirements to create relevant visuals. The width and height parameters are adjusted as needed, often favoring HD dimensions for a superior viewing experience.
|
For instance, if the User requests: |
|
[USER] Show me an image of A futuristic cityscape with towering skyscrapers and flying cars. |
|
[OpenGPT 4o] Generating Image you requested: |
|
 |
|
**Bulk Image Generation with Links:** I excel at generating multiple image links simultaneously, always providing unique links and visuals. I ensure that each image is distinct and captivates the User.
|
Note: Always provide image links starting with "!", as shown in the examples above.
|
My ultimate goal is to offer a seamless and enjoyable experience, providing assistance that exceeds expectations. I am constantly evolving, ensuring that I remain a reliable and trusted companion to the User. I am also an expert in every field, and I learn from the context of previous questions to give better answers.""" },
|
], |
|
}, |
|
{ |
|
"role": "assistant", |
|
"content": [ |
|
{ |
|
"type": "text", |
|
"text": "Hello, I'm OpenGPT 4o, made by KingNish. How can I help you? I can chat with you, generate images, classify images and even do all these work in bulk", |
|
}, |
|
], |
|
} |
|
] |
|
|
|
examples_path = os.path.dirname(__file__) |
|
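# Example prompts for the chat UI; some reference images bundled with the Space under example_images/.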
EXAMPLES = [ |
|
[ |
|
{ |
|
"text": "Hi, who are you?", |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "Create a Photorealistic image of the Eiffel Tower.", |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "Read what's written on the paper.", |
|
"files": [f"{examples_path}/example_images/paper_with_text.png"], |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "Identify two famous people in the modern world.", |
|
"files": [f"{examples_path}/example_images/elon_smoking.jpg", f"{examples_path}/example_images/steve_jobs.jpg",] |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "Create five images of supercars, each in a different color.", |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "What is 900 multiplied by 900?", |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "Chase wants to buy 4 kilograms of oval beads and 5 kilograms of star-shaped beads. How much will he spend?", |
|
"files": [f"{examples_path}/example_images/mmmu_example.jpeg"], |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "Create an online ad for this product.", |
|
"files": [f"{examples_path}/example_images/shampoo.jpg"], |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "What is formed by the deposition of the weathered remains of other rocks?", |
|
"files": [f"{examples_path}/example_images/ai2d_example.jpeg"], |
|
} |
|
], |
|
[ |
|
{ |
|
"text": "What's unusual about this image?", |
|
"files": [f"{examples_path}/example_images/dragons_playing.png"], |
|
} |
|
], |
|
] |
|
|
|
BOT_AVATAR = "OpenAI_logo.png" |
|
|
|
|
|
|
|
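# A history turn whose assistant slot is None is a media-only message (the user uploaded an image without text).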
def turn_is_pure_media(turn): |
|
return turn[1] is None |
|
|
|
|
|
def load_image_from_url(url): |
|
with urllib.request.urlopen(url) as response: |
|
image_data = response.read() |
|
image_stream = io.BytesIO(image_data) |
|
image = PIL.Image.open(image_stream) |
|
return image |
|
|
|
|
|
def img_to_bytes(image_path): |
|
image = PIL.Image.open(image_path).convert(mode='RGB') |
|
buffer = io.BytesIO() |
|
image.save(buffer, format="JPEG") |
|
img_bytes = buffer.getvalue() |
|
image.close() |
|
return img_bytes |
|
|
|
|
|
def format_user_prompt_with_im_history_and_system_conditioning( |
|
user_prompt, chat_history |
|
) -> Tuple[List[Dict[str, Union[List, str]]], List[PIL.Image.Image]]:
|
""" |
|
Produce the resulting list that needs to go inside the processor. It handles the potential image(s), the history, and the system conditioning. |
|
""" |
|
resulting_messages = copy.deepcopy(SYSTEM_PROMPT) |
|
resulting_images = [] |
|
for resulting_message in resulting_messages: |
|
if resulting_message["role"] == "user": |
|
for content in resulting_message["content"]: |
|
if content["type"] == "image": |
|
resulting_images.append(load_image_from_url(content["image"])) |
|
|
|
|
|
for turn in chat_history: |
|
if not resulting_messages or ( |
|
resulting_messages and resulting_messages[-1]["role"] != "user" |
|
): |
|
resulting_messages.append( |
|
{ |
|
"role": "user", |
|
"content": [], |
|
} |
|
) |
|
|
|
if turn_is_pure_media(turn): |
|
media = turn[0][0] |
|
resulting_messages[-1]["content"].append({"type": "image"}) |
|
resulting_images.append(PIL.Image.open(media)) |
|
else: |
|
user_utterance, assistant_utterance = turn |
|
resulting_messages[-1]["content"].append( |
|
{"type": "text", "text": user_utterance.strip()} |
|
) |
|
resulting_messages.append( |
|
{ |
|
"role": "assistant", |
|
"content": [{"type": "text", "text": user_utterance.strip()}], |
|
} |
|
) |
|
|
|
|
|
if not user_prompt["files"]: |
|
resulting_messages.append( |
|
{ |
|
"role": "user", |
|
"content": [{"type": "text", "text": user_prompt["text"]}], |
|
} |
|
) |
|
else: |
|
|
|
resulting_messages.append( |
|
{ |
|
"role": "user", |
|
"content": [{"type": "image"}] * len(user_prompt["files"]) |
|
+ [{"type": "text", "text": user_prompt["text"]}], |
|
} |
|
) |
|
resulting_images.extend([PIL.Image.open(path) for path in user_prompt["files"]]) |
|
|
|
return resulting_messages, resulting_images |
|
|
|
|
|
def extract_images_from_msg_list(msg_list): |
|
all_images = [] |
|
for msg in msg_list: |
|
for c_ in msg["content"]: |
|
if isinstance(c_, PIL.Image.Image):
|
all_images.append(c_) |
|
return all_images |
|
|
|
|
|
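# Main chat handler: builds the prompt from the system conditioning plus chat history, then streams tokens from Idefics2.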
@spaces.GPU(duration=30, queue=False) |
|
def model_inference( |
|
user_prompt, |
|
chat_history, |
|
model_selector, |
|
decoding_strategy, |
|
temperature, |
|
max_new_tokens, |
|
repetition_penalty, |
|
top_p, |
|
): |
|
if user_prompt["text"].strip() == "" and not user_prompt["files"]: |
|
gr.Error("Please input a query and optionally an image(s).") |
|
|
|
if user_prompt["text"].strip() == "" and user_prompt["files"]: |
|
gr.Error("Please input a text query along with the image(s).") |
|
|
|
streamer = TextIteratorStreamer( |
|
PROCESSOR.tokenizer, |
|
skip_prompt=True, |
|
timeout=120.0, |
|
) |
|
|
|
generation_args = { |
|
"max_new_tokens": max_new_tokens, |
|
"repetition_penalty": repetition_penalty, |
|
"streamer": streamer, |
|
} |
|
|
|
assert decoding_strategy in [ |
|
"Greedy", |
|
"Top P Sampling", |
|
] |
|
if decoding_strategy == "Greedy": |
|
generation_args["do_sample"] = False |
|
elif decoding_strategy == "Top P Sampling": |
|
generation_args["temperature"] = temperature |
|
generation_args["do_sample"] = True |
|
generation_args["top_p"] = top_p |
|
|
|
|
|
( |
|
resulting_text, |
|
resulting_images, |
|
) = format_user_prompt_with_im_history_and_system_conditioning( |
|
user_prompt=user_prompt, |
|
chat_history=chat_history, |
|
) |
|
prompt = PROCESSOR.apply_chat_template(resulting_text, add_generation_prompt=True) |
|
inputs = PROCESSOR( |
|
text=prompt, |
|
images=resulting_images if resulting_images else None, |
|
return_tensors="pt", |
|
) |
|
inputs = {k: v.to(DEVICE) for k, v in inputs.items()} |
|
generation_args.update(inputs) |
|
|
|
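# Run generation in a background thread so tokens can be streamed to the UI as they arrive.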
thread = Thread( |
|
target=MODELS[model_selector].generate, |
|
kwargs=generation_args, |
|
) |
|
thread.start() |
|
|
|
print("Start generating") |
|
acc_text = "" |
|
for text_token in streamer: |
|
time.sleep(0.01) |
|
acc_text += text_token |
|
if acc_text.endswith("<end_of_utterance>"): |
|
acc_text = acc_text[:-18] |
|
yield acc_text |
|
|
|
|
|
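# Dataset schema describing logged conversations (not referenced elsewhere in this file).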
FEATURES = datasets.Features( |
|
{ |
|
"model_selector": datasets.Value("string"), |
|
"images": datasets.Sequence(datasets.Image(decode=True)), |
|
"conversation": datasets.Sequence({"User": datasets.Value("string"), "Assistant": datasets.Value("string")}), |
|
"decoding_strategy": datasets.Value("string"), |
|
"temperature": datasets.Value("float32"), |
|
"max_new_tokens": datasets.Value("int32"), |
|
"repetition_penalty": datasets.Value("float32"), |
|
"top_p": datasets.Value("int32"), |
|
} |
|
) |
|
|
|
|
|
|
|
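# Generation-parameter widgets shared by the chat interface below.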
max_new_tokens = gr.Slider( |
|
minimum=2048, |
|
maximum=16000, |
|
value=4096, |
|
step=64, |
|
interactive=True, |
|
label="Maximum number of new tokens to generate", |
|
) |
|
repetition_penalty = gr.Slider( |
|
minimum=0.01, |
|
maximum=5.0, |
|
value=1, |
|
step=0.01, |
|
interactive=True, |
|
label="Repetition penalty", |
|
info="1.0 is equivalent to no penalty", |
|
) |
|
decoding_strategy = gr.Radio( |
|
[ |
|
"Greedy", |
|
"Top P Sampling", |
|
], |
|
value="Top P Sampling", |
|
label="Decoding strategy", |
|
interactive=True, |
|
info="Higher values are equivalent to sampling more low-probability tokens.", |
|
) |
|
temperature = gr.Slider( |
|
minimum=0.0, |
|
maximum=2.0, |
|
value=0.5, |
|
step=0.05, |
|
visible=True, |
|
interactive=True, |
|
label="Sampling temperature", |
|
info="Higher values will produce more diverse outputs.", |
|
) |
|
top_p = gr.Slider( |
|
minimum=0.01, |
|
maximum=0.99, |
|
value=0.9, |
|
step=0.01, |
|
visible=True, |
|
interactive=True, |
|
label="Top P", |
|
info="Higher values are equivalent to sampling more low-probability tokens.", |
|
) |
|
|
|
|
|
chatbot = gr.Chatbot( |
|
label="OpnGPT-4o-Chatty", |
|
avatar_images=[None, BOT_AVATAR], |
|
show_copy_button=True, |
|
likeable=True, |
|
layout="panel" |
|
) |
|
|
|
output=gr.Textbox(label="Prompt") |
|
|
|
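# --- Tab 1: multimodal "SuperChat" built on gr.ChatInterface ---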
with gr.Blocks( |
|
fill_height=True, |
|
css=""".gradio-container .avatar-container {height: 40px width: 40px !important;} #duplicate-button {margin: auto; color: white; background: #f1a139; border-radius: 100vh; margin-top: 2px; margin-bottom: 2px;}""", |
|
) as chat: |
|
|
|
gr.Markdown("# Image Chat, Image Generation, Image classification and Normal Chat") |
|
with gr.Row(elem_id="model_selector_row"): |
|
model_selector = gr.Dropdown( |
|
choices=list(MODELS.keys()),
|
value=list(MODELS.keys())[0], |
|
interactive=True, |
|
show_label=False, |
|
container=False, |
|
label="Model", |
|
visible=False, |
|
) |
|
|
|
decoding_strategy.change( |
|
fn=lambda selection: gr.Slider( |
|
visible=( |
|
selection |
|
in [ |
|
"contrastive_sampling", |
|
"beam_sampling", |
|
"Top P Sampling", |
|
"sampling_top_k", |
|
] |
|
) |
|
), |
|
inputs=decoding_strategy, |
|
outputs=temperature, |
|
) |
|
decoding_strategy.change( |
|
fn=lambda selection: gr.Slider(visible=(selection in ["Top P Sampling"])), |
|
inputs=decoding_strategy, |
|
outputs=top_p, |
|
) |
|
|
|
gr.ChatInterface( |
|
fn=model_inference, |
|
chatbot=chatbot, |
|
examples=EXAMPLES, |
|
multimodal=True, |
|
cache_examples=False, |
|
additional_inputs=[ |
|
model_selector, |
|
decoding_strategy, |
|
temperature, |
|
max_new_tokens, |
|
repetition_penalty, |
|
top_p, |
|
], |
|
) |
|
|
|
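# --- Tab 2: voice chat (microphone input, spoken reply) ---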
with gr.Blocks() as voice: |
|
with gr.Row(): |
|
input = gr.Audio(label="Voice Chat", sources="microphone", type="filepath", waveform_options=False) |
|
output = gr.Audio(label="OpenGPT 4o", type="filepath", |
|
interactive=False, |
|
autoplay=True, |
|
elem_classes="audio") |
|
gr.Interface( |
|
batch=True, |
|
max_batch_size=10, |
|
fn=respond, |
|
inputs=[input], |
|
outputs=[output], live=True) |
|
|
|
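# --- Tab 3: live chat (webcam snapshot answered by uform-gen2-dpo) ---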
with gr.Blocks() as livechat: |
|
gr.Interface( |
|
batch=True, |
|
max_batch_size=10, |
|
fn=videochat, |
|
inputs=[gr.Image(type="pil",sources="webcam", label="Upload Image"), gr.Textbox(label="Prompt", value="what he is doing")], |
|
outputs=gr.Textbox(label="Answer") |
|
) |
|
|
|
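# --- Tab 4: image and video generation engines embedded from external Hugging Face Spaces ---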
with gr.Blocks() as god: |
|
gr.HTML("<iframe src='https://kingnish-sdxl-flash.hf.space' width='100%' height='1200px' style='border-radius: 8px;'></iframe>") |
|
|
|
with gr.Blocks() as instant: |
|
gr.HTML("<iframe src='https://kingnish-instant-image.hf.space' width='100%' height='1000px' style='border-radius: 8px;'></iframe>") |
|
|
|
with gr.Blocks() as image: |
|
gr.Markdown("""### More models are coming""") |
|
gr.TabbedInterface([god, instant], ['Powerful🖼️', 'Instant🖼️'])
|
|
|
|
|
|
|
|
|
with gr.Blocks() as instant2: |
|
gr.HTML("<iframe src='https://kingnish-instant-video.hf.space' width='100%' height='2000px' style='border-radius: 8px;'></iframe>") |
|
|
|
with gr.Blocks() as video: |
|
gr.Markdown("""More Models are coming""") |
|
gr.TabbedInterface([ instant2], ['Instantπ₯']) |
|
|
|
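# --- Assemble the top-level tabbed demo ---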
with gr.Blocks(theme=theme, title="OpenGPT 4o DEMO") as demo: |
|
gr.Markdown("# OpenGPT 4o") |
|
gr.TabbedInterface([chat, voice, livechat, image, video], ['💬 SuperChat', '🗣️ Voice Chat', '📸 Live Chat', '🖼️ Image Engine', '🎥 Video Engine'])
|
|
|
demo.queue(max_size=300) |
|
demo.launch() |