Spaces:
Sleeping
Sleeping
File size: 6,110 Bytes
7e4b981 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
import string
import math
import torch
from data import data_utils
def get_symbols_to_strip_from_output(generator):
if hasattr(generator, "symbols_to_strip_from_output"):
return generator.symbols_to_strip_from_output
else:
return {generator.bos, generator.eos}
def decode_fn(x, tgt_dict, bpe, generator, tokenizer=None):
x = tgt_dict.string(x.int().cpu(), extra_symbols_to_ignore=get_symbols_to_strip_from_output(generator))
if bpe is not None:
x = bpe.decode(x)
if tokenizer is not None:
x = tokenizer.decode(x)
return x
def eval_caption(task, generator, models, sample):
transtab = str.maketrans({key: None for key in string.punctuation})
hypos = task.inference_step(generator, models, sample)
results = []
for i, sample_id in enumerate(sample["id"].tolist()):
detok_hypo_str = decode_fn(hypos[i][0]["tokens"], task.tgt_dict, task.bpe, generator)
results.append({"image_id": str(sample_id), "caption": detok_hypo_str.translate(transtab).strip()})
return results, None
def eval_vqa_gen(task, generator, models, sample):
encoder_out = models[0].encoder(
sample["net_input"]["src_tokens"],
src_lengths=sample["net_input"]["src_lengths"],
patch_images=sample["net_input"]["patch_images"],
patch_masks=sample["net_input"]["patch_masks"]
)
device = sample["net_input"]["src_tokens"].device
eos_item = torch.tensor([task.src_dict.eos()])
pad = task.src_dict.pad()
valid_result = []
for valid_answers, valid_constraint_masks in zip(task.valid_answers_list, task.valid_constraint_masks_list):
valid_size = len(valid_answers)
valid_tgt_items = [
torch.cat([torch.tensor(decoder_prompt[1:]), valid_answer, eos_item])
for decoder_prompt in sample["decoder_prompts"] for valid_answer in valid_answers
]
valid_prev_items = [
torch.cat([torch.tensor(decoder_prompt), valid_answer])
for decoder_prompt in sample["decoder_prompts"] for valid_answer in valid_answers
]
valid_constraint_mask_items = [
torch.cat(
[torch.zeros(len(decoder_prompt) - 1, valid_constraint_mask.size(1)).bool(), valid_constraint_mask],
dim=0
)
for decoder_prompt in sample["decoder_prompts"] for valid_constraint_mask in valid_constraint_masks
]
valid_tgt = data_utils.collate_tokens(valid_tgt_items, pad_idx=pad).to(device)
valid_prev_output = data_utils.collate_tokens(valid_prev_items, pad_idx=pad).to(device)
valid_constraint_masks = data_utils.collate_tokens(valid_constraint_mask_items, pad_idx=pad).to(device)
new_encoder_out = {}
new_encoder_out["encoder_out"] = [
encoder_out["encoder_out"][0].repeat_interleave(valid_size, dim=1)
]
new_encoder_out["encoder_padding_mask"] = [
encoder_out["encoder_padding_mask"][0].repeat_interleave(valid_size, dim=0)
]
new_encoder_out["position_embeddings"] = [
encoder_out["position_embeddings"][0].repeat_interleave(valid_size, dim=0)
]
decoder_out = models[0].decoder(valid_prev_output, encoder_out=new_encoder_out)
decoder_out[0].masked_fill_(~valid_constraint_masks, -math.inf)
lprobs = models[0].get_normalized_probs(decoder_out, log_probs=True)
scores = lprobs.gather(dim=-1, index=valid_tgt.unsqueeze(-1)).squeeze(-1)
scores = scores.masked_fill(valid_tgt.eq(task.tgt_dict.pad()), 0)
scores = scores.masked_fill((~valid_constraint_masks).all(2), 0)
scores = scores.sum(1)
scores = scores.view(-1, valid_size)
valid_result.append(scores)
valid_result = torch.cat(valid_result, dim=-1)
predicts = valid_result.argmax(1).tolist()
hyps = [task.index2ans[predict_index] for predict_index in predicts]
results = [{"question_id": int(id), "answer": hyp} for id, hyp in zip(sample["id"].tolist(), hyps)]
scores = [ref_dict.get(hyp, 0) for ref_dict, hyp in zip(sample['ref_dict'], hyps)]
return results, scores
def eval_refcoco(task, generator, models, sample):
def _calculate_ap_score(hyps, refs, thresh=0.5):
interacts = torch.cat(
[torch.where(hyps[:, :2] < refs[:, :2], refs[:, :2], hyps[:, :2]),
torch.where(hyps[:, 2:] < refs[:, 2:], hyps[:, 2:], refs[:, 2:])],
dim=1
)
area_predictions = (hyps[:, 2] - hyps[:, 0]) * (hyps[:, 3] - hyps[:, 1])
area_targets = (refs[:, 2] - refs[:, 0]) * (refs[:, 3] - refs[:, 1])
interacts_w = interacts[:, 2] - interacts[:, 0]
interacts_h = interacts[:, 3] - interacts[:, 1]
area_interacts = interacts_w * interacts_h
ious = area_interacts / (area_predictions + area_targets - area_interacts + 1e-6)
return ((ious >= thresh) & (interacts_w > 0) & (interacts_h > 0)).float()
gen_out = task.inference_step(generator, models, sample)
hyps = []
for i in range(len(gen_out)):
hyps.append(gen_out[i][0]["tokens"][:-1] - len(task.src_dict) + task.cfg.num_bins)
hyps = torch.stack(hyps, dim=0)
hyps = hyps / (task.cfg.num_bins - 1) * task.cfg.max_image_size
hyps[:, ::2] /= sample['w_resize_ratios'].unsqueeze(1)
hyps[:, 1::2] /= sample['h_resize_ratios'].unsqueeze(1)
results = [
{"uniq_id": sample_id,
"box": [hyps[i][0].item(), hyps[i][1].item(), hyps[i][2].item(), hyps[i][3].item()]}
for i, sample_id in enumerate(sample["id"].tolist())
]
scores = _calculate_ap_score(hyps, sample['region_coords'].float())
return results, scores
def eval_step(task, generator, models, sample):
if task.cfg._name == 'caption':
return eval_caption(task, generator, models, sample)
elif task.cfg._name == 'vqa_gen':
return eval_vqa_gen(task, generator, models, sample)
elif task.cfg._name == 'refcoco':
return eval_refcoco(task, generator, models, sample)
else:
raise NotImplementedError
|