|
import argparse |
|
import fnmatch |
|
import time |
|
|
|
import numpy as np |
|
import skimage |
|
import skimage.measure |
|
import skimage.io |
|
from tqdm import tqdm |
|
import shapely.geometry |
|
import shapely.ops |
|
import shapely.prepared |
|
import cv2 |
|
|
|
from functools import partial |
|
|
|
import torch |
|
|
|
from frame_field_learning import polygonize_utils |
|
from frame_field_learning import frame_field_utils |
|
|
|
from torch_lydorn.torch.nn.functionnal import bilinear_interpolate |
|
from torch_lydorn.torchvision.transforms import polygons_to_tensorpoly, tensorpoly_pad |
|
|
|
from lydorn_utils import math_utils |
|
from lydorn_utils import python_utils |
|
from lydorn_utils import print_utils |
|
|
|
|
|
DEBUG = False |
|
|
|
|
|
def debug_print(s: str): |
|
if DEBUG: |
|
print_utils.print_debug(s) |
|
|
|
|
|
def get_args(): |
|
argparser = argparse.ArgumentParser(description=__doc__) |
|
argparser.add_argument( |
|
'--raw_pred', |
|
nargs='*', |
|
type=str, |
|
help='Filepath to the raw pred file(s)') |
|
argparser.add_argument( |
|
'--im_filepath', |
|
type=str, |
|
help='Filepath to input image. Will retrieve seg and crossfield in the same directory') |
|
argparser.add_argument( |
|
'--dirpath', |
|
type=str, |
|
help='Path to directory containing seg and crossfield files. Will perform polygonization on all.') |
|
argparser.add_argument( |
|
'--bbox', |
|
nargs='*', |
|
type=int, |
|
help='Selects area in bbox for computation: [min_row, min_col, max_row, max_col]') |
|
argparser.add_argument( |
|
'--steps', |
|
type=int, |
|
help='Optim steps') |
|
|
|
args = argparser.parse_args() |
|
return args |
|
|
|
|
|
class PolygonAlignLoss: |
|
def __init__(self, indicator, level, c0c2, data_coef, length_coef, crossfield_coef, dist=None, dist_coef=None): |
|
self.indicator = indicator |
|
self.level = level |
|
self.c0c2 = c0c2 |
|
self.dist = dist |
|
|
|
self.data_coef = data_coef |
|
self.length_coef = length_coef |
|
self.crossfield_coef = crossfield_coef |
|
self.dist_coef = dist_coef |
|
|
|
def __call__(self, tensorpoly): |
|
""" |
|
|
|
:param tensorpoly: closed polygon |
|
:return: |
|
""" |
|
polygon = tensorpoly.pos[tensorpoly.to_padded_index] |
|
polygon_batch = tensorpoly.batch[tensorpoly.to_padded_index] |
|
|
|
|
|
edges = polygon[1:] - polygon[:-1] |
|
|
|
|
|
edge_mask = torch.ones((edges.shape[0]), device=edges.device) |
|
edge_mask[tensorpoly.to_unpadded_poly_slice[:-1, 1]] = 0 |
|
|
|
midpoints = (polygon[1:] + polygon[:-1]) / 2 |
|
midpoints_batch = polygon_batch[1:] |
|
|
|
midpoints_int = midpoints.round().long() |
|
midpoints_int[:, 0] = torch.clamp(midpoints_int[:, 0], 0, self.c0c2.shape[2] - 1) |
|
midpoints_int[:, 1] = torch.clamp(midpoints_int[:, 1], 0, self.c0c2.shape[3] - 1) |
|
midpoints_c0 = self.c0c2[midpoints_batch, :2, midpoints_int[:, 0], midpoints_int[:, 1]] |
|
midpoints_c2 = self.c0c2[midpoints_batch, 2:, midpoints_int[:, 0], midpoints_int[:, 1]] |
|
|
|
norms = torch.norm(edges, dim=-1) |
|
|
|
edge_mask[norms < 0.1] = 0 |
|
z = edges / (norms[:, None] + 1e-3) |
|
|
|
|
|
align_loss = frame_field_utils.framefield_align_error(midpoints_c0, midpoints_c2, z, complex_dim=1) |
|
align_loss = align_loss * edge_mask |
|
total_align_loss = torch.sum(align_loss) |
|
|
|
|
|
pos_indicator_value = bilinear_interpolate(self.indicator[:, None, ...], tensorpoly.pos, batch=tensorpoly.batch) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
level_loss = torch.sum(torch.pow(pos_indicator_value - self.level, 2)) |
|
|
|
|
|
dist_loss = None |
|
if self.dist is not None: |
|
pos_dist_value = bilinear_interpolate(self.dist[:, None, ...], tensorpoly.pos, batch=tensorpoly.batch) |
|
dist_loss = torch.sum(torch.pow(pos_dist_value, 2)) |
|
|
|
length_penalty = torch.sum( |
|
torch.pow(norms * edge_mask, 2)) |
|
|
|
|
|
losses_dict = { |
|
"align": total_align_loss.item(), |
|
"level": level_loss.item(), |
|
"length": length_penalty.item(), |
|
} |
|
coef_sum = self.data_coef + self.length_coef + self.crossfield_coef |
|
total_loss = (self.data_coef * level_loss + self.length_coef * length_penalty + self.crossfield_coef * total_align_loss) |
|
if dist_loss is not None: |
|
losses_dict["dist"] = dist_loss.item() |
|
total_loss += self.dist_coef * dist_loss |
|
coef_sum += self.dist_coef |
|
total_loss /= coef_sum |
|
return total_loss, losses_dict |
|
|
|
|
|
class TensorPolyOptimizer: |
|
def __init__(self, config, tensorpoly, indicator, c0c2, data_coef, length_coef, crossfield_coef, dist=None, dist_coef=None): |
|
assert len(indicator.shape) == 3, "indicator: (N, H, W)" |
|
assert len(c0c2.shape) == 4 and c0c2.shape[1] == 4, "c0c2: (N, 4, H, W)" |
|
if dist is not None: |
|
assert len(dist.shape) == 3, "dist: (N, H, W)" |
|
|
|
|
|
self.config = config |
|
self.tensorpoly = tensorpoly |
|
|
|
|
|
self.tensorpoly.pos.requires_grad = True |
|
|
|
|
|
self.endpoint_pos = self.tensorpoly.pos[self.tensorpoly.is_endpoint].clone() |
|
|
|
self.criterion = PolygonAlignLoss(indicator, config["data_level"], c0c2, data_coef, length_coef, |
|
crossfield_coef, dist=dist, dist_coef=dist_coef) |
|
self.optimizer = torch.optim.SGD([tensorpoly.pos], lr=config["poly_lr"]) |
|
|
|
def lr_warmup_func(iter): |
|
if iter < config["warmup_iters"]: |
|
coef = 1 + (config["warmup_factor"] - 1) * (config["warmup_iters"] - iter) / config["warmup_iters"] |
|
else: |
|
coef = 1 |
|
return coef |
|
|
|
self.lr_scheduler = torch.optim.lr_scheduler.LambdaLR(self.optimizer, lr_lambda=lr_warmup_func) |
|
|
|
def step(self, iter_num): |
|
self.optimizer.zero_grad() |
|
loss, losses_dict = self.criterion(self.tensorpoly) |
|
|
|
loss.backward() |
|
|
|
self.optimizer.step() |
|
self.lr_scheduler.step(iter_num) |
|
|
|
|
|
with torch.no_grad(): |
|
self.tensorpoly.pos[self.tensorpoly.is_endpoint] = self.endpoint_pos |
|
return loss.item(), losses_dict |
|
|
|
def optimize(self): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
optim_iter = range(self.config["steps"]) |
|
for iter_num in optim_iter: |
|
loss, losses_dict = self.step(iter_num) |
|
return self.tensorpoly |
|
|
|
|
|
def contours_batch_to_tensorpoly(contours_batch): |
|
|
|
tensorpoly = polygons_to_tensorpoly(contours_batch) |
|
|
|
tensorpoly = tensorpoly_pad(tensorpoly, padding=(0, 1)) |
|
return tensorpoly |
|
|
|
|
|
def tensorpoly_to_contours_batch(tensorpoly): |
|
|
|
contours_batch = [[] for _ in range(tensorpoly.batch_size)] |
|
for poly_i in range(tensorpoly.poly_slice.shape[0]): |
|
s = tensorpoly.poly_slice[poly_i, :] |
|
contour = np.array(tensorpoly.pos[s[0]:s[1], :].detach().cpu()) |
|
is_open = tensorpoly.is_endpoint[s[0]] |
|
if not is_open: |
|
|
|
contour = np.concatenate([contour, contour[:1, :]], axis=0) |
|
batch_i = tensorpoly.batch[s[0]] |
|
contours_batch[batch_i].append(contour) |
|
return contours_batch |
|
|
|
|
|
def print_contours_stats(contours): |
|
min_length = contours[0].shape[0] |
|
max_length = contours[0].shape[0] |
|
nb_vertices = 0 |
|
for contour in contours: |
|
nb_vertices += contour.shape[0] |
|
if contour.shape[0] < min_length: |
|
min_length = contour.shape[0] |
|
if max_length < contour.shape[0]: |
|
max_length = contour.shape[0] |
|
print("Nb polygon:", len(contours), "Nb vertices:", nb_vertices, "Min lengh:", min_length, "Max lengh:", max_length) |
|
|
|
|
|
def shapely_postprocess(contours, u, v, np_indicator, tolerance, config): |
|
if type(tolerance) == list: |
|
|
|
out_polygons_dict = {} |
|
out_probs_dict = {} |
|
for tol in tolerance: |
|
out_polygons, out_probs = shapely_postprocess(contours, u, v, np_indicator, tol, config) |
|
out_polygons_dict["tol_{}".format(tol)] = out_polygons |
|
out_probs_dict["tol_{}".format(tol)] = out_probs |
|
return out_polygons_dict, out_probs_dict |
|
else: |
|
height = np_indicator.shape[0] |
|
width = np_indicator.shape[1] |
|
|
|
|
|
|
|
|
|
contours = [skimage.measure.approximate_polygon(contour, tolerance=min(1, tolerance)) for contour in contours] |
|
corner_masks = frame_field_utils.detect_corners(contours, u, v) |
|
contours = polygonize_utils.split_polylines_corner(contours, corner_masks) |
|
|
|
|
|
line_string_list = [shapely.geometry.LineString(out_contour[:, ::-1]) for out_contour in contours] |
|
|
|
line_string_list = [line_string.simplify(tolerance, preserve_topology=True) for line_string in line_string_list] |
|
|
|
|
|
line_string_list.append( |
|
shapely.geometry.LinearRing([ |
|
(0, 0), |
|
(0, height - 1), |
|
(width - 1, height - 1), |
|
(width - 1, 0), |
|
])) |
|
|
|
|
|
|
|
|
|
multi_line_string = shapely.ops.unary_union(line_string_list) |
|
|
|
|
|
|
|
|
|
polygons, dangles, cuts, invalids = shapely.ops.polygonize_full(multi_line_string) |
|
polygons = list(polygons) |
|
|
|
|
|
|
|
|
|
polygons = [polygon for polygon in polygons if |
|
config["min_area"] < polygon.area] |
|
|
|
|
|
|
|
|
|
filtered_polygons = [] |
|
filtered_polygon_probs = [] |
|
for polygon in polygons: |
|
prob = polygonize_utils.compute_geom_prob(polygon, np_indicator) |
|
|
|
if config["seg_threshold"] < prob: |
|
filtered_polygons.append(polygon) |
|
filtered_polygon_probs.append(prob) |
|
|
|
return filtered_polygons, filtered_polygon_probs |
|
|
|
|
|
def post_process(contours, np_seg, np_crossfield, config): |
|
u, v = math_utils.compute_crossfield_uv(np_crossfield) |
|
|
|
np_indicator = np_seg[:, :, 0] |
|
polygons, probs = shapely_postprocess(contours, u, v, np_indicator, config["tolerance"], config) |
|
|
|
return polygons, probs |
|
|
|
|
|
def polygonize(seg_batch, crossfield_batch, config, pool=None, pre_computed=None): |
|
tic_start = time.time() |
|
|
|
assert len(seg_batch.shape) == 4 and seg_batch.shape[ |
|
1] <= 3, "seg_batch should be (N, C, H, W) with C <= 3, not {}".format(seg_batch.shape) |
|
assert len(crossfield_batch.shape) == 4 and crossfield_batch.shape[ |
|
1] == 4, "crossfield_batch should be (N, 4, H, W)" |
|
assert seg_batch.shape[0] == crossfield_batch.shape[0], "Batch size for seg and crossfield should match" |
|
|
|
|
|
|
|
|
|
indicator_batch = seg_batch[:, 0, :, :] |
|
np_indicator_batch = indicator_batch.cpu().numpy() |
|
indicator_batch = indicator_batch.to(config["device"]) |
|
|
|
|
|
|
|
|
|
dist_batch = None |
|
if "dist_coef" in config: |
|
|
|
np_dist_batch = np.empty(np_indicator_batch.shape) |
|
for batch_i in range(np_indicator_batch.shape[0]): |
|
dist_1 = cv2.distanceTransform(np_indicator_batch[batch_i].astype(np.uint8), distanceType=cv2.DIST_L2, maskSize=cv2.DIST_MASK_5, dstType=cv2.CV_64F) |
|
dist_2 = cv2.distanceTransform(1 - np_indicator_batch[batch_i].astype(np.uint8), distanceType=cv2.DIST_L2, maskSize=cv2.DIST_MASK_5, dstType=cv2.CV_64F) |
|
np_dist_batch[0] = dist_1 + dist_2 - 1 |
|
dist_batch = torch.from_numpy(np_dist_batch) |
|
dist_batch = dist_batch.to(config["device"]) |
|
|
|
|
|
|
|
|
|
|
|
if pre_computed is None or "init_contours_batch" not in pre_computed: |
|
|
|
init_contours_batch = polygonize_utils.compute_init_contours_batch(np_indicator_batch, config["data_level"], pool=pool) |
|
|
|
|
|
else: |
|
init_contours_batch = pre_computed["init_contours_batch"] |
|
|
|
|
|
tensorpoly = contours_batch_to_tensorpoly(init_contours_batch) |
|
|
|
|
|
|
|
|
|
|
|
|
|
tensorpoly.to(config["device"]) |
|
crossfield_batch = crossfield_batch.to(config["device"]) |
|
dist_coef = config["dist_coef"] if "dist_coef" in config else None |
|
tensorpoly_optimizer = TensorPolyOptimizer(config, tensorpoly, indicator_batch, crossfield_batch, |
|
config["data_coef"], |
|
config["length_coef"], config["crossfield_coef"], dist=dist_batch, dist_coef=dist_coef) |
|
tensorpoly = tensorpoly_optimizer.optimize() |
|
|
|
out_contours_batch = tensorpoly_to_contours_batch(tensorpoly) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
np_seg_batch = np.transpose(seg_batch.cpu().numpy(), (0, 2, 3, 1)) |
|
np_crossfield_batch = np.transpose(crossfield_batch.cpu().numpy(), (0, 2, 3, 1)) |
|
if pool is not None: |
|
post_process_partial = partial(post_process, config=config) |
|
polygons_probs_batch = pool.starmap(post_process_partial, zip(out_contours_batch, np_seg_batch, np_crossfield_batch)) |
|
polygons_batch, probs_batch = zip(*polygons_probs_batch) |
|
else: |
|
polygons_batch = [] |
|
probs_batch = [] |
|
for i, out_contours in enumerate(out_contours_batch): |
|
polygons, probs = post_process(out_contours, np_seg_batch[i], np_crossfield_batch[i], config) |
|
polygons_batch.append(polygons) |
|
probs_batch.append(probs) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
toc_end = time.time() |
|
|
|
|
|
return polygons_batch, probs_batch |
|
|
|
|
|
def main(): |
|
from frame_field_learning import framefield, inference |
|
import os |
|
|
|
def save_gt_poly(raw_pred_filepath, name): |
|
filapth_format = "/data/mapping_challenge_dataset/processed/val/data_{}.pt" |
|
sample = torch.load(filapth_format.format(name)) |
|
polygon_arrays = sample["gt_polygons"] |
|
polygons = [shapely.geometry.Polygon(polygon[:, ::-1]) for polygon in polygon_arrays] |
|
base_filepath = os.path.join(os.path.dirname(raw_pred_filepath), name) |
|
filepath = base_filepath + "." + name + ".pdf" |
|
plot_utils.save_poly_viz(image, polygons, filepath) |
|
|
|
config = { |
|
"indicator_add_edge": False, |
|
"steps": 500, |
|
"data_level": 0.5, |
|
"data_coef": 0.1, |
|
"length_coef": 0.4, |
|
"crossfield_coef": 0.5, |
|
"poly_lr": 0.01, |
|
"warmup_iters": 100, |
|
"warmup_factor": 0.1, |
|
"device": "cuda", |
|
"tolerance": 0.5, |
|
"seg_threshold": 0.5, |
|
"min_area": 1, |
|
|
|
"inner_polylines_params": { |
|
"enable": False, |
|
"max_traces": 1000, |
|
"seed_threshold": 0.5, |
|
"low_threshold": 0.1, |
|
"min_width": 2, |
|
"max_width": 8, |
|
"step_size": 1, |
|
} |
|
} |
|
|
|
args = get_args() |
|
if args.steps is not None: |
|
config["steps"] = args.steps |
|
|
|
if args.raw_pred is not None: |
|
|
|
image_list = [] |
|
name_list = [] |
|
seg_list = [] |
|
crossfield_list = [] |
|
for raw_pred_filepath in args.raw_pred: |
|
raw_pred = torch.load(raw_pred_filepath) |
|
image_list.append(raw_pred["image"]) |
|
name_list.append(raw_pred["name"]) |
|
seg_list.append(raw_pred["seg"]) |
|
crossfield_list.append(raw_pred["crossfield"]) |
|
seg_batch = torch.stack(seg_list, dim=0) |
|
crossfield_batch = torch.stack(crossfield_list, dim=0) |
|
|
|
out_contours_batch, out_probs_batch = polygonize(seg_batch, crossfield_batch, config) |
|
|
|
for i, raw_pred_filepath in enumerate(args.raw_pred): |
|
image = image_list[i] |
|
name = name_list[i] |
|
polygons = out_contours_batch[i] |
|
base_filepath = os.path.join(os.path.dirname(raw_pred_filepath), name) |
|
filepath = base_filepath + ".poly_acm.pdf" |
|
plot_utils.save_poly_viz(image, polygons, filepath) |
|
|
|
|
|
save_gt_poly(raw_pred_filepath, name) |
|
elif args.im_filepath: |
|
|
|
|
|
image = skimage.io.imread(args.im_filepath) |
|
base_filepath = os.path.splitext(args.im_filepath)[0] |
|
seg = skimage.io.imread(base_filepath + ".seg.tif") / 255 |
|
crossfield = np.load(base_filepath + ".crossfield.npy", allow_pickle=True) |
|
|
|
|
|
if args.bbox is not None: |
|
assert len(args.bbox) == 4, "bbox should have 4 values" |
|
bbox = args.bbox |
|
|
|
|
|
image = image[bbox[0]:bbox[2], bbox[1]:bbox[3]] |
|
seg = seg[bbox[0]:bbox[2], bbox[1]:bbox[3]] |
|
crossfield = crossfield[bbox[0]:bbox[2], bbox[1]:bbox[3]] |
|
extra_name = ".bbox_{}_{}_{}_{}".format(*bbox) |
|
else: |
|
extra_name = "" |
|
|
|
|
|
seg_batch = torch.tensor(np.transpose(seg[:, :, :2], (2, 0, 1)), dtype=torch.float)[None, ...] |
|
crossfield_batch = torch.tensor(np.transpose(crossfield, (2, 0, 1)), dtype=torch.float)[None, ...] |
|
|
|
out_contours_batch, out_probs_batch = polygonize(seg_batch, crossfield_batch, config) |
|
|
|
polygons = out_contours_batch[0] |
|
|
|
|
|
|
|
|
|
filepath = base_filepath + extra_name + ".poly_acm.pdf" |
|
plot_utils.save_poly_viz(image, polygons, filepath, linewidths=1, draw_vertices=True, color_choices=[[0, 1, 0, 1]]) |
|
elif args.dirpath: |
|
seg_filename_list = fnmatch.filter(os.listdir(args.dirpath), "*.seg.tif") |
|
sorted(seg_filename_list) |
|
pbar = tqdm(seg_filename_list, desc="Poly files") |
|
for id, seg_filename in enumerate(pbar): |
|
basename = seg_filename[:-len(".seg.tif")] |
|
|
|
|
|
|
|
|
|
|
|
pbar.set_postfix(name=basename, status="Loading data...") |
|
crossfield_filename = basename + ".crossfield.npy" |
|
metadata_filename = basename + ".metadata.json" |
|
seg = skimage.io.imread(os.path.join(args.dirpath, seg_filename)) / 255 |
|
crossfield = np.load(os.path.join(args.dirpath, crossfield_filename), allow_pickle=True) |
|
metadata = python_utils.load_json(os.path.join(args.dirpath, metadata_filename)) |
|
|
|
|
|
|
|
|
|
|
|
seg_batch = torch.tensor(np.transpose(seg[:, :, :2], (2, 0, 1)), dtype=torch.float)[None, ...] |
|
crossfield_batch = torch.tensor(np.transpose(crossfield, (2, 0, 1)), dtype=torch.float)[None, ...] |
|
|
|
pbar.set_postfix(name=basename, status="Polygonazing...") |
|
out_contours_batch, out_probs_batch = polygonize(seg_batch, crossfield_batch, config) |
|
|
|
polygons = out_contours_batch[0] |
|
|
|
|
|
|
|
|
|
|
|
|
|
base_filepath = os.path.join(args.dirpath, basename) |
|
inference.save_poly_coco(polygons, id, base_filepath, "annotation.poly") |
|
else: |
|
print("Showcase on a very simple example:") |
|
seg = np.zeros((6, 8, 3)) |
|
|
|
seg[1, 4] = 1 |
|
seg[2, 3:5] = 1 |
|
seg[3, 2:5] = 1 |
|
seg[4, 1:5] = 1 |
|
|
|
seg[3:5, 5:7] = 1 |
|
|
|
u = np.zeros((6, 8), dtype=np.complex) |
|
v = np.zeros((6, 8), dtype=np.complex) |
|
|
|
u.real = 1 |
|
v.imag = 1 |
|
|
|
u[:4, :4] *= np.exp(1j * np.pi/4) |
|
v[:4, :4] *= np.exp(1j * np.pi/4) |
|
|
|
|
|
|
|
|
|
crossfield = math_utils.compute_crossfield_c0c2(u, v) |
|
|
|
seg_batch = torch.tensor(np.transpose(seg[:, :, :2], (2, 0, 1)), dtype=torch.float)[None, ...] |
|
crossfield_batch = torch.tensor(np.transpose(crossfield, (2, 0, 1)), dtype=torch.float)[None, ...] |
|
|
|
out_contours_batch, out_probs_batch = polygonize(seg_batch, crossfield_batch, config) |
|
|
|
polygons = out_contours_batch[0] |
|
|
|
filepath = "demo_poly_acm.pdf" |
|
plot_utils.save_poly_viz(seg, polygons, filepath, linewidths=0.5, draw_vertices=True, crossfield=crossfield) |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |
|
|