|
|
|
import torch |
|
from PIL import Image |
|
from transformers import AutoModel, AutoTokenizer |
|
import random |
|
import math |
|
import numpy as np |
|
|
|
# Raise PIL's decompression-bomb threshold so very large document/scan images
# (common in docVQA) load without a DecompressionBombError.
Image.MAX_IMAGE_PIXELS = 1000000000
|
|
|
# Per-dataset cap on generated tokens; datasets not listed here fall back
# to 1024 inside the generate* methods.
max_token = {

    'docVQA': 100,

    'textVQA': 100,

    "docVQATest": 100

}
|
|
|
class MiniCPM_V:
    """Wrapper around the MiniCPM-V chat model for VQA benchmark evaluation.

    The model is loaded through ``transformers`` with remote code enabled;
    an optional local checkpoint can override the pretrained weights.
    Inference runs in fp16 with greedy beam search for reproducibility.
    """

    def __init__(self, model_path, ckpt, device=None) -> None:
        """Load model and tokenizer, optionally apply a checkpoint, move to device.

        Args:
            model_path: HuggingFace model id or local path.
            ckpt: optional path to a ``state_dict`` checkpoint file; ``None``
                keeps the pretrained weights.
            device: target device passed to ``model.to`` (e.g. ``'cuda'``).
        """
        self.model_path = model_path
        self.ckpt = ckpt
        self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True).eval()
        if self.ckpt is not None:
            # Deserialize on CPU first so a large checkpoint does not spike GPU memory.
            self.state_dict = torch.load(self.ckpt, map_location=torch.device('cpu'))
            self.model.load_state_dict(self.state_dict)

        self.model = self.model.to(dtype=torch.float16)
        self.model.to(device)

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
        torch.cuda.empty_cache()

    @staticmethod
    def _build_prompt(datasetname, question):
        """Return the user prompt for ``datasetname``.

        The docVQA/docVQATest/textVQA branches built the identical string, so
        they are unified here. Previously an unrecognized dataset left
        ``prompt`` unbound and raised NameError; we now fall back to the
        bare question.
        """
        if datasetname in ('docVQA', 'docVQATest', 'textVQA'):
            return "Answer the question directly with single word." + "\n" + question
        return question

    def generate(self, images, questions, datasetname):
        """Answer ``questions[0]`` about ``images[0]``.

        Returns:
            A one-element list containing the model's answer string.
        """
        image = Image.open(images[0]).convert('RGB')
        # dict.get with a default replaces the original bare ``except:`` fallback.
        max_new_tokens = max_token.get(datasetname, 1024)
        prompt = self._build_prompt(datasetname, questions[0])

        msgs = [{'role': 'user', 'content': prompt}]
        default_kwargs = dict(
            max_new_tokens=max_new_tokens,
            sampling=False,  # deterministic beam search for benchmark runs
            num_beams=3
        )
        res = self.model.chat(
            image=image,
            msgs=msgs,
            context=None,
            tokenizer=self.tokenizer,
            **default_kwargs
        )

        return [res]

    def generate_with_interleaved(self, images, questions, datasetname):
        """Answer using the interleaved (mixed text/image content list) chat API.

        Returns:
            A one-element list containing the model's answer.
        """
        max_new_tokens = max_token.get(datasetname, 1024)

        prompt = "Answer the question directly with single word."

        default_kwargs = dict(
            max_new_tokens=max_new_tokens,
            sampling=False,
            num_beams=3
        )

        # Flatten the message spec into [instruction, PIL image, question].
        content = []
        message = [
            {'type': 'text', 'value': prompt},
            {'type': 'image', 'value': images[0]},
            {'type': 'text', 'value': questions[0]}
        ]
        for x in message:
            if x['type'] == 'text':
                content.append(x['value'])
            elif x['type'] == 'image':
                image = Image.open(x['value']).convert('RGB')
                content.append(image)
        msgs = [{'role': 'user', 'content': content}]

        res = self.model.chat(
            image=None,
            msgs=msgs,
            context=None,
            tokenizer=self.tokenizer,
            **default_kwargs
        )

        # Some model versions return a (answer, context, ...) tuple; keep the answer.
        if isinstance(res, tuple) and len(res) > 0:
            res = res[0]
        print(f"Q: {content}, \nA: {res}")
        return [res]
|
|
|
|
|
class MiniCPM_V_2_6:
    """Wrapper around MiniCPM-V 2.6 for VQA benchmark evaluation.

    Differences from ``MiniCPM_V``: RNGs are seeded for reproducibility,
    the model runs in bfloat16, and small images in the interleaved path
    are upscaled (with seeded jitter) toward a 1344x1344 target area.
    """

    def __init__(self, model_path, ckpt, device=None) -> None:
        """Seed RNGs, load model and tokenizer, optionally apply a checkpoint.

        Args:
            model_path: HuggingFace model id or local path.
            ckpt: optional path to a ``state_dict`` checkpoint file; ``None``
                keeps the pretrained weights.
            device: target device passed to ``model.to`` (e.g. ``'cuda'``).
        """
        # Fixed seed so the randomized image upscaling below is reproducible.
        seed = 0
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

        self.model_path = model_path
        self.ckpt = ckpt
        self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True).eval()
        if self.ckpt is not None:
            # Deserialize on CPU first so a large checkpoint does not spike GPU memory.
            self.state_dict = torch.load(self.ckpt, map_location=torch.device('cpu'))
            self.model.load_state_dict(self.state_dict)

        self.model = self.model.to(dtype=torch.bfloat16)
        self.model.to(device)

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
        torch.cuda.empty_cache()

    @staticmethod
    def _build_prompt(datasetname, question):
        """Return the user prompt for ``datasetname``.

        The docVQA/docVQATest/textVQA branches built the identical string, so
        they are unified here. Previously an unrecognized dataset left
        ``prompt`` unbound and raised NameError; we now fall back to the
        bare question.
        """
        if datasetname in ('docVQA', 'docVQATest', 'textVQA'):
            return "Answer the question directly with single word." + "\n" + question
        return question

    def generate(self, images, questions, datasetname):
        """Answer ``questions[0]`` about ``images[0]``.

        Returns:
            A one-element list containing the model's answer string.
        """
        image = Image.open(images[0]).convert('RGB')
        # dict.get with a default replaces the original bare ``except:`` fallback.
        max_new_tokens = max_token.get(datasetname, 1024)
        prompt = self._build_prompt(datasetname, questions[0])

        msgs = [{'role': 'user', 'content': prompt}]
        default_kwargs = dict(
            max_new_tokens=max_new_tokens,
            sampling=False,  # deterministic beam search for benchmark runs
            num_beams=3
        )
        res = self.model.chat(
            image=image,
            msgs=msgs,
            context=None,
            tokenizer=self.tokenizer,
            **default_kwargs
        )

        return [res]

    def generate_with_interleaved(self, images, questions, datasetname):
        """Answer using the interleaved (mixed text/image content list) chat API.

        Images smaller than 1344x1344 in area are upscaled toward that target,
        with the new width jittered by the (seeded) ``random`` module.

        Returns:
            A one-element list containing the model's answer.
        """
        max_new_tokens = max_token.get(datasetname, 1024)

        prompt = "Answer the question directly with single word."

        default_kwargs = dict(
            max_new_tokens=max_new_tokens,
            sampling=False,
            num_beams=3
        )

        # Flatten the message spec into [instruction, PIL image, question].
        content = []
        message = [
            {'type': 'text', 'value': prompt},
            {'type': 'image', 'value': images[0]},
            {'type': 'text', 'value': questions[0]}
        ]
        for x in message:
            if x['type'] == 'text':
                content.append(x['value'])
            elif x['type'] == 'image':
                image = Image.open(x['value']).convert('RGB')
                img_width, img_height = image.width, image.height
                if (img_width * img_height) >= (1344 * 1344):
                    content.append(image)
                else:
                    # Upscale so the area approaches 1344*1344; the width is
                    # picked uniformly in [img_width, img_width * ratio] and the
                    # height scaled to preserve aspect ratio (ratio >= 1 here,
                    # so randint's bounds are always valid).
                    ratio = math.sqrt((1344 * 1344) / (img_width * img_height))
                    max_img_width = int(img_width * ratio)
                    new_img_width = random.randint(img_width, max_img_width)
                    new_img_height = int(new_img_width / img_width * img_height)
                    resized_image = image.resize((new_img_width, new_img_height))
                    content.append(resized_image)
        msgs = [{'role': 'user', 'content': content}]

        res = self.model.chat(
            image=None,
            msgs=msgs,
            context=None,
            tokenizer=self.tokenizer,
            **default_kwargs
        )

        # Some model versions return a (answer, context, ...) tuple; keep the answer.
        if isinstance(res, tuple) and len(res) > 0:
            res = res[0]
        print(f"Q: {content}, \nA: {res}")
        return [res]
|
|