import itertools |
import os |
import numpy as np |
import pycocotools.mask |
import shapely.geometry |
import torch |
import skimage.morphology |
import skimage.measure |
from lydorn_utils import python_utils, geo_utils, math_utils |
import cv2 |
from frame_field_learning import plot_utils, local_utils |
import tifffile |
def get_save_filepath(base_filepath, name=None, ext=""): |
if type(base_filepath) is tuple: |
if name is not None: |
save_filepath = os.path.join(base_filepath[0], name, base_filepath[1] + ext) |
else: |
save_filepath = os.path.join(base_filepath[0], base_filepath[1] + ext) |
elif type(base_filepath) is str: |
if name is not None: |
save_filepath = base_filepath + "." + name + ext |
else: |
save_filepath = base_filepath + ext |
else: |
raise TypeError(f"base_filepath should be either of tuple or str, not {type(base_filepath)}") |
os.makedirs(os.path.dirname(save_filepath), exist_ok=True) |
return save_filepath |
def save_outputs(tile_data, config, eval_dirpath, split_name, flag_filepath_format): |
split_eval_dirpath = os.path.join(eval_dirpath, split_name) |
if not os.path.exists(split_eval_dirpath): |
os.makedirs(split_eval_dirpath) |
base_filepath = os.path.join(split_eval_dirpath, tile_data["name"]) |
if "image_relative_filepath" in tile_data: |
image_filepath = os.path.join(config["data_root_dir"], tile_data["image_relative_filepath"]) |
elif "image_filepath" in tile_data: |
image_filepath = tile_data["image_filepath"] |
else: |
raise ValueError("Could not get image_filepath from tile_data") |
if config["eval_params"]["save_individual_outputs"]["image"]: |
src_filepath = "/data" + image_filepath |
filepath = base_filepath + ".image" |
if os.path.islink(filepath): |
os.remove(filepath) |
os.symlink(src_filepath, filepath) |
if config["eval_params"]["save_individual_outputs"]["seg_gt"]: |
save_seg(tile_data["gt_polygons_image"], base_filepath, "seg.gt", image_filepath) |
if config["eval_params"]["save_individual_outputs"]["seg"]: |
save_seg(tile_data["seg"], base_filepath, "seg", image_filepath) |
if config["eval_params"]["save_individual_outputs"]["seg_mask"]: |
save_seg_mask(tile_data["seg_mask"], (split_eval_dirpath, tile_data["name"]), "seg_mask", image_filepath) |
if config["eval_params"]["save_individual_outputs"]["seg_opencities_mask"]: |
save_opencities_mask(tile_data["seg_mask"], base_filepath, "drivendata", |
image_filepath) |
if config["eval_params"]["save_individual_outputs"]["seg_luxcarta"]: |
save_seg_luxcarta_format(tile_data["seg"], base_filepath, "seg_luxcarta_format", |
image_filepath) |
if config["eval_params"]["save_individual_outputs"]["crossfield"]: |
save_crossfield(tile_data["crossfield"], base_filepath, "crossfield") |
if config["eval_params"]["save_individual_outputs"]["uv_angles"]: |
save_uv_angles(tile_data["crossfield"], base_filepath, "uv_angles", |
image_filepath) |
if config["eval_params"]["save_individual_outputs"]["poly_shapefile"]: |
save_shapefile(tile_data["polygons"], base_filepath, "poly_shapefile", image_filepath) |
if config["eval_params"]["save_individual_outputs"]["poly_geojson"]: |
save_geojson(tile_data["polygons"], base_filepath, "poly_geojson", image_filepath) |
if "poly_viz" in config["eval_params"]["save_individual_outputs"] and config["eval_params"]["save_individual_outputs"]["poly_viz"]: |
save_poly_viz(tile_data["image"], tile_data["polygons"], tile_data["polygon_probs"], base_filepath, "poly_viz") |
if "raw_pred" in config["eval_params"]["save_individual_outputs"] and config["eval_params"]["save_individual_outputs"]["raw_pred"]: |
save_raw_pred(tile_data, base_filepath, "raw_pred") |
def save_seg(seg, base_filepath, name, image_filepath): |
seg = np.transpose(seg.numpy(), (1, 2, 0)) |
seg_display = plot_utils.get_seg_display(seg) |
if seg_display.dtype != np.uint8: |
seg_display = (255 * seg_display).astype(np.uint8) |
seg_display_filepath = get_save_filepath(base_filepath, name, ".tif") |
geo_utils.save_image_as_geotiff(seg_display_filepath, seg_display, image_filepath) |
return seg_display_filepath |
def save_seg_mask(seg_mask, base_filepath, name, image_filepath): |
seg_mask = seg_mask.numpy() |
out = (255 * seg_mask).astype(np.uint8)[:, :, None] |
out_filepath = get_save_filepath(base_filepath, name, ".tif") |
geo_utils.save_image_as_geotiff(out_filepath, out, image_filepath) |
return out_filepath |
def save_seg_luxcarta_format(seg, base_filepath, name, image_filepath): |
seg = np.transpose(seg.numpy(), (1, 2, 0)) |
seg_luxcarta = np.zeros((seg.shape[0], seg.shape[1], 1), dtype=np.uint8) |
seg_interior = 0.5 < seg[..., 0] |
seg_luxcarta[seg_interior] = 1 |
if 2 <= seg.shape[2]: |
seg_edge = 0.5 < seg[..., 1] |
seg_luxcarta[seg_edge] = 2 |
seg_luxcarta_filepath = get_save_filepath(base_filepath, name, ".tif") |
geo_utils.save_image_as_geotiff(seg_luxcarta_filepath, seg_luxcarta, image_filepath) |
def save_poly_viz(image, polygons, polygon_probs, base_filepath, name): |
if type(image) == torch.Tensor: |
image = np.transpose(image.numpy(), (1, 2, 0)) |
if type(polygons) == dict: |
for key in polygons.keys(): |
save_poly_viz(image, polygons[key], polygon_probs[key], base_filepath, name + "." + key) |
elif type(polygons) == list: |
filepath = get_save_filepath(base_filepath, name, ".pdf") |
plot_utils.save_poly_viz(image, polygons, filepath, polygon_probs=polygon_probs) |
else: |
raise TypeError("polygons has unrecognized type {}".format(type(polygons))) |
def save_shapefile(polygons, base_filepath, name, image_filepath): |
if type(polygons) == dict: |
for key, item in polygons.items(): |
save_shapefile(item, base_filepath, name + "." + key, image_filepath) |
elif type(polygons) == list: |
filepath = get_save_filepath(base_filepath, name, ".shp") |
if type(polygons[0]) == np.array: |
geo_utils.save_shapefile_from_polygons(polygons, image_filepath, filepath) |
elif type(polygons[0]) == shapely.geometry.polygon.Polygon: |
geo_utils.save_shapefile_from_shapely_polygons(polygons, image_filepath, filepath) |
else: |
raise TypeError("polygons has unrecognized type {}".format(type(polygons))) |
def save_geojson(polygons, base_filepath, name=None, image_filepath=None): |
filepath = get_save_filepath(base_filepath, name, ".geojson") |
polygons_geometry_collection = shapely.geometry.collection.GeometryCollection(polygons) |
geojson = shapely.geometry.mapping(polygons_geometry_collection) |
python_utils.save_json(filepath, geojson) |
def poly_coco(polygons: list, polygon_probs: list, image_id: int): |
if type(polygons) == dict: |
annotations_dict = {} |
for key in polygons.keys(): |
_polygons = polygons[key] |
_polygon_probs = polygon_probs[key] |
annotations_dict[key] = poly_coco(_polygons, _polygon_probs, image_id) |
return annotations_dict |
elif type(polygons) == list: |
annotations = [] |
for polygon, prob in zip(polygons, polygon_probs): |
bbox = np.round([polygon.bounds[0], polygon.bounds[1], |
polygon.bounds[2] - polygon.bounds[0], polygon.bounds[3] - polygon.bounds[1]], 2) |
exterior = list(np.round(np.array(polygon.exterior.coords).reshape(-1), 2)) |
segmentation = [exterior] |
score = prob |
annotation = { |
"category_id": 100, |
"bbox": list(bbox), |
"segmentation": segmentation, |
"score": score, |
"image_id": image_id} |
annotations.append(annotation) |
return annotations |
else: |
raise TypeError("polygons has unrecognized type {}".format(type(polygons))) |
def save_poly_coco(annotations: list, base_filepath: str): |
""" |
@param annotations: Either [[annotation1 of im1, annotation2 of im1, ...], ...] or [{"method1": [annotation1 of im1, ...]}, ...] |
@param base_filepath: |
@return: |
""" |
if type(annotations[0]) == dict: |
dictionary = local_utils.list_of_dicts_to_dict_of_lists(annotations) |
dictionary = local_utils.flatten_dict(dictionary) |
for key, _annotations in dictionary.items(): |
out_filepath = base_filepath + "." + key + ".json" |
python_utils.save_json(out_filepath, _annotations) |
elif type(annotations[0]) == list: |
flattened_annotations = list(itertools.chain(*annotations)) |
out_filepath = get_save_filepath(base_filepath, None, ".json") |
python_utils.save_json(out_filepath, flattened_annotations) |
else: |
raise TypeError("annotations has unrecognized type {}".format(type(annotations))) |
def seg_coco(sample): |
annotations = [] |
seg_interior = sample["seg"][0, :, :].numpy() |
seg_mask = sample["seg_mask"].numpy() |
labels = skimage.morphology.label(seg_mask) |
properties = skimage.measure.regionprops(labels, cache=True) |
for i, contour_props in enumerate(properties): |
skimage_bbox = contour_props["bbox"] |
obj_mask = seg_mask[skimage_bbox[0]:skimage_bbox[2], skimage_bbox[1]:skimage_bbox[3]] |
obj_seg_interior = seg_interior[skimage_bbox[0]:skimage_bbox[2], skimage_bbox[1]:skimage_bbox[3]] |
score = float(np.mean(obj_seg_interior * obj_mask)) |
coco_bbox = [skimage_bbox[1], skimage_bbox[0], |
skimage_bbox[3] - skimage_bbox[1], skimage_bbox[2] - skimage_bbox[0]] |
image_mask = labels == (i + 1) |
rle = pycocotools.mask.encode(np.asfortranarray(image_mask)) |
rle["counts"] = rle["counts"].decode("utf-8") |
image_id = sample["image_id"].item() |
annotation = { |
"category_id": 100, |
"bbox": coco_bbox, |
"segmentation": rle, |
"score": score, |
"image_id": image_id} |
annotations.append(annotation) |
return annotations |
def save_seg_coco(sample, base_filepath, name): |
filepath = get_save_filepath(base_filepath, name, ".json") |
annotations = seg_coco(sample) |
python_utils.save_json(filepath, annotations) |
def save_crossfield(crossfield, base_filepath, name): |
crossfield = np.transpose(crossfield.numpy(), (1, 2, 0)) |
crossfield_filepath =get_save_filepath(base_filepath, name, ".npy") |
np.save(crossfield_filepath, crossfield) |
def save_uv_angles(crossfield, base_filepath, name, image_filepath): |
crossfield = np.transpose(crossfield.numpy(), (1, 2, 0)) |
u, v = math_utils.compute_crossfield_uv(crossfield) |
u_angle, v_angle = np.angle(u), np.angle(v) |
u_angle, v_angle = np.mod(u_angle, np.pi), np.mod(v_angle, np.pi) |
uv_angles = np.stack([u_angle, v_angle], axis=-1) |
uv_angles_as_image = np.round(uv_angles * 255 / np.pi).astype(np.uint8) |
save_filepath = get_save_filepath(base_filepath, name, ".tif") |
geo_utils.save_image_as_geotiff(save_filepath, uv_angles_as_image, image_filepath) |
def save_raw_pred(sample, base_filepath, name): |
save_filepath =get_save_filepath(base_filepath, name, ".pt") |
torch.save(sample, save_filepath) |
def save_opencities_mask(seg_mask, base_filepath, name, image_filepath): |
seg_mask = seg_mask.numpy() |
out_filepath = get_save_filepath(base_filepath, name, ".tif") |
tifffile.imwrite(out_filepath, np.logical_not((np.array(seg_mask).astype(np.bool)))) |