#%%
import itertools
import json
import os
import random
from typing import List, Union

import albumentations as A
import cv2
import lmdb
import numpy as np
import pyarrow as pa
import torch
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset

from .simple_tokenizer import SimpleTokenizer as _Tokenizer

# Number of samples per dataset / split. Used as the initial dataset length
# until the LMDB file is opened and reports its own `__len__` entry.
info = {
    'refcoco': {
        'train': 42404,
        'val': 3811,
        'val-test': 3811,
        'testA': 1975,
        'testB': 1810
    },
    'refcoco+': {
        'train': 42278,
        'val': 3805,
        'val-test': 3805,
        'testA': 1975,
        'testB': 1798
    },
    'refcocog_u': {
        'train': 42226,
        'val': 2573,
        'val-test': 2573,
        'test': 5023,
    },
    'refcocog_g': {
        'train': 44822,
        'val': 5000,
        'val-test': 5000
    }
}
_tokenizer = _Tokenizer()

# Per-channel normalisation statistics (RGB order, inputs scaled to [0, 1]).
_PIXEL_MEAN = (0.48145466, 0.4578275, 0.40821073)
_PIXEL_STD = (0.26862954, 0.26130258, 0.27577711)

# Machine-specific locations of the auxiliary annotation files.
# NOTE(review): these are absolute paths on the original author's machine;
# consider making them configurable.
_MULTIOBJ_NOPOS_PATH = '/home/chaeyun/data/projects/chaeyun/RIS/CRIS.pytorch/multiobj_ov2_nopos.txt'
_MULTIOBJ_PATH = '/home/chaeyun/data/projects/chaeyun/RIS/CRIS.pytorch/multiobj_ov3.txt'
_HARDPOS_PATH = '/data2/projects/chaeyun/VerbCentric_RIS/hardpos_verbphrase_0906upd.json'
_HARDNEG_PATH = '/data2/projects/chaeyun/VerbCentric_RIS/hardneg_verb.json'


#%%
def tokenize(texts: Union[str, List[str]],
             context_length: int = 77,
             truncate: bool = False) -> torch.LongTensor:
    """
    Returns the tokenized representation of given input string(s)

    Parameters
    ----------
    texts : Union[str, List[str]]
        An input string or a list of input strings to tokenize
    context_length : int
        The context length to use; all CLIP models use 77 as the context length
    truncate: bool
        Whether to truncate the text in case its encoding is longer than the context length

    Returns
    -------
    A two-dimensional tensor containing the resulting tokens,
    shape = [number of input strings, context_length]

    Raises
    ------
    RuntimeError
        If an encoded input is longer than ``context_length`` and ``truncate`` is False.
    """
    if isinstance(texts, str):
        texts = [texts]

    sot_token = _tokenizer.encoder["<|startoftext|>"]
    eot_token = _tokenizer.encoder["<|endoftext|>"]
    all_tokens = [[sot_token] + _tokenizer.encode(text) + [eot_token]
                  for text in texts]
    # Zero-initialised so that positions after the end-of-text token stay 0 (padding).
    result = torch.zeros(len(all_tokens), context_length, dtype=torch.long)

    for i, tokens in enumerate(all_tokens):
        if len(tokens) > context_length:
            if truncate:
                # Keep the sequence terminated by the end-of-text token after cutting.
                tokens = tokens[:context_length]
                tokens[-1] = eot_token
            else:
                raise RuntimeError(
                    f"Input {texts[i]} is too long for context length {context_length}"
                )
        result[i, :len(tokens)] = torch.tensor(tokens)

    return result


def loads_pyarrow(buf):
    """
    Args:
        buf: the output of `dumps`.
    """
    # NOTE(review): `pa.deserialize` is deprecated in recent pyarrow releases;
    # it is kept because the stored records were presumably written with the
    # matching `pa.serialize` -- confirm before upgrading pyarrow.
    return pa.deserialize(buf)


class RefDataset(Dataset):
    """Referring-segmentation dataset backed by an LMDB file.

    Each record read from LMDB is used as a mapping with the keys ``'img'``
    and ``'mask'`` (encoded image bytes), ``'seg_id'``, ``'sents'`` and
    ``'num_sents'``. When ``args.metric_learning`` is enabled, training
    samples are additionally paired with tokenized "hard positive" and
    (optionally) "hard negative" verb phrases loaded from JSON side files.
    """

    def __init__(self, lmdb_dir, mask_dir, dataset, split, mode, input_size,
                 word_length, args):
        """
        Args:
            lmdb_dir: path of the LMDB file or directory.
            mask_dir: directory used to build the per-sample mask path.
            dataset: key into ``info`` (e.g. ``'refcoco'``).
            split: key into ``info[dataset]`` (e.g. ``'train'``).
            mode: ``'train'``, ``'val'`` or anything else for test behaviour.
            input_size: side length of the square network input.
            word_length: token context length passed to ``tokenize``.
            args: object providing ``exclude_pos``, ``metric_learning``,
                ``exclude_multiobj``, ``metric_mode`` and, when metric
                learning is on, ``hn_prob``.
        """
        super().__init__()
        self.lmdb_dir = lmdb_dir
        self.mask_dir = mask_dir
        self.dataset = dataset
        self.split = split
        self.mode = mode
        self.input_size = (input_size, input_size)
        self.word_length = word_length
        self.mean = torch.tensor(_PIXEL_MEAN).reshape(3, 1, 1)
        self.std = torch.tensor(_PIXEL_STD).reshape(3, 1, 1)
        self.length = info[dataset][split]
        # The LMDB environment is opened lazily in `__getitem__`.
        self.env = None
        self.exclude_position = args.exclude_pos
        self.metric_learning = args.metric_learning
        self.exclude_multiobj = args.exclude_multiobj
        self.metric_mode = args.metric_mode
        self.resize_bg1 = A.Compose([
            A.Resize(input_size, input_size, always_apply=True)])

        if self.metric_learning:
            self.hardneg_prob = args.hn_prob  # Hard negative probability
            self.multi_obj_ref_ids = self._load_multi_obj_ref_ids()
            self.hardpos_meta, self.hardneg_meta = self._load_metadata()
        else:
            self.hardneg_prob = 0.0
            self.multi_obj_ref_ids = None
            self.hardpos_meta, self.hardneg_meta = None, None

    def _load_multi_obj_ref_ids(self):
        """Load the reference ids that must not receive a hard positive.

        Returns:
            A list of ints read from the configured text file (one id per
            line), or ``None`` when neither exclusion flag is set.
        """
        if self.exclude_position:
            multiobj_path = _MULTIOBJ_NOPOS_PATH
        elif self.exclude_multiobj:
            multiobj_path = _MULTIOBJ_PATH
        else:
            return None

        with open(multiobj_path, 'r') as f:
            # Blank lines (e.g. a trailing newline) are skipped instead of
            # crashing `int()`.
            return [int(line) for line in f if line.strip()]

    def _load_metadata(self):
        """Load the hard positive / hard negative phrase dictionaries.

        Returns:
            ``(hardpos_json, hardneg_json)``; ``hardneg_json`` is ``None``
            when ``metric_mode`` is ``"hardpos_only"``.
        """
        with open(_HARDPOS_PATH, 'r', encoding='utf-8') as f:
            hardpos_json = json.load(f)

        if self.metric_mode == "hardpos_only":
            return hardpos_json, None

        with open(_HARDNEG_PATH, 'r', encoding='utf-8') as f:
            hardneg_json = json.load(f)
        return hardpos_json, hardneg_json

    def _init_db(self):
        """Open the LMDB environment read-only and read its length and keys."""
        self.env = lmdb.open(self.lmdb_dir,
                             subdir=os.path.isdir(self.lmdb_dir),
                             readonly=True,
                             lock=False,
                             readahead=False,
                             meminit=False)
        with self.env.begin(write=False) as txn:
            self.length = loads_pyarrow(txn.get(b'__len__'))
            self.keys = loads_pyarrow(txn.get(b'__keys__'))

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        """Load, resize and transform one sample.

        Returns:
            * ``mode == 'train'``: ``(img, word_vec, mask, hardpos)`` when
              ``metric_mode == "hardpos_only"``, otherwise
              ``(img, word_vec, mask, hardpos, hardneg)``.
            * ``mode == 'val'``: ``(img, word_vec, mask, params)``.
            * otherwise: ``(img, mask, params)``.
        """
        # Delay loading LMDB data until after initialization:
        # https://github.com/chainer/chainermn/issues/129
        if self.env is None:
            self._init_db()
        with self.env.begin(write=False) as txn:
            byteflow = txn.get(self.keys[index])
        ref = loads_pyarrow(byteflow)

        # img
        ori_img = cv2.imdecode(np.frombuffer(ref['img'], np.uint8),
                               cv2.IMREAD_COLOR)
        img = cv2.cvtColor(ori_img, cv2.COLOR_BGR2RGB)

        # mask
        seg_id = ref['seg_id']
        # Despite its name this is the path of the per-sample mask file.
        mask_dir = os.path.join(self.mask_dir, str(seg_id) + '.png')
        mask = cv2.imdecode(np.frombuffer(ref['mask'], np.uint8),
                            cv2.IMREAD_GRAYSCALE)
        mask = mask / 255.

        # image resizing
        resized = self.resize_bg1(image=img, mask=mask)
        img = resized['image']
        mask = resized['mask'].astype(np.uint8)
        mask[mask > 0] = 1

        # image transform (computed on the already resized image)
        img_size = img.shape[:2]
        mat, mat_inv = self.getTransformMat(img_size, True)
        img = cv2.warpAffine(
            img,
            mat,
            self.input_size,
            flags=cv2.INTER_CUBIC,
            # Pad with the dataset mean so the border is zero after normalisation.
            borderValue=[channel * 255 for channel in _PIXEL_MEAN])

        # sentences
        sents = ref['sents']
        n_sentences = ref['num_sents']

        if self.mode == 'train':
            # mask transform
            mask = cv2.warpAffine(mask,
                                  mat,
                                  self.input_size,
                                  flags=cv2.INTER_LINEAR,
                                  borderValue=0.)

            # Pick one sentence at random. The order of the random draws in
            # this branch is kept stable so seeded runs stay reproducible.
            idx = np.random.choice(n_sentences, 1, replace=False)[0]
            sent = sents[idx]

            # if metric learning, assign hard positive verb phrase if applicable
            raw_hardpos, hardpos = self._get_hardpos_verb(ref, seg_id, idx)
            img, mask = self.convert(img, mask)
            word_vec = tokenize(sent, self.word_length, True).squeeze(0)

            if self.metric_mode == "hardpos_only":
                return img, word_vec, mask, hardpos

            choice = np.random.choice(
                ['hn', 'no_hn'],
                p=[self.hardneg_prob, 1 - self.hardneg_prob])
            # A hard negative is only used when a hard positive was found.
            if choice == 'hn' and raw_hardpos:
                _, hardneg = self._get_hardneg_verb(ref, seg_id, idx)
            else:
                hardneg = torch.zeros(self.word_length, dtype=torch.long)
            return img, word_vec, mask, hardpos, hardneg

        if self.mode == 'val':
            # sentence -> vector
            sent = sents[0]
            word_vec = tokenize(sent, self.word_length, True).squeeze(0)
            img = self.convert(img)[0]
            params = {
                'mask_dir': mask_dir,
                'inverse': mat_inv,
                'ori_size': np.array(img_size)
            }
            return img, word_vec, mask, params

        # test: sentences are returned raw inside `params`
        img = self.convert(img)[0]
        params = {
            'ori_img': ori_img,
            'seg_id': seg_id,
            'mask_dir': mask_dir,
            'inverse': mat_inv,
            'ori_size': np.array(img_size),
            'sents': sents
        }
        return img, mask, params

    def _no_phrase(self):
        """Return the ``('', all-zero token vector)`` pair meaning "no phrase"."""
        return '', torch.zeros(self.word_length, dtype=torch.long)

    def _get_hardneg_verb(self, ref, seg_id, sent_idx):
        """
        Select a hard negative phrase for the chosen sentence.

        Args:
            ref: the sample record (unused, kept for interface symmetry).
            seg_id: segment id; looked up as ``str(seg_id)`` in the metadata.
            sent_idx: position of the chosen sentence among the segment's
                entries. NOTE(review): this assumes the metadata keys are
                stored in the same order as ``ref['sents']`` -- confirm.

        Returns:
            ``(raw_phrase, tokens)``; ``('', zeros)`` when no hard negative
            is available for this sentence.
        """
        if self.hardneg_meta is None:
            return self._no_phrase()

        # Extract metadata for hard negatives if present
        hardneg_dict = self.hardneg_meta.get(str(seg_id), {})
        sent_id_list = list(hardneg_dict.keys())
        # Segments without (enough) entries simply have no hard negative.
        if sent_idx >= len(sent_id_list):
            return self._no_phrase()

        cur_hardneg = hardneg_dict.get(sent_id_list[sent_idx], [])
        if not cur_hardneg:
            return self._no_phrase()

        # Assign a hard negative phrase if available
        raw_verb_hardneg = random.choice(cur_hardneg)
        verb_hardneg = tokenize(raw_verb_hardneg, self.word_length,
                                True).squeeze(0)
        return raw_verb_hardneg, verb_hardneg

    def _get_hardpos_verb(self, ref, seg_id, sent_idx):
        """
        Select a hard positive verb phrase for the given segment.

        Args:
            ref: the sample record (unused, kept for interface symmetry).
            seg_id: segment id; looked up as ``str(seg_id)`` in the metadata.
            sent_idx: index of the chosen sentence (currently unused: phrases
                of all sentences of the segment are pooled).

        Returns:
            ``(raw_phrase, tokens)``; ``('', zeros)`` when metric-learning
            metadata is not loaded, the segment is listed in
            ``multi_obj_ref_ids``, or no phrase exists for it.
        """
        # Without metadata (metric learning disabled) there is nothing to pick.
        if self.hardpos_meta is None:
            return self._no_phrase()

        # If the object appears multiple times, no hard positive is used
        if self.multi_obj_ref_ids is not None and seg_id in self.multi_obj_ref_ids:
            return self._no_phrase()

        # Pool the phrases of every sentence recorded for this segment.
        hardpos_dict = self.hardpos_meta.get(str(seg_id), {})
        cur_hardpos = list(itertools.chain.from_iterable(hardpos_dict.values()))
        if not cur_hardpos:
            return self._no_phrase()

        # Assign a hard positive verb phrase if available
        raw_verb = random.choice(cur_hardpos)
        verb_hardpos = tokenize(raw_verb, self.word_length, True).squeeze(0)
        return raw_verb, verb_hardpos

    def getTransformMat(self, img_size, inverse=False):
        """Build the affine matrix that letterboxes an image into ``input_size``.

        The image is scaled uniformly to fit inside ``self.input_size`` and
        centred; the remaining border is left for the caller to fill.

        Args:
            img_size: ``(height, width)`` of the source image.
            inverse: also return the matrix mapping back to the source frame.

        Returns:
            ``(mat, mat_inv)`` where ``mat_inv`` is ``None`` unless
            ``inverse`` is true.
        """
        ori_h, ori_w = img_size
        inp_h, inp_w = self.input_size
        scale = min(inp_h / ori_h, inp_w / ori_w)
        new_h, new_w = ori_h * scale, ori_w * scale
        bias_x, bias_y = (inp_w - new_w) / 2., (inp_h - new_h) / 2.

        # Three corresponding corner points fully determine the affine map.
        src = np.array([[0, 0], [ori_w, 0], [0, ori_h]], np.float32)
        dst = np.array([[bias_x, bias_y], [new_w + bias_x, bias_y],
                        [bias_x, new_h + bias_y]], np.float32)

        mat = cv2.getAffineTransform(src, dst)
        if inverse:
            mat_inv = cv2.getAffineTransform(dst, src)
            return mat, mat_inv
        return mat, None

    def convert(self, img, mask=None):
        """Convert numpy image (and optional mask) to float tensors.

        The image is moved from HWC to CHW, scaled by 1/255 and normalised
        with ``self.mean`` / ``self.std``. The mask is only cast to float.

        Returns:
            ``(img, mask)``; ``mask`` is ``None`` when none was given.
        """
        # Image ToTensor & Normalize
        img = torch.from_numpy(img.transpose((2, 0, 1)))
        if not isinstance(img, torch.FloatTensor):
            img = img.float()
        img.div_(255.).sub_(self.mean).div_(self.std)
        # Mask ToTensor
        if mask is not None:
            mask = torch.from_numpy(mask)
            if not isinstance(mask, torch.FloatTensor):
                mask = mask.float()
        return img, mask

    def __repr__(self):
        return (f"{self.__class__.__name__}("
                f"db_path={self.lmdb_dir}, "
                f"dataset={self.dataset}, "
                f"split={self.split}, "
                f"mode={self.mode}, "
                f"input_size={self.input_size}, "
                f"word_length={self.word_length})")