import numpy as np |
import time |
import json |
import os.path |
from tqdm import tqdm |
import functools |
import rasterio |
from osgeo import gdal, ogr |
from osgeo import osr |
import overpy |
from pyproj import Proj, transform, Transformer |
import fiona |
import fiona.crs |
import shapely.geometry |
import shapely.ops |
from . import polygon_utils |
from . import math_utils |
from . import print_utils |
""" |
<osm-script timeout="900" element-limit="1073741824"> |
<union> |
<query type="way"> |
<has-kv k="{0}"/> |
<bbox-query s="{1}" w="{2}" n="{3}" e="{4}"/> |
</query> |
<recurse type="way-node" into="nodes"/> |
</union> |
<print/> |
</osm-script> |
""" |
WGS84_WKT = """ |
GEOGCS["GCS_WGS_1984", |
DATUM["WGS_1984", |
SPHEROID["WGS_84",6378137,298.257223563]], |
PRIMEM["Greenwich",0], |
UNIT["Degree",0.017453292519943295]] |
""" |
CRS = {'no_defs': True, 'ellps': 'WGS84', 'datum': 'WGS84', 'proj': 'longlat'} |
def get_coor_in_space(image_filepath): |
""" |
:param image_filepath: Path to geo-referenced tif image |
:return: coor in original space and in wsg84 spatial reference and original geotransform |
:return: geo transform (x_min, res, 0, y_max, 0, -res) |
:return: [[OR_x_min,OR_y_min,OR_x_max,OR_y_max],[TR_x_min,TR_y_min,TR_x_max,TR_y_max]] |
""" |
ds = gdal.Open(image_filepath) |
width = ds.RasterXSize |
height = ds.RasterYSize |
gt = ds.GetGeoTransform() |
x_min = gt[0] |
y_min = gt[3] + width * gt[4] + height * gt[5] |
x_max = gt[0] + width * gt[1] + height * gt[2] |
y_max = gt[3] |
prj = ds.GetProjection() |
srs = osr.SpatialReference(wkt=prj) |
coor_sys = srs.GetAttrValue("PROJCS|AUTHORITY", 1) |
if coor_sys is None: |
coor_sys = srs.GetAttrValue("GEOGCS|AUTHORITY", 1) |
new_cs = osr.SpatialReference() |
new_cs.ImportFromWkt(WGS84_WKT) |
transform = osr.CoordinateTransformation(srs, new_cs) |
lat_long_min = transform.TransformPoint(x_min, y_min) |
lat_long_max = transform.TransformPoint(x_max, y_max) |
coor = [[x_min, y_min, x_max, y_max], [lat_long_min[0], lat_long_min[1], lat_long_max[0], lat_long_max[1]]] |
return coor, gt, coor_sys |
def get_osm_data(coor_query): |
""" |
:param coor_query: [x_min, min_z, x_max, y_max] |
:return: OSM query result |
""" |
api = overpy.Overpass() |
query_buildings = QUERY_BASE.format("building", coor_query[1], coor_query[0], coor_query[3], coor_query[2]) |
query_successful = False |
wait_duration = 60 |
result = None |
while not query_successful: |
try: |
result = api.query(query_buildings) |
query_successful = True |
except overpy.exception.OverpassGatewayTimeout or overpy.exception.OverpassTooManyRequests or ConnectionResetError: |
print("OSM server overload. Waiting for {} seconds before querying again...".format(wait_duration)) |
time.sleep(wait_duration) |
wait_duration *= 2 |
return result |
def proj_to_epsg_space(nodes, coor_sys): |
original = Proj(CRS) |
destination = Proj(init='EPSG:{}'.format(coor_sys)) |
polygon = [] |
for node in nodes: |
polygon.append(transform(original, destination, node.lon, node.lat)) |
return np.array(polygon) |
def compute_epsg_to_image_mat(coor, gt): |
x_min = coor[0][0] |
y_max = coor[0][3] |
transform_mat = np.array([ |
[gt[1], 0, 0], |
[0, gt[5], 0], |
[x_min, y_max, 1], |
]) |
return np.linalg.inv(transform_mat) |
def compute_image_to_epsg_mat(coor, gt): |
x_min = coor[0][0] |
y_max = coor[0][3] |
transform_mat = np.array([ |
[gt[1], 0, 0], |
[0, gt[5], 0], |
[x_min, y_max, 1], |
]) |
return transform_mat |
def apply_transform_mat(polygon_epsg_space, transform_mat): |
polygon_epsg_space_homogeneous = math_utils.to_homogeneous(polygon_epsg_space) |
polygon_image_space_homogeneous = np.matmul(polygon_epsg_space_homogeneous, transform_mat) |
polygon_image_space = math_utils.to_euclidian(polygon_image_space_homogeneous) |
return polygon_image_space |
def get_polygons_from_osm(image_filepath, tag="", ij_coords=True): |
coor, gt, coor_system = get_coor_in_space(image_filepath) |
transform_mat = compute_epsg_to_image_mat(coor, gt) |
osm_data = get_osm_data(coor[1]) |
polygons = [] |
for way in osm_data.ways: |
if way.tags.get(tag, "n/a") != 'n/a': |
polygon = way.nodes |
polygon_epsg_space = proj_to_epsg_space(polygon, coor_system) |
polygon_image_space = apply_transform_mat(polygon_epsg_space, transform_mat) |
if ij_coords: |
polygon_image_space = polygon_utils.swap_coords(polygon_image_space) |
polygons.append(polygon_image_space) |
return polygons |
def get_polygons_from_shapefile(image_filepath, input_shapefile_filepath, progressbar=True): |
def process_one_polygon(polygon): |
assert len(polygon.shape) == 2, "polygon should have shape (n, d), not {}".format(polygon.shape) |
if 2 < polygon.shape[1]: |
print_utils.print_warning( |
"WARNING: polygon from shapefile has shape {}. Will discard extra values to have polygon with shape ({}, 2)".format( |
polygon.shape, polygon.shape[0])) |
polygon = polygon[:, :2] |
polygon_epsg_space = polygon |
polygon_image_space = apply_transform_mat(polygon_epsg_space, transform_mat) |
polygon_image_space = polygon_utils.swap_coords(polygon_image_space) |
polygons.append(polygon_image_space) |
if "properties" in parsed_json: |
properties = parsed_json["properties"] |
properties_list.append(properties) |
coor, gt, coor_system = get_coor_in_space(image_filepath) |
transform_mat = compute_epsg_to_image_mat(coor, gt) |
file = ogr.Open(input_shapefile_filepath) |
assert file is not None, "File {} does not exist!".format(input_shapefile_filepath) |
shape = file.GetLayer(0) |
feature_count = shape.GetFeatureCount() |
polygons = [] |
properties_list = [] |
if progressbar: |
iterator = tqdm(range(feature_count), desc="Reading features", leave=False) |
else: |
iterator = range(feature_count) |
for feature_index in iterator: |
feature = shape.GetFeature(feature_index) |
raw_json = feature.ExportToJson() |
parsed_json = json.loads(raw_json) |
geometry = parsed_json["geometry"] |
if geometry["type"] == "Polygon": |
polygon = np.array(geometry["coordinates"][0]) |
process_one_polygon(polygon) |
if geometry["type"] == "MultiPolygon": |
for individual_coordinates in geometry["coordinates"]: |
process_one_polygon(np.array(individual_coordinates[0])) |
if properties_list: |
return polygons, properties_list |
else: |
return polygons |
def create_ogr_polygon(polygon, transform_mat): |
polygon_swapped_coords = polygon_utils.swap_coords(polygon) |
polygon_epsg = apply_transform_mat(polygon_swapped_coords, transform_mat) |
ring = ogr.Geometry(ogr.wkbLinearRing) |
for coord in polygon_epsg: |
ring.AddPoint(coord[0], coord[1]) |
poly = ogr.Geometry(ogr.wkbPolygon) |
poly.AddGeometry(ring) |
return poly.ExportToWkt() |
def create_ogr_polygons(polygons, transform_mat): |
ogr_polygons = [] |
for polygon in polygons: |
ogr_polygons.append(create_ogr_polygon(polygon, transform_mat)) |
return ogr_polygons |
def save_image_as_geotiff(save_filepath, image, source_geotiff_filepath): |
source_ds = gdal.Open(source_geotiff_filepath) |
if source_ds is None: |
raise FileNotFoundError(f"Could not load source file {source_geotiff_filepath}") |
source_gt = source_ds.GetGeoTransform() |
source_prj = source_ds.GetProjection() |
driver = gdal.GetDriverByName("GTiff") |
outdata = driver.Create(save_filepath, image.shape[1], image.shape[0], image.shape[2]) |
outdata.SetGeoTransform(source_gt) |
outdata.SetProjection(source_prj) |
for i in range(image.shape[2]): |
outdata.GetRasterBand(i + 1).WriteArray(image[..., i]) |
outdata.FlushCache() |
outdata = None |
band = None |
ds = None |
def save_shapefile_from_polygons(polygons, image_filepath, output_shapefile_filepath, properties_list=None): |
""" |
https://gis.stackexchange.com/a/52708/8104 |
""" |
assert type(polygons) == list and type(polygons[0]) == np.ndarray and \ |
len(polygons[0].shape) == 2 and polygons[0].shape[1] == 2, \ |
"polygons should be a list of numpy arrays with shape (N, 2)" |
if properties_list is not None: |
assert len(polygons) == len(properties_list), "polygons and properties_list should have the same length" |
coor, gt, coor_system = get_coor_in_space(image_filepath) |
transform_mat = compute_image_to_epsg_mat(coor, gt) |
ogr_polygons = create_ogr_polygons(polygons, transform_mat) |
driver = ogr.GetDriverByName('Esri Shapefile') |
ds = driver.CreateDataSource(output_shapefile_filepath) |
srs = osr.SpatialReference() |
srs.ImportFromEPSG(4326) |
layer = ds.CreateLayer('', None, ogr.wkbPolygon) |
field_name_list = [] |
field_type_list = [] |
if properties_list is not None: |
for properties in properties_list: |
for (key, value) in properties.items(): |
if key not in field_name_list: |
field_name_list.append(key) |
field_type_list.append(type(value)) |
for (name, py_type) in zip(field_name_list, field_type_list): |
if py_type == int: |
ogr_type = ogr.OFTInteger |
elif py_type == float: |
print("is float") |
ogr_type = ogr.OFTReal |
elif py_type == str: |
ogr_type = ogr.OFTString |
else: |
ogr_type = ogr.OFTInteger |
layer.CreateField(ogr.FieldDefn(name, ogr_type)) |
defn = layer.GetLayerDefn() |
for index in range(len(ogr_polygons)): |
ogr_polygon = ogr_polygons[index] |
if properties_list is not None: |
properties = properties_list[index] |
else: |
properties = {} |
feat = ogr.Feature(defn) |
for (key, value) in properties.items(): |
feat.SetField(key, value) |
geom = ogr.CreateGeometryFromWkt(ogr_polygon) |
feat.SetGeometry(geom) |
layer.CreateFeature(feat) |
feat = geom = None |
ds = layer = feat = geom = None |
def save_shapefile_from_shapely_polygons(polygons, image_filepath, output_shapefile_filepath): |
schema = { |
'geometry': 'Polygon', |
'properties': {'id': 'int'}, |
} |
shp_crs = "EPSG:4326" |
shp_srs = Proj(shp_crs) |
raster = rasterio.open(image_filepath) |
raster_proj = lambda x, y: raster.transform * (x, y) |
os.makedirs(os.path.dirname(output_shapefile_filepath), exist_ok=True) |
with fiona.open(output_shapefile_filepath, 'w', driver='ESRI Shapefile', schema=schema, crs=fiona.crs.from_epsg(4326)) as c: |
for id, polygon in enumerate(polygons): |
raster_polygon = shapely.ops.transform(raster_proj, polygon) |
wkt_polygon = shapely.geometry.mapping(raster_polygon) |
c.write({ |
'geometry': wkt_polygon, |
'properties': {'id': id}, |
}) |
def indices_of_biggest_intersecting_polygon(polygon_list): |
""" |
Assumes polygons which intersect follow each other on the order given by polygon_list. |
This avoids the huge complexity of looking for an intersection between every polygon. |
:param ori_gt_polygons: |
:return: |
""" |
keep_index_list = [] |
current_cluster = [] |
for index, polygon in enumerate(polygon_list): |
current_cluster_polygons = [polygon_list[index] for index in current_cluster] |
is_intersection = polygon_utils.check_intersection_with_polygons(polygon, current_cluster_polygons) |
if is_intersection: |
current_cluster.append(index) |
else: |
cluster_max_index = 0 |
cluster_max_area = 0 |
for cluster_polygon_index in current_cluster: |
cluster_polygon = polygon_list[cluster_polygon_index] |
area = polygon_utils.polygon_area(cluster_polygon) |
if cluster_max_area < area: |
cluster_max_area = area |
cluster_max_index = cluster_polygon_index |
keep_index_list.append(cluster_max_index) |
current_cluster = [index] |
return keep_index_list |
def get_pixelsize(filepath): |
raster = gdal.Open(filepath) |
gt = raster.GetGeoTransform() |
pixelsize_x = gt[1] |
pixelsize_y = -gt[5] |
pixelsize = (pixelsize_x + pixelsize_y) / 2 |
return pixelsize |
def crop_shapefile(input_filepath, mask_filepath, output_filepath): |
shp_mask_filepath = os.path.join(os.path.dirname(input_filepath), "mask.shp") |
input_file = ogr.Open(input_filepath) |
assert input_file is not None, "File {} does not exist!".format(input_filepath) |
input_layer = input_file.GetLayer(0) |
mask_file = ogr.Open(shp_mask_filepath) |
assert mask_file is not None, "File {} does not exist!".format(shp_mask_filepath) |
mask_layer = mask_file.GetLayer(0) |
print(mask_layer.GetFeatureCount()) |
feature = mask_layer.GetFeature(0) |
raw_json = feature.ExportToJson() |
parsed_json = json.loads(raw_json) |
print(parsed_json) |
ogrGeometryType = ogr.Geometry(ogr.wkbPolygon) |
outDriver = ogr.GetDriverByName("ESRI Shapefile") |
outDs = outDriver.CreateDataSource(output_filepath) |
outLayer = outDs.CreateLayer('', None, ogr.wkbPolygon) |
input_layer.Intersection(mask_layer, outLayer, options=["SKIP_FAILURES=YES"]) |
def main(): |
main_dirpath = "/workspace/data/stereo_dataset/raw/leibnitz" |
image_filepath = os.path.join(main_dirpath, "leibnitz_ortho_ref_RGB.tif") |
input_shapefile_filepath = os.path.join(main_dirpath, "Leibnitz_buildings_ref.shp") |
output_shapefile_filepath = os.path.join(main_dirpath, "Leibnitz_buildings_ref.shifted.shp") |
polygons, properties_list = get_polygons_from_shapefile(image_filepath, input_shapefile_filepath) |
print(polygons[0]) |
print(properties_list[0]) |
shift = np.array([0, 0]) |
shifted_polygons = [polygon + shift for polygon in polygons] |
print(shifted_polygons[0]) |
save_shapefile_from_polygons(shifted_polygons, image_filepath, output_shapefile_filepath, properties_list=properties_list) |
if __name__ == "__main__": |
main() |