from datasets import build_dataset |
import argparse |
import opts |
import sys |
from pathlib import Path |
from os import path as osp |
import io |
import numpy as np |
import pandas as pd |
import regex as re |
import json |
import cv2 |
from PIL import Image |
import torch |
from torchvision.transforms import functional as F |
from skimage import measure |
from shapely.geometry import Polygon, MultiPolygon |
import matplotlib.pyplot as plt |
from matplotlib.collections import PatchCollection |
from matplotlib.patches import Rectangle |
import ipywidgets as widgets |
from IPython.display import display, clear_output |
# --- Module-level setup (runs at import time) -------------------------------
# The CLI parser inherits every option declared by the project's `opts` module.
parser = argparse.ArgumentParser(
    'ReferFormer training and evaluation script',
    parents=[opts.get_args_parser()],
)
args = parser.parse_args()

# Training split of the 'ytvos' dataset; `metas` is its per-sample metadata,
# which the functions below read as module globals.
train_dataset = build_dataset('ytvos', image_set='train', args=args)
metas = train_dataset.metas

# One JSON object per line; the path is resolved against the working directory.
selected_frames_df = pd.read_json("selected_frames4.jsonl", lines=True)
def prepare_mask_for_pil(mask_tensor):
    """Turn a mask tensor into an 8-bit PIL image.

    Dimension 0 is squeezed (a no-op unless it has size 1), the data is moved
    to the CPU, multiplied by 255 and cast to ``uint8``.

    NOTE(review): presumably the mask holds 0/1 values so the result is
    0/255 -- nothing here enforces that; confirm against the dataset.
    """
    scaled = mask_tensor.squeeze(0).cpu().numpy() * 255
    return Image.fromarray(scaled.astype(np.uint8))
def create_sub_masks(mask_image):
    """Split a mask image into one binary image per distinct non-zero pixel value.

    Returns a dict keyed by ``str(pixel_value)``. Each value is a mode-``'1'``
    PIL image that is two pixels wider and taller than the input, with the
    matching pixels set at an offset of (+1, +1), i.e. the mask surrounded by
    a one-pixel empty border.
    """
    width, height = mask_image.size
    padded_size = (width + 2, height + 2)
    sub_masks = {}

    # Column-major scan, so keys appear in the order their value is first met.
    for col in range(width):
        for row in range(height):
            value = mask_image.getpixel((col, row))
            if value == 0:
                continue

            key = str(value)
            if key not in sub_masks:
                sub_masks[key] = Image.new('1', padded_size)
            # Shift by one to leave room for the border.
            sub_masks[key].putpixel((col + 1, row + 1), 1)

    return sub_masks
def create_sub_mask_annotation(sub_mask, image_id, annotation_id, is_crowd):
    """Build an annotation dict (polygons, bbox, area) from a binary sub-mask.

    Args:
        sub_mask: 2-D array whose contours are traced at level 0.5. One is
            subtracted from every coordinate, which matches the one-pixel
            border added by ``create_sub_masks``.
        image_id: Stored unchanged under ``'image_id'``.
        annotation_id: Stored unchanged under ``'id'``.
        is_crowd: Stored unchanged under ``'iscrowd'``.

    Returns:
        dict with keys ``segmentation`` (list of flat ``[x0, y0, x1, y1, ...]``
        lists, one per polygon), ``iscrowd``, ``image_id``, ``id``, ``bbox``
        (``(x, y, width, height)``) and ``area``. When no polygon survives
        simplification, ``segmentation`` is empty and ``bbox``/``area`` are
        all zeros.
    """
    contours = measure.find_contours(sub_mask, 0.5, positive_orientation='low')

    segmentations = []
    polygons = []
    for contour in contours:
        # find_contours yields (row, col) pairs: swap to (x, y) and undo the
        # one-pixel padding in a single vectorised step.
        xy = contour[:, ::-1] - 1
        if len(xy) < 3:
            # Too few points to form a polygon ring.
            continue

        poly = Polygon(xy).simplify(1.0, preserve_topology=False)
        if poly.is_empty:
            # Simplification can collapse a tiny region to nothing.
            continue

        # With preserve_topology=False the result may be split into several
        # polygons; a MultiPolygon has no `.exterior`, so handle each part.
        parts = poly.geoms if isinstance(poly, MultiPolygon) else (poly,)
        for part in parts:
            if part.is_empty:
                continue
            polygons.append(part)
            segmentations.append(np.array(part.exterior.coords).ravel().tolist())

    if polygons:
        multi_poly = MultiPolygon(polygons)
        x, y, max_x, max_y = multi_poly.bounds
        bbox = (x, y, max_x - x, max_y - y)
        area = multi_poly.area
    else:
        # Nothing left to measure: avoid unpacking empty/NaN bounds.
        bbox = (0.0, 0.0, 0.0, 0.0)
        area = 0.0

    return {
        'segmentation': segmentations,
        'iscrowd': is_crowd,
        'image_id': image_id,
        'id': annotation_id,
        'bbox': bbox,
        'area': area,
    }
def showRef(annotation, image_dir, seg_box='seg'):
    """Draw an annotation over its image on the current matplotlib axes.

    Args:
        annotation: dict providing ``file_name`` and ``sentences``, plus
            ``segmentation`` (list of flat ``[x0, y0, x1, y1, ...]`` lists)
            when ``seg_box == 'seg'`` or ``bbox`` (``x, y, width, height``)
            when ``seg_box == 'box'``. ``sentences`` may be a single sentence
            dict (as written by ``create_dict_from_selected_images``) or a
            list of sentence dicts / strings.
        image_dir: Directory that contains ``annotation['file_name']``.
        seg_box: ``'seg'`` to draw the polygons, ``'box'`` to draw the
            bounding box; any other value draws only the image.

    Each sentence is printed to stdout, numbered from 1.
    """
    # The module-level `Polygon` is shapely's geometry class, which
    # PatchCollection cannot draw; use matplotlib's patch class here.
    from matplotlib.patches import Polygon as PolygonPatch

    ax = plt.gca()

    # The module-level `io` is the standard-library module and has no
    # `imread`; load the image through PIL instead.
    with Image.open(osp.join(image_dir, annotation['file_name'])) as img:
        ax.imshow(np.asarray(img))

    sentences = annotation['sentences']
    if isinstance(sentences, dict):
        # A single sentence record: treat it as a one-element list.
        sentences = [sentences]
    for sid, sent in enumerate(sentences, start=1):
        text = sent.get('sent', sent) if isinstance(sent, dict) else sent
        print(f'{sid}. {text}')

    if seg_box == 'seg':
        # One random, fairly bright colour shared by every outline.
        edge_color = (np.random.random((1, 3)) * 0.6 + 0.4).tolist()[0]
        segmentation = annotation['segmentation']

        if segmentation and isinstance(segmentation[0], list):
            patches = [
                PolygonPatch(np.array(seg).reshape(-1, 2))
                for seg in segmentation
                if seg  # skip empty polygons
            ]

            # Translucent fill first, then the coloured outline on top.
            fill = PatchCollection(patches,
                                   facecolors=(221/255, 160/255, 221/255),
                                   linewidths=0,
                                   alpha=0.4)
            ax.add_collection(fill)

            outline = PatchCollection(patches,
                                      facecolors='none',
                                      edgecolors=[edge_color] * len(patches),
                                      linewidths=2)
            ax.add_collection(outline)

    elif seg_box == 'box':
        bbox = annotation['bbox']
        box_plot = Rectangle((bbox[0], bbox[1]),
                             bbox[2],
                             bbox[3],
                             fill=False,
                             edgecolor='green',
                             linewidth=3)
        ax.add_patch(box_plot)
def _find_meta_index(video, frame_id, obj_id, exp):
    """Return the index of the first entry of the global ``metas`` matching all four fields, or None."""
    return next(
        (
            idx for idx, meta in enumerate(metas)
            if meta['video'] == video
            and meta['frame_id'] == frame_id
            and meta['obj_id'] == obj_id
            and meta['exp'] == exp
        ),
        None,
    )


def create_dict_from_selected_images(
    selected_frames_df,
    output_path="/home/yejin/data/data/dataset/VRIS/mbench/ytvos/selected_instances2.jsonl",
):
    """Write one JSON-lines annotation record per selected (frame, object) pair.

    For every row of ``selected_frames_df`` and every object listed in it, the
    matching sample is looked up in the module-level ``metas`` /
    ``train_dataset``, its mask for the selected frame is converted to
    polygons, and the resulting record is appended to ``output_path``.

    Args:
        selected_frames_df: DataFrame with columns ``video``, ``frame_id`` and
            ``objects``; ``objects`` maps an object id to a sequence whose
            first element is the referring expression.
        output_path: Destination ``.jsonl`` file; it is overwritten.

    Each record holds ``segmentation``, ``bbox``, ``area``, ``file_name``,
    ``height``, ``width``, ``label``, ``category_name`` and ``sentences``.
    ``file_name`` is ``<video>_<frame id padded to 5 digits>.png``; this
    function does not write any image file itself.

    Objects with no matching entry in ``metas``, or whose selected frame is
    not among the frames returned by the dataset, are reported on stdout and
    skipped.
    """
    # Compiled once; strips everything except letters, digits and whitespace.
    non_alnum = re.compile(r'[^A-Za-z0-9\s]+')

    # These ids are handed to create_sub_mask_annotation and then removed
    # from the record again, so they never reach the output file.
    image_id = 0
    anno_id = 0

    with open(output_path, "w", encoding="utf-8") as f:
        for _, selected in selected_frames_df.iterrows():
            selected_vid_id = selected['video']
            selected_frame_id = selected['frame_id']

            for obj_id, obj_entry in selected['objects'].items():
                selected_exp = obj_entry[0]

                train_idx = _find_meta_index(
                    selected_vid_id, selected_frame_id, int(obj_id), selected_exp
                )
                if train_idx is None:
                    print(f"no meta entry for video: {selected_vid_id}, frame id: {selected_frame_id}, obj id: {obj_id}, exp: {selected_exp}")
                    continue

                _, train_info = train_dataset[train_idx]

                try:
                    valid_frame_loc = train_info['frames_idx'].tolist().index(selected_frame_id)
                except ValueError:
                    print(f"selected vid id: {selected_vid_id}, metas['frame_id']: {metas[train_idx]['frame_id']}, selected frame id: {selected_frame_id}, train_info['frames_idx']: {train_info['frames_idx'].tolist()}")
                    # Without a frame position there is nothing valid to export.
                    continue

                image_file_name = f"{selected_vid_id}_{str(selected_frame_id).rjust(5, '0')}"
                label = train_info['labels'][valid_frame_loc].item()
                category_name = metas[train_idx]['category']
                orig_size = train_info['orig_size']
                caption = metas[train_idx]['exp']

                mask_image = prepare_mask_for_pil(train_info['masks'][valid_frame_loc])
                sub_masks = create_sub_masks(mask_image)

                for sub_mask in sub_masks.values():
                    sub_mask_array = np.array(sub_mask, dtype=np.uint8)
                    annotation = create_sub_mask_annotation(
                        sub_mask_array, image_id, anno_id, is_crowd=0
                    )
                    anno_id += 1
                    image_id += 1

                    for key in ('iscrowd', 'image_id', 'id'):
                        annotation.pop(key, None)

                    annotation['file_name'] = f"{image_file_name}.png"
                    annotation['height'] = orig_size[0].item()
                    annotation['width'] = orig_size[1].item()
                    annotation['label'] = label
                    annotation['category_name'] = category_name
                    annotation['sentences'] = {
                        "tokens": caption.split(' '),
                        "raw": caption,
                        "sent": non_alnum.sub('', caption.lower()),
                    }

                    f.write(json.dumps(annotation) + "\n")
                    # Flush per record so partial output survives an interruption.
                    f.flush()
# Script entry point: export annotations for the frames loaded at module level.
if __name__ == "__main__":
    create_dict_from_selected_images(selected_frames_df)