|
import argparse |
|
|
|
import os |
|
import torch |
|
|
|
import numpy as np |
|
import skimage |
|
import skimage.measure |
|
import skimage.io |
|
import shapely.geometry |
|
import shapely.ops |
|
from PIL import Image |
|
from multiprocess import Pool |
|
from tqdm import tqdm |
|
|
|
from functools import partial |
|
|
|
from lydorn_utils import print_utils, geo_utils |
|
|
|
from frame_field_learning import polygonize_utils, plot_utils |
|
|
|
# Module-wide switch: set to True to enable the debug output of debug_print().
DEBUG = False


def debug_print(s: str):
    """Forward ``s`` to ``print_utils.print_debug`` when ``DEBUG`` is enabled.

    While the module-level ``DEBUG`` flag is falsy this is a no-op.
    """
    if not DEBUG:
        return
    print_utils.print_debug(s)
|
|
|
|
|
def get_args():
    """Parse the command-line arguments of this script.

    Returns:
        argparse.Namespace with the attributes:
            seg_filepath: list of str, the segmentation/mask files to process.
            im_dirpath: str, directory holding the matching images.
            out_dirpath: str, output directory.
            out_ext: ``"shp"`` (default) or ``"pdf"``.
            bbox: list of 4 ints, or ``None`` when ``--bbox`` is not given.
    """
    argparser = argparse.ArgumentParser(description=__doc__)
    argparser.add_argument(
        '--seg_filepath',
        required=True,
        nargs='*',
        type=str,
        help='Filepath(s) to input segmentation/mask image.')
    argparser.add_argument(
        '--im_dirpath',
        required=True,
        type=str,
        # Each literal ends with a space so the concatenated sentences do not run together.
        help='Path to the directory containing the corresponding images of the segmentation/mask. '
             'Files must have the same filename as --seg_filepath. '
             'Used for visualization or saving the shapefile with the same coordinate system as that image.')
    argparser.add_argument(
        '--out_dirpath',
        required=True,
        type=str,
        help='Path to the output directory.')
    argparser.add_argument(
        '--out_ext',
        type=str,
        default="shp",
        choices=['pdf', 'shp'],
        help="File extension of the output. "
             "'pdf': pdf visualization (requires --im_dirpath for the image), 'shp': shapefile")
    argparser.add_argument(
        '--bbox',
        # Exactly four values are consumed downstream, so let argparse enforce
        # the count and report a proper usage error instead of failing later.
        nargs=4,
        type=int,
        metavar=('ROW_MIN', 'COL_MIN', 'ROW_MAX', 'COL_MAX'),
        help='Selects area in bbox for computation.')

    return argparser.parse_args()
|
|
|
|
|
def simplify(polygons, probs, tolerance):
    """Simplify polygons with one tolerance or with several of them.

    Args:
        polygons: iterable of geometries exposing
            ``simplify(tolerance, preserve_topology=True)``.
        probs: per-polygon probabilities; returned as given, never modified.
        tolerance: a single tolerance value, or a list/tuple of tolerance values.

    Returns:
        For a single tolerance: ``(simplified_polygons, probs)`` where
        ``simplified_polygons`` is a new list.
        For a list/tuple of tolerances: ``(polygons_dict, probs_dict)``, two dicts
        keyed by ``"tol_{value}"`` holding the single-tolerance results. Every
        entry of ``probs_dict`` refers to the same ``probs`` object.
    """
    if isinstance(tolerance, (list, tuple)):
        out_polygons_dict = {}
        out_probs_dict = {}
        for tol in tolerance:
            key = "tol_{}".format(tol)
            out_polygons_dict[key], out_probs_dict[key] = simplify(polygons, probs, tol)
        return out_polygons_dict, out_probs_dict

    out_polygons = [polygon.simplify(tolerance, preserve_topology=True) for polygon in polygons]
    return out_polygons, probs
|
|
|
|
|
def shapely_postprocess(out_contours, np_indicator, config):
    """Build, filter and simplify polygons from the contours of one image.

    Args:
        out_contours: iterable of 2D arrays, one per contour; the column order
            of each array is reversed before building the line string
            (presumably (row, col) -> (x, y); confirm against the contour source).
        np_indicator: 2D array whose shape gives the image height and width;
            also passed to ``polygonize_utils.compute_geom_prob``.
        config: dict providing ``"min_area"``, ``"seg_threshold"`` and
            ``"tolerance"``.

    Returns:
        ``(polygons, probs)`` as returned by ``simplify``: lists for a scalar
        ``config["tolerance"]``, dicts keyed by tolerance for a list of them.
    """
    height = np_indicator.shape[0]
    width = np_indicator.shape[1]

    line_string_list = [shapely.geometry.LineString(out_contour[:, ::-1]) for out_contour in out_contours]

    # Add the image border as an extra ring (presumably so that contours
    # touching the border can still be closed into polygons).
    line_string_list.append(
        shapely.geometry.LinearRing([
            (0, 0),
            (0, height - 1),
            (width - 1, height - 1),
            (width - 1, 0),
        ]))

    # Merge all lines into a single geometry before polygonizing.
    multi_line_string = shapely.ops.unary_union(line_string_list)

    # Only the valid polygons are used; dangles, cuts and invalid rings are dropped.
    polygon_collection, _, _, _ = shapely.ops.polygonize_full(multi_line_string)
    # Multi-part geometries are no longer iterable in Shapely 2.0; `.geoms`
    # works on both 1.x and 2.x.
    polygons = list(polygon_collection.geoms)

    filtered_polygons = []
    filtered_polygon_probs = []
    for polygon in polygons:
        # Drop small polygons first so the probability is only computed for the rest.
        if not config["min_area"] < polygon.area:
            continue
        prob = polygonize_utils.compute_geom_prob(polygon, np_indicator)
        if config["seg_threshold"] < prob:
            filtered_polygons.append(polygon)
            filtered_polygon_probs.append(prob)

    polygons, probs = simplify(filtered_polygons, filtered_polygon_probs, config["tolerance"])

    return polygons, probs
|
|
|
|
|
def polygonize(seg_batch, config, pool=None, pre_computed=None):
    """Polygonize a batch of segmentation maps.

    Args:
        seg_batch: 4-dimensional batch (N, C, H, W) with C <= 3 exposing
            ``.cpu().numpy()``; only channel 0 is used as the indicator.
        config: dict providing ``"data_level"`` plus the keys read by
            ``shapely_postprocess``.
        pool: optional pool exposing ``starmap``; when given, the per-image
            post-processing is dispatched through it.
        pre_computed: optional dict; when it contains ``"init_contours_batch"``
            those contours are used instead of being recomputed.

    Returns:
        ``(polygons_batch, probs_batch)``: two lists with one entry per image,
        regardless of whether a pool is used.
    """
    assert len(seg_batch.shape) == 4 and seg_batch.shape[1] <= 3, \
        "seg_batch should be (N, C, H, W) with C <= 3, not {}".format(seg_batch.shape)

    indicator_batch = seg_batch[:, 0, :, :]
    np_indicator_batch = indicator_batch.cpu().numpy()

    if pre_computed is None or "init_contours_batch" not in pre_computed:
        init_contours_batch = polygonize_utils.compute_init_contours_batch(
            np_indicator_batch, config["data_level"], pool=pool)
    else:
        init_contours_batch = pre_computed["init_contours_batch"]

    if pool is not None:
        shapely_postprocess_partial = partial(shapely_postprocess, config=config)
        polygons_probs_batch = pool.starmap(
            shapely_postprocess_partial, zip(init_contours_batch, np_indicator_batch))
    else:
        polygons_probs_batch = [
            shapely_postprocess(out_contours, np_indicator_batch[i], config)
            for i, out_contours in enumerate(init_contours_batch)
        ]

    # Split the (polygons, probs) pairs explicitly instead of `zip(*...)`:
    # this also works for an empty batch and yields lists on both code paths.
    polygons_batch = [polygons for polygons, _ in polygons_probs_batch]
    probs_batch = [probs for _, probs in polygons_probs_batch]

    return polygons_batch, probs_batch
|
|
|
|
|
def run_one(seg_filepath, out_dirpath, config, im_dirpath, out_ext=None, bbox=None):
    """Polygonize one segmentation/mask image and save the result.

    Args:
        seg_filepath: path of the segmentation/mask image to read.
        out_dirpath: directory receiving the shapefile when ``out_ext == "shp"``.
        config: polygonization settings, forwarded to ``polygonize``.
        im_dirpath: directory containing ``<name>.tif``, where ``<name>`` is the
            basename of ``seg_filepath`` without its extension.
        out_ext: ``"shp"`` to save a shapefile, ``"pdf"`` to save a visualization.
        bbox: optional sequence of 4 ints; the mask (and image, if loaded) is
            cropped to ``[bbox[0]:bbox[2], bbox[1]:bbox[3]]``.

    Raises:
        ValueError: if ``out_ext`` is neither ``"shp"`` nor ``"pdf"``.
    """
    # Reject an unsupported output type before doing any I/O or computation.
    if out_ext not in ("shp", "pdf"):
        raise ValueError(f"out_ext should be shp or pdf, not {out_ext}")

    filename = os.path.basename(seg_filepath)
    name = os.path.splitext(filename)[0]

    # The image itself is only needed for the visualization.
    image = None
    im_filepath = os.path.join(im_dirpath, name + ".tif")
    if out_ext == "pdf":
        image = skimage.io.imread(im_filepath)

    # Read the mask and release the file handle right away.
    with Image.open(seg_filepath) as seg_img:
        seg = np.array(seg_img)
    if seg.dtype == np.uint8:
        seg = seg / 255
    elif seg.dtype == np.bool_:
        # np.bool / np.float were removed in NumPy 1.24; use the explicit types.
        seg = seg.astype(np.float64)

    if bbox is not None:
        assert len(bbox) == 4, "bbox should have 4 values"
        if image is not None:
            image = image[bbox[0]:bbox[2], bbox[1]:bbox[3]]
        seg = seg[bbox[0]:bbox[2], bbox[1]:bbox[3]]
        extra_name = ".bbox_{}_{}_{}_{}".format(*bbox)
    else:
        extra_name = ""

    # Ensure a trailing channel axis, keep at most the first two channels and
    # build a batch of one in channel-first layout.
    if len(seg.shape) < 3:
        seg = seg[:, :, None]
    seg_batch = torch.tensor(np.transpose(seg[:, :, :2], (2, 0, 1)), dtype=torch.float)[None, ...]

    out_contours_batch, _ = polygonize(seg_batch, config)

    polygons = out_contours_batch[0]

    if out_ext == "shp":
        out_filepath = os.path.join(out_dirpath, name + ".shp")
        geo_utils.save_shapefile_from_shapely_polygons(polygons, im_filepath, out_filepath)
    else:
        # NOTE(review): the PDF is written next to the segmentation file, not
        # into out_dirpath; confirm this is intended.
        base_filepath = os.path.splitext(seg_filepath)[0]
        filepath = base_filepath + extra_name + ".poly_simple.pdf"
        plot_utils.save_poly_viz(image, polygons, filepath, markersize=30, linewidths=1, draw_vertices=True)
|
|
|
|
|
def main():
    """Polygonize every file given via --seg_filepath using a process pool."""
    config = {
        "data_level": 0.5,
        "tolerance": 1.0,
        "seg_threshold": 0.5,
        "min_area": 10,
    }

    args = get_args()

    run_one_partial = partial(
        run_one,
        out_dirpath=args.out_dirpath,
        config=config,
        im_dirpath=args.im_dirpath,
        out_ext=args.out_ext,
        bbox=args.bbox,
    )

    # The context manager shuts the worker processes down on exit, including
    # when a task raises. list() drains the lazy imap iterator so that every
    # file has been processed before the pool is released.
    with Pool() as pool:
        list(tqdm(pool.imap(run_one_partial, args.seg_filepath),
                  desc="Simple poly.", total=len(args.seg_filepath)))


if __name__ == '__main__':
    main()
|
|