Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| # encoding: utf-8 | |
| import torch | |
| torch.randn(10).cuda() | |
| import argparse | |
| from transformers import AutoModel, AutoTokenizer | |
| import gradio as gr | |
| from PIL import Image | |
| from decord import VideoReader, cpu | |
| import io | |
| import os | |
| import copy | |
| import requests | |
| import base64 | |
| import json | |
| import traceback | |
| import re | |
| import modelscope_studio as mgr | |
| from modelscope.hub.snapshot_download import snapshot_download | |
| model_dir = snapshot_download('iic/mPLUG-Owl3-7B-240728', cache_dir='./') | |
| os.system('ls') | |
| # README, How to run demo on different devices | |
| # For Nvidia GPUs. | |
| # python web_demo_2.6.py --device cuda | |
| # For Mac with MPS (Apple silicon or AMD GPUs). | |
| # PYTORCH_ENABLE_MPS_FALLBACK=1 python web_demo_2.6.py --device mps | |
| # Argparser | |
| parser = argparse.ArgumentParser(description='demo') | |
| parser.add_argument('--device', type=str, default='cuda', help='cuda or mps') | |
| parser.add_argument("--host", type=str, default="0.0.0.0") | |
| parser.add_argument("--port", type=int) | |
| args = parser.parse_args() | |
| device = args.device | |
| assert device in ['cuda', 'mps'] | |
| # Load model | |
| model_path = './iic/mPLUG-Owl3-7B-240728' | |
| if 'int4' in model_path: | |
| if device == 'mps': | |
| print('Error: running int4 model with bitsandbytes on Mac is not supported right now.') | |
| exit() | |
| model = AutoModel.from_pretrained(model_path, attn_implementation='sdpa', trust_remote_code=True) | |
| else: | |
| model = AutoModel.from_pretrained(model_path, attn_implementation='sdpa', trust_remote_code=True, torch_dtype=torch.bfloat16) | |
| model = model.to(device=device) | |
| tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) | |
| model.eval() | |
| ERROR_MSG = "Error, please retry" | |
| model_name = 'mPLUG-Owl3' | |
| MAX_NUM_FRAMES = 64 | |
| IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'} | |
| VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.mov', '.avi', '.flv', '.wmv', '.webm', '.m4v'} | |
| def get_file_extension(filename): | |
| return os.path.splitext(filename)[1].lower() | |
| def is_image(filename): | |
| return get_file_extension(filename) in IMAGE_EXTENSIONS | |
| def is_video(filename): | |
| return get_file_extension(filename) in VIDEO_EXTENSIONS | |
| form_radio = { | |
| 'choices': ['Beam Search', 'Sampling'], | |
| #'value': 'Beam Search', | |
| 'value': 'Sampling', | |
| 'interactive': True, | |
| 'label': 'Decode Type' | |
| } | |
| def create_component(params, comp='Slider'): | |
| if comp == 'Slider': | |
| return gr.Slider( | |
| minimum=params['minimum'], | |
| maximum=params['maximum'], | |
| value=params['value'], | |
| step=params['step'], | |
| interactive=params['interactive'], | |
| label=params['label'] | |
| ) | |
| elif comp == 'Radio': | |
| return gr.Radio( | |
| choices=params['choices'], | |
| value=params['value'], | |
| interactive=params['interactive'], | |
| label=params['label'] | |
| ) | |
| elif comp == 'Button': | |
| return gr.Button( | |
| value=params['value'], | |
| interactive=True | |
| ) | |
| def create_multimodal_input(upload_image_disabled=False, upload_video_disabled=False): | |
| return mgr.MultimodalInput(upload_image_button_props={'label': 'Upload Image', 'disabled': upload_image_disabled, 'file_count': 'multiple'}, | |
| upload_video_button_props={'label': 'Upload Video', 'disabled': upload_video_disabled, 'file_count': 'single'}, | |
| submit_button_props={'label': 'Submit'}) | |
| def chat(img, msgs, ctx, params=None, vision_hidden_states=None): | |
| try: | |
| print('msgs:', msgs) | |
| images = [] | |
| videos = [] | |
| messages = [] | |
| for line in msgs: | |
| s = "" | |
| for item in line['content']: | |
| if isinstance(item, str): | |
| s+=item | |
| else: | |
| s+='<|image|>' | |
| images.append(item) | |
| messages.append({"role": line['role'], "content": s}) | |
| messages.append({"role": "assistant", "content": ""}) | |
| answer = model.chat( | |
| images=images, | |
| videos=videos, | |
| messages=messages, | |
| tokenizer=tokenizer, | |
| **params | |
| ) | |
| res = re.sub(r'(<box>.*</box>)', '', answer) | |
| res = res.replace('<ref>', '') | |
| res = res.replace('</ref>', '') | |
| res = res.replace('<box>', '') | |
| answer = res.replace('</box>', '') | |
| print('answer:', answer) | |
| return 0, answer, None, None | |
| except Exception as e: | |
| print(e) | |
| traceback.print_exc() | |
| return -1, ERROR_MSG, None, None | |
| def encode_image(image): | |
| if not isinstance(image, Image.Image): | |
| if hasattr(image, 'path'): | |
| image = Image.open(image.path).convert("RGB") | |
| else: | |
| image = Image.open(image.file.path).convert("RGB") | |
| # resize to max_size | |
| max_size = 448*16 | |
| if max(image.size) > max_size: | |
| w,h = image.size | |
| if w > h: | |
| new_w = max_size | |
| new_h = int(h * max_size / w) | |
| else: | |
| new_h = max_size | |
| new_w = int(w * max_size / h) | |
| image = image.resize((new_w, new_h), resample=Image.BICUBIC) | |
| return image | |
| ## save by BytesIO and convert to base64 | |
| #buffered = io.BytesIO() | |
| #image.save(buffered, format="png") | |
| #im_b64 = base64.b64encode(buffered.getvalue()).decode() | |
| #return {"type": "image", "pairs": im_b64} | |
| def encode_video(video): | |
| def uniform_sample(l, n): | |
| gap = len(l) / n | |
| idxs = [int(i * gap + gap / 2) for i in range(n)] | |
| return [l[i] for i in idxs] | |
| if hasattr(video, 'path'): | |
| vr = VideoReader(video.path, ctx=cpu(0)) | |
| else: | |
| vr = VideoReader(video.file.path, ctx=cpu(0)) | |
| sample_fps = round(vr.get_avg_fps() / 1) # FPS | |
| frame_idx = [i for i in range(0, len(vr), sample_fps)] | |
| if len(frame_idx)>MAX_NUM_FRAMES: | |
| frame_idx = uniform_sample(frame_idx, MAX_NUM_FRAMES) | |
| video = vr.get_batch(frame_idx).asnumpy() | |
| video = [Image.fromarray(v.astype('uint8')) for v in video] | |
| video = [encode_image(v) for v in video] | |
| print('video frames:', len(video)) | |
| return video | |
| def check_mm_type(mm_file): | |
| if hasattr(mm_file, 'path'): | |
| path = mm_file.path | |
| else: | |
| path = mm_file.file.path | |
| if is_image(path): | |
| return "image" | |
| if is_video(path): | |
| return "video" | |
| return None | |
| def encode_mm_file(mm_file): | |
| if check_mm_type(mm_file) == 'image': | |
| return [encode_image(mm_file)] | |
| if check_mm_type(mm_file) == 'video': | |
| return encode_video(mm_file) | |
| return None | |
| def make_text(text): | |
| #return {"type": "text", "pairs": text} # # For remote call | |
| return text | |
| def encode_message(_question): | |
| files = _question.files | |
| question = _question.text | |
| pattern = r"\[mm_media\]\d+\[/mm_media\]" | |
| matches = re.split(pattern, question) | |
| message = [] | |
| if len(matches) != len(files) + 1: | |
| gr.Warning("Number of Images not match the placeholder in text, please refresh the page to restart!") | |
| assert len(matches) == len(files) + 1 | |
| text = matches[0].strip() | |
| if text: | |
| message.append(make_text(text)) | |
| for i in range(len(files)): | |
| message += encode_mm_file(files[i]) | |
| text = matches[i + 1].strip() | |
| if text: | |
| message.append(make_text(text)) | |
| return message | |
| def check_has_videos(_question): | |
| images_cnt = 0 | |
| videos_cnt = 0 | |
| for file in _question.files: | |
| if check_mm_type(file) == "image": | |
| images_cnt += 1 | |
| else: | |
| videos_cnt += 1 | |
| return images_cnt, videos_cnt | |
| def count_video_frames(_context): | |
| num_frames = 0 | |
| for message in _context: | |
| for item in message["content"]: | |
| #if item["type"] == "image": # For remote call | |
| if isinstance(item, Image.Image): | |
| num_frames += 1 | |
| return num_frames | |
| def respond(_question, _chat_bot, _app_cfg, params_form): | |
| _context = _app_cfg['ctx'].copy() | |
| _context.append({'role': 'user', 'content': encode_message(_question)}) | |
| images_cnt = _app_cfg['images_cnt'] | |
| videos_cnt = _app_cfg['videos_cnt'] | |
| files_cnts = check_has_videos(_question) | |
| if files_cnts[1] + videos_cnt > 1 or (files_cnts[1] + videos_cnt == 1 and files_cnts[0] + images_cnt > 0): | |
| gr.Warning("Only supports single video file input right now!") | |
| return _question, _chat_bot, _app_cfg | |
| if params_form == 'Beam Search': | |
| params = { | |
| 'sampling': False, | |
| 'num_beams': 3, | |
| 'repetition_penalty': 1.2, | |
| "max_new_tokens": 2048 | |
| } | |
| else: | |
| params = { | |
| 'sampling': True, | |
| 'top_p': 0.8, | |
| 'top_k': 100, | |
| 'temperature': 0.7, | |
| 'repetition_penalty': 1.05, | |
| "max_new_tokens": 2048 | |
| } | |
| if files_cnts[1] + videos_cnt > 0: | |
| params["max_inp_length"] = 4352 # 4096+256 | |
| params["use_image_id"] = False | |
| params["max_slice_nums"] = 1 if count_video_frames(_context) > 16 else 2 | |
| code, _answer, _, sts = chat("", _context, None, params) | |
| images_cnt += files_cnts[0] | |
| videos_cnt += files_cnts[1] | |
| _context.append({"role": "assistant", "content": [make_text(_answer)]}) | |
| _chat_bot.append((_question, _answer)) | |
| if code == 0: | |
| _app_cfg['ctx']=_context | |
| _app_cfg['sts']=sts | |
| _app_cfg['images_cnt'] = images_cnt | |
| _app_cfg['videos_cnt'] = videos_cnt | |
| upload_image_disabled = videos_cnt > 0 | |
| upload_video_disabled = videos_cnt > 0 or images_cnt > 0 | |
| return create_multimodal_input(upload_image_disabled, upload_video_disabled), _chat_bot, _app_cfg | |
| def fewshot_add_demonstration(_image, _user_message, _assistant_message, _chat_bot, _app_cfg): | |
| ctx = _app_cfg["ctx"] | |
| message_item = [] | |
| if _image is not None: | |
| image = Image.open(_image).convert("RGB") | |
| ctx.append({"role": "user", "content": [encode_image(image), make_text(_user_message)]}) | |
| message_item.append({"text": "[mm_media]1[/mm_media]" + _user_message, "files": [_image]}) | |
| else: | |
| if _user_message: | |
| ctx.append({"role": "user", "content": [make_text(_user_message)]}) | |
| message_item.append({"text": _user_message, "files": []}) | |
| else: | |
| message_item.append(None) | |
| if _assistant_message: | |
| ctx.append({"role": "assistant", "content": [make_text(_assistant_message)]}) | |
| message_item.append({"text": _assistant_message, "files": []}) | |
| else: | |
| message_item.append(None) | |
| _chat_bot.append(message_item) | |
| return None, "", "", _chat_bot, _app_cfg | |
| def fewshot_respond(_image, _user_message, _chat_bot, _app_cfg, params_form): | |
| user_message_contents = [] | |
| _context = _app_cfg["ctx"].copy() | |
| if _image: | |
| image = Image.open(_image).convert("RGB") | |
| user_message_contents += [encode_image(image)] | |
| if _user_message: | |
| user_message_contents += [make_text(_user_message)] | |
| if user_message_contents: | |
| _context.append({"role": "user", "content": user_message_contents}) | |
| if params_form == 'Beam Search': | |
| params = { | |
| 'sampling': False, | |
| 'num_beams': 3, | |
| 'repetition_penalty': 1.2, | |
| "max_new_tokens": 2048 | |
| } | |
| else: | |
| params = { | |
| 'sampling': True, | |
| 'top_p': 0.8, | |
| 'top_k': 100, | |
| 'temperature': 0.7, | |
| 'repetition_penalty': 1.05, | |
| "max_new_tokens": 2048 | |
| } | |
| code, _answer, _, sts = chat("", _context, None, params) | |
| _context.append({"role": "assistant", "content": [make_text(_answer)]}) | |
| if _image: | |
| _chat_bot.append([ | |
| {"text": "[mm_media]1[/mm_media]" + _user_message, "files": [_image]}, | |
| {"text": _answer, "files": []} | |
| ]) | |
| else: | |
| _chat_bot.append([ | |
| {"text": _user_message, "files": [_image]}, | |
| {"text": _answer, "files": []} | |
| ]) | |
| if code == 0: | |
| _app_cfg['ctx']=_context | |
| _app_cfg['sts']=sts | |
| return None, '', '', _chat_bot, _app_cfg | |
| def regenerate_button_clicked(_question, _image, _user_message, _assistant_message, _chat_bot, _app_cfg, params_form): | |
| if len(_chat_bot) <= 1 or not _chat_bot[-1][1]: | |
| gr.Warning('No question for regeneration.') | |
| return '', _image, _user_message, _assistant_message, _chat_bot, _app_cfg | |
| if _app_cfg["chat_type"] == "Chat": | |
| images_cnt = _app_cfg['images_cnt'] | |
| videos_cnt = _app_cfg['videos_cnt'] | |
| _question = _chat_bot[-1][0] | |
| _chat_bot = _chat_bot[:-1] | |
| _app_cfg['ctx'] = _app_cfg['ctx'][:-2] | |
| files_cnts = check_has_videos(_question) | |
| images_cnt -= files_cnts[0] | |
| videos_cnt -= files_cnts[1] | |
| _app_cfg['images_cnt'] = images_cnt | |
| _app_cfg['videos_cnt'] = videos_cnt | |
| upload_image_disabled = videos_cnt > 0 | |
| upload_video_disabled = videos_cnt > 0 or images_cnt > 0 | |
| _question, _chat_bot, _app_cfg = respond(_question, _chat_bot, _app_cfg, params_form) | |
| return _question, _image, _user_message, _assistant_message, _chat_bot, _app_cfg | |
| else: | |
| last_message = _chat_bot[-1][0] | |
| last_image = None | |
| last_user_message = '' | |
| if last_message.text: | |
| last_user_message = last_message.text | |
| if last_message.files: | |
| last_image = last_message.files[0].file.path | |
| _chat_bot = _chat_bot[:-1] | |
| _app_cfg['ctx'] = _app_cfg['ctx'][:-2] | |
| _image, _user_message, _assistant_message, _chat_bot, _app_cfg = fewshot_respond(last_image, last_user_message, _chat_bot, _app_cfg, params_form) | |
| return _question, _image, _user_message, _assistant_message, _chat_bot, _app_cfg | |
| def flushed(): | |
| return gr.update(interactive=True) | |
| def clear(txt_message, chat_bot, app_session): | |
| txt_message.files.clear() | |
| txt_message.text = '' | |
| chat_bot = copy.deepcopy(init_conversation) | |
| app_session['sts'] = None | |
| app_session['ctx'] = [] | |
| app_session['images_cnt'] = 0 | |
| app_session['videos_cnt'] = 0 | |
| return create_multimodal_input(), chat_bot, app_session, None, '', '' | |
| def select_chat_type(_tab, _app_cfg): | |
| _app_cfg["chat_type"] = _tab | |
| return _app_cfg | |
| init_conversation = [ | |
| [ | |
| None, | |
| { | |
| # The first message of bot closes the typewriter. | |
| "text": "You can talk to me now", | |
| "flushing": False | |
| } | |
| ], | |
| ] | |
| css = """ | |
| video { height: auto !important; } | |
| .example label { font-size: 16px;} | |
| """ | |
| introduction = """ | |
| ## | |
| Github: | |
| [mPLUG-Owl](https://github.com/X-PLUG/mPLUG-Owl) | |
| Checkpoint: | |
| [mPLUG-Owl3-7B-240728](https://huggingface.co/mPLUG/mPLUG-Owl3-7B-240728) | |
| Paper: | |
| [mPLUG-Owl3: Towards Long Image-Sequence Understanding in Multi-Modal Large Language Models | |
| ](https://arxiv.org/abs/2408.04840) | |
| """ | |
| with gr.Blocks(css=css) as demo: | |
| with gr.Tab(model_name): | |
| with gr.Row(): | |
| with gr.Column(scale=1, min_width=300): | |
| gr.Markdown(value=introduction) | |
| params_form = create_component(form_radio, comp='Radio') | |
| regenerate = create_component({'value': 'Regenerate'}, comp='Button') | |
| clear_button = create_component({'value': 'Clear History'}, comp='Button') | |
| with gr.Column(scale=3, min_width=500): | |
| app_session = gr.State({'sts':None,'ctx':[], 'images_cnt': 0, 'videos_cnt': 0, 'chat_type': 'Chat'}) | |
| chat_bot = mgr.Chatbot(label=f"Chat with {model_name}", value=copy.deepcopy(init_conversation), height=600, flushing=False, bubble_full_width=False) | |
| with gr.Tab("Chat") as chat_tab: | |
| txt_message = create_multimodal_input() | |
| chat_tab_label = gr.Textbox(value="Chat", interactive=False, visible=False) | |
| txt_message.submit( | |
| respond, | |
| [txt_message, chat_bot, app_session, params_form], | |
| [txt_message, chat_bot, app_session] | |
| ) | |
| with gr.Tab("Few Shot") as fewshot_tab: | |
| fewshot_tab_label = gr.Textbox(value="Few Shot", interactive=False, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| image_input = gr.Image(type="filepath", sources=["upload"]) | |
| with gr.Column(scale=3): | |
| user_message = gr.Textbox(label="User") | |
| assistant_message = gr.Textbox(label="Assistant") | |
| with gr.Row(): | |
| add_demonstration_button = gr.Button("Add Example") | |
| generate_button = gr.Button(value="Generate", variant="primary") | |
| add_demonstration_button.click( | |
| fewshot_add_demonstration, | |
| [image_input, user_message, assistant_message, chat_bot, app_session], | |
| [image_input, user_message, assistant_message, chat_bot, app_session] | |
| ) | |
| generate_button.click( | |
| fewshot_respond, | |
| [image_input, user_message, chat_bot, app_session, params_form], | |
| [image_input, user_message, assistant_message, chat_bot, app_session] | |
| ) | |
| chat_tab.select( | |
| select_chat_type, | |
| [chat_tab_label, app_session], | |
| [app_session] | |
| ) | |
| chat_tab.select( # do clear | |
| clear, | |
| [txt_message, chat_bot, app_session], | |
| [txt_message, chat_bot, app_session, image_input, user_message, assistant_message] | |
| ) | |
| fewshot_tab.select( | |
| select_chat_type, | |
| [fewshot_tab_label, app_session], | |
| [app_session] | |
| ) | |
| fewshot_tab.select( # do clear | |
| clear, | |
| [txt_message, chat_bot, app_session], | |
| [txt_message, chat_bot, app_session, image_input, user_message, assistant_message] | |
| ) | |
| chat_bot.flushed( | |
| flushed, | |
| outputs=[txt_message] | |
| ) | |
| regenerate.click( | |
| regenerate_button_clicked, | |
| [txt_message, image_input, user_message, assistant_message, chat_bot, app_session, params_form], | |
| [txt_message, image_input, user_message, assistant_message, chat_bot, app_session] | |
| ) | |
| clear_button.click( | |
| clear, | |
| [txt_message, chat_bot, app_session], | |
| [txt_message, chat_bot, app_session, image_input, user_message, assistant_message] | |
| ) | |
| # launch | |
| demo.launch(share=False, debug=True, show_api=False, server_port=args.port, server_name=args.host) |