|
""" |
|
Geospatial utilities for image processing and GeoJSON generation. |
|
This module adapts techniques from the geoai library for better polygon generation |
|
with simplified dependencies. |
|
""" |
|
|
|
import os |
|
import logging |
|
import uuid |
|
import numpy as np |
|
import cv2 |
|
from PIL import Image, TiffTags, TiffImagePlugin |
|
import json |
|
import re |
|
from shapely.geometry import Polygon, MultiPolygon, mapping |
|
from shapely import ops |
|
|
|
def extract_contours(image_path, min_area=50, epsilon_factor=0.002):
    """
    Extract contours from an image and convert them to polygons.
    Uses OpenCV's contour detection with Douglas-Peucker simplification.

    The image is read as grayscale, binarised at a fixed threshold of 127
    and only the outermost contours are traced (holes are not reported).

    Args:
        image_path (str): Path to the processed image
        min_area (int): Minimum contour area (in pixels) to keep
        epsilon_factor (float): Simplification factor for the Douglas-Peucker
            algorithm, expressed as a fraction of the contour perimeter

    Returns:
        list: List of valid shapely Polygon objects in pixel coordinates.
            An empty list is returned when anything goes wrong (the error
            is logged, not raised).
    """
    try:
        img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # OpenCV could not decode the file; let Pillow try. The context
            # manager makes sure the underlying file handle is released.
            with Image.open(image_path) as pil_img:
                img = np.array(pil_img.convert('L'))

        _, thresh = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)

        # findContours returns (contours, hierarchy) on OpenCV 4.x but
        # (image, contours, hierarchy) on 3.x; the contours are always the
        # second-to-last element.
        contours = cv2.findContours(
            thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )[-2]

        polygons = []
        for contour in contours:
            # Drop specks below the area threshold
            if cv2.contourArea(contour) < min_area:
                continue

            # Tolerance scales with the perimeter so large and small shapes
            # are simplified proportionally
            epsilon = epsilon_factor * cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, epsilon, True)

            # A polygon needs at least three distinct vertices
            if len(approx) < 3:
                continue

            # approxPolyDP yields an (N, 1, 2) array; flatten it to (x, y)
            # pairs. shapely closes the ring implicitly.
            ring = [(float(x), float(y)) for x, y in approx.reshape(-1, 2)]

            polygon = Polygon(ring)
            if polygon.is_valid:
                polygons.append(polygon)

        return polygons

    except Exception as e:
        logging.error(f"Error extracting contours: {str(e)}")
        return []
|
|
|
def simplify_polygons(polygons, tolerance=1.0):
    """
    Reduce the vertex count of each polygon with topology-preserving
    simplification.

    Args:
        polygons (list): List of shapely Polygon objects
        tolerance (float): Simplification tolerance passed to
            ``simplify``

    Returns:
        list: The simplified geometries, in input order. Results that are
            invalid or empty after simplification are left out.
    """
    # Lazily simplify each input, then keep only the usable results
    candidates = (
        shape.simplify(tolerance, preserve_topology=True) for shape in polygons
    )
    return [geom for geom in candidates if geom.is_valid and not geom.is_empty]
|
|
|
def regularize_polygons(polygons, min_fill_ratio=0.8):
    """
    Regularize polygons to make them more rectangular when appropriate.

    A polygon that fills more than ``min_fill_ratio`` of its axis-aligned
    bounding box is replaced by that bounding box; every other polygon is
    passed through unchanged.

    Args:
        polygons (list): List of shapely Polygon objects
        min_fill_ratio (float): Fraction of the bounding-box area the polygon
            must exceed to be replaced by its bounding box

    Returns:
        list: List of regularized polygons, same length and order as the
            input
    """
    regularized = []
    for polygon in polygons:
        try:
            minx, miny, maxx, maxy = polygon.bounds
            bbox_area = (maxx - minx) * (maxy - miny)

            # A degenerate (zero-area) or NaN bounding box has no meaningful
            # fill ratio; keep the polygon as it is. `not (> 0)` is used so
            # that NaN is rejected as well.
            if not bbox_area > 0:
                regularized.append(polygon)
                continue

            if polygon.area / bbox_area > min_fill_ratio:
                # Nearly rectangular already: snap to the bounding box
                box = Polygon([
                    (minx, miny), (maxx, miny),
                    (maxx, maxy), (minx, maxy), (minx, miny)
                ])
                regularized.append(box)
            else:
                regularized.append(polygon)
        except Exception as e:
            # Best effort: never lose a feature because regularization failed
            logging.warning(f"Error regularizing polygon: {str(e)}")
            regularized.append(polygon)

    return regularized
|
|
|
def merge_nearby_polygons(polygons, distance_threshold=5.0):
    """
    Merge polygons that are close to each other to reduce the polygon count.

    Each polygon is grown by ``distance_threshold``, the grown shapes are
    unioned, and the union is then shrunk by the same amount so the merged
    shapes return to roughly their original footprint instead of staying
    inflated. Because every polygon is grown by the threshold, two polygons
    start to overlap once the gap between them is below twice the threshold.

    Args:
        polygons (list): List of shapely Polygon objects
        distance_threshold (float): Buffer distance used for merging

    Returns:
        list: List of merged polygons (empty when there is no input or
            nothing polygonal survives)
    """
    if not polygons:
        return []

    # Dilate so that neighbouring shapes overlap and fuse in the union
    dilated = [polygon.buffer(distance_threshold) for polygon in polygons]
    fused = ops.unary_union(dilated)

    # Erode by the same distance to undo the dilation; only the bridges
    # created between neighbours remain as extra area.
    restored = fused.buffer(-distance_threshold)

    if restored.is_empty:
        return []
    if isinstance(restored, Polygon):
        return [restored]
    if isinstance(restored, MultiPolygon):
        return [part for part in restored.geoms if not part.is_empty]
    return []
|
|
|
# TIFF/EXIF tag ids used below
_TAG_MODEL_PIXEL_SCALE = 33550   # GeoTIFF ModelPixelScaleTag: (scale_x, scale_y, scale_z)
_TAG_MODEL_TIEPOINT = 33922      # GeoTIFF ModelTiepointTag: (I, J, K, X, Y, Z)
_TAG_EXIF_GPS_INFO = 34853       # EXIF GPSInfo IFD
_GPS_LAT_REF, _GPS_LAT, _GPS_LON_REF, _GPS_LON = 1, 2, 3, 4

# Half-width (in degrees) of the box drawn around a single EXIF GPS position
_EXIF_POINT_DELTA = 0.01


def _bounds_from_rasterio(image_path):
    """Return (min_lat, min_lon, max_lat, max_lon) in EPSG:4326 read with rasterio, or None."""
    try:
        # Optional dependency: imported lazily so the module works without it
        import rasterio
        from rasterio.warp import transform_bounds

        logging.info(f"Using rasterio to extract coordinates from {image_path}")

        with rasterio.open(image_path) as src:
            if src.crs is None:
                # Not georeferenced; let the caller try other strategies
                return None

            bounds = src.bounds
            if src.crs.to_epsg() != 4326:
                west, south, east, north = transform_bounds(
                    src.crs, 'EPSG:4326',
                    bounds.left, bounds.bottom, bounds.right, bounds.top
                )
            else:
                west, south, east, north = bounds

        logging.info(f"Extracted coordinates from GeoTIFF: {west},{south} to {east},{north}")
        return south, west, north, east
    except Exception as e:
        logging.warning(f"Rasterio extraction failed: {str(e)}, falling back to PIL")
        return None


def _bounds_from_tiff_tags(img):
    """Return bounds derived from the GeoTIFF pixel-scale and tiepoint tags of a PIL image, or None."""
    tags = img.tag.items() if hasattr(img.tag, 'items') else ()

    pixel_scale = None
    tiepoint = None
    for tag_id, value in tags:
        tag_name = TiffTags.TAGS.get(tag_id, str(tag_id))
        logging.debug(f"TIFF tag: {tag_name} ({tag_id}): {value}")
        if tag_id == _TAG_MODEL_PIXEL_SCALE:
            pixel_scale = value
        elif tag_id == _TAG_MODEL_TIEPOINT:
            tiepoint = value

    if not (pixel_scale and tiepoint):
        return None
    if len(pixel_scale) < 2 or len(tiepoint) < 6:
        return None

    x_scale = float(pixel_scale[0])
    y_scale = float(pixel_scale[1])

    # The tiepoint ties raster position (col, row) to model position (x, y)
    raster_col, raster_row = float(tiepoint[0]), float(tiepoint[1])
    model_x, model_y = float(tiepoint[3]), float(tiepoint[4])

    width, height = img.size

    # Walk back from the tiepoint to the upper-left corner of the raster, so
    # the result is also right when the tiepoint is not at pixel (0, 0).
    min_lon = model_x - raster_col * x_scale
    max_lat = model_y + raster_row * y_scale
    max_lon = min_lon + width * x_scale
    min_lat = max_lat - height * y_scale

    # NOTE(review): the model coordinates are used as lon/lat directly; the
    # GeoKeys are not inspected here, so a projected CRS would yield projected
    # units. Confirm inputs reaching this path are geographic.
    return min_lat, min_lon, max_lat, max_lon


def _exif_rational_to_float(value):
    """Convert one EXIF rational, given as a (numerator, denominator) pair or a number-like object, to float."""
    try:
        numerator, denominator = value
    except TypeError:
        # Not a pair: number-like objects are converted directly
        return float(value)
    return numerator / denominator


def _dms_to_decimal_degrees(dms, ref, negative_ref):
    """Convert a (degrees, minutes, seconds) triple to signed decimal degrees."""
    degrees, minutes, seconds = (_exif_rational_to_float(part) for part in dms[:3])
    decimal = degrees + minutes / 60 + seconds / 3600
    return -decimal if ref == negative_ref else decimal


def _bounds_from_exif_gps(exif):
    """Return a small box around the EXIF GPS position, or None when no complete position is stored."""
    if _TAG_EXIF_GPS_INFO not in exif:
        return None

    gps_info = exif[_TAG_EXIF_GPS_INFO]
    required = (_GPS_LAT_REF, _GPS_LAT, _GPS_LON_REF, _GPS_LON)
    if not all(key in gps_info for key in required):
        return None

    lat_val = _dms_to_decimal_degrees(gps_info[_GPS_LAT], gps_info[_GPS_LAT_REF], 'S')
    lon_val = _dms_to_decimal_degrees(gps_info[_GPS_LON], gps_info[_GPS_LON_REF], 'W')

    # EXIF stores a single point, not an extent; build a fixed-size box
    # centred on it.
    return (
        lat_val - _EXIF_POINT_DELTA,
        lon_val - _EXIF_POINT_DELTA,
        lat_val + _EXIF_POINT_DELTA,
        lon_val + _EXIF_POINT_DELTA,
    )


def extract_geo_coordinates_from_image(image_path):
    """
    Extract geographic coordinates from image metadata (EXIF, GeoTIFF).

    Strategies are tried in this order and the first success wins:
      1. For ``.tif``/``.tiff`` paths, rasterio (if installed), reprojecting
         the bounds to EPSG:4326 when the file uses another CRS.
      2. GeoTIFF pixel-scale and tiepoint tags read through PIL.
      3. For images without TIFF tags, the EXIF GPS position, expanded into
         a box of +/- 0.01 degrees.

    Args:
        image_path (str): Path to the image file

    Returns:
        tuple: (min_lat, min_lon, max_lat, max_lon) or None if not found.
            Errors are logged and reported as None, never raised.
    """
    try:
        if image_path.lower().endswith(('.tif', '.tiff')):
            bounds = _bounds_from_rasterio(image_path)
            if bounds is not None:
                return bounds

        # Context manager releases the file handle on every exit path
        with Image.open(image_path) as img:
            if hasattr(img, 'tag') and img.tag:
                logging.info(f"Detected image with tags, checking for geospatial metadata")

                bounds = _bounds_from_tiff_tags(img)
                if bounds is not None:
                    min_lat, min_lon, max_lat, max_lon = bounds
                    logging.info(f"Extracted geo bounds: {min_lon},{min_lat} to {max_lon},{max_lat}")
                    return bounds

                logging.info("No valid geospatial metadata found in TIFF")
            else:
                # Read the EXIF block once; it is only consulted for images
                # that expose no TIFF tags
                exif = img._getexif() if hasattr(img, '_getexif') else None
                if exif:
                    bounds = _bounds_from_exif_gps(exif)
                    if bounds is not None:
                        min_lat, min_lon, max_lat, max_lon = bounds
                        logging.info(f"Extracted EXIF GPS bounds: {min_lon},{min_lat} to {max_lon},{max_lat}")
                        return bounds

                    logging.info("No valid GPS metadata found in EXIF")

        logging.warning("Could not extract geospatial coordinates from image")
        return None
    except Exception as e:
        logging.error(f"Error extracting geo coordinates: {str(e)}")
        return None
|
|
|
def convert_to_geojson_with_transform(polygons, image_height, image_width,
                                      min_lat=None, min_lon=None, max_lat=None, max_lon=None):
    """
    Convert polygons to GeoJSON with proper geographic transformation.

    Pixel coordinates are mapped linearly onto the given geographic bounds:
    x grows from ``min_lon`` to ``max_lon`` and y (which grows downwards in
    image space) runs from ``max_lat`` down to ``min_lat``. Both the exterior
    ring and any interior rings (holes) of each polygon are exported.

    Args:
        polygons (list): List of shapely Polygon objects
        image_height (int): Height of the source image
        image_width (int): Width of the source image
        min_lat (float, optional): Minimum latitude for geographic bounds
        min_lon (float, optional): Minimum longitude for geographic bounds
        max_lat (float, optional): Maximum latitude for geographic bounds
        max_lon (float, optional): Maximum longitude for geographic bounds

    Returns:
        dict: GeoJSON FeatureCollection with one Polygon feature per input
            polygon, ids numbered from 1

    Raises:
        ZeroDivisionError: If ``image_width`` or ``image_height`` is zero and
            there is at least one vertex to transform.
    """
    # If any bound is missing, fall back to a fixed placeholder extent
    if any(bound is None for bound in (min_lon, min_lat, max_lon, max_lat)):
        logging.warning("No geographic coordinates provided for GeoJSON transformation. Using default values.")
        min_lon, min_lat = -98.0, 32.0
        max_lon, max_lat = -96.0, 34.0

    def transform_ring(ring):
        """Map every vertex of a ring from pixel space to (lon, lat)."""
        return [
            (
                min_lon + (point[0] / image_width) * (max_lon - min_lon),
                # Image rows count downwards, latitude counts upwards
                max_lat - (point[1] / image_height) * (max_lat - min_lat),
            )
            for point in ring.coords
        ]

    features = []
    for number, polygon in enumerate(polygons, start=1):
        # GeoJSON Polygon: first ring is the outline, the rest are holes
        rings = [transform_ring(polygon.exterior)]
        rings.extend(transform_ring(hole) for hole in polygon.interiors)

        features.append({
            "type": "Feature",
            "id": number,
            "properties": {
                "name": f"Feature {number}"
            },
            "geometry": {
                "type": "Polygon",
                "coordinates": rings
            }
        })

    return {
        "type": "FeatureCollection",
        "features": features
    }
|
|
|
def process_image_to_geojson(image_path, feature_type="buildings", original_file_path=None):
    """
    Complete pipeline to convert an image to a simplified GeoJSON.

    Steps: read the image size, segment the image into polygons, locate the
    original (unprocessed) image, read geographic bounds from the original
    or, failing that, the processed image, and finally project the polygons
    onto those bounds. When no bounds can be found a fixed placeholder
    extent is used and a warning is logged.

    Args:
        image_path (str): Path to the processed image
        feature_type (str): Type of features to extract ("buildings", "trees", "water", "roads")
        original_file_path (str, optional): Path to the original uploaded file.
            When omitted and ``image_path`` contains "_processed", the
            original path is guessed by removing that marker.

    Returns:
        dict: GeoJSON object. An empty FeatureCollection is returned when no
            polygons are found or when any step fails (the error is logged).
    """
    try:
        # Only the dimensions are needed; release the file handle right away
        with Image.open(image_path) as img:
            width, height = img.size

        # Imported inside the function, as in the original code
        # (presumably to avoid an import cycle - TODO confirm).
        from utils.segmentation import segment_and_extract_features

        _, polygons = segment_and_extract_features(
            image_path,
            output_mask_path=None,
            feature_type=feature_type,
            min_area=50,
            simplify_tolerance=2.0,
            merge_distance=5.0
        )

        if not polygons:
            logging.warning("No polygons found in the image after segmentation")
            return {"type": "FeatureCollection", "features": []}

        original_image_path = original_file_path

        # No explicit original: guess it from the processed file name
        if not original_image_path and "_processed" in image_path:
            original_image_path = image_path.replace("_processed", "")

        # The original may have been stored with a different extension.
        # Guarded so os.path.exists is never called with None.
        if original_image_path and not os.path.exists(original_image_path):
            base_path = original_image_path.rsplit('.', 1)[0]
            for ext in ['.tif', '.tiff', '.jpg', '.jpeg', '.png']:
                if os.path.exists(base_path + ext):
                    original_image_path = base_path + ext
                    break

        if original_image_path:
            logging.info(f"Using original image path: {original_image_path}")

        # Prefer metadata from the original upload, then the processed image
        coords = None
        if original_image_path and os.path.exists(original_image_path):
            logging.info(f"Checking original image for geospatial data: {original_image_path}")
            coords = extract_geo_coordinates_from_image(original_image_path)

        if not coords:
            logging.info("Checking processed image for geospatial data")
            coords = extract_geo_coordinates_from_image(image_path)

        if coords:
            min_lat, min_lon, max_lat, max_lon = coords
            logging.info(f"Using extracted coordinates: {min_lon},{min_lat} to {max_lon},{max_lat}")
        else:
            # extract_geo_coordinates_from_image has already tried rasterio
            # for TIFF inputs, so there is nothing left to try here.
            logging.warning("No coordinates found in image, using default location in Central US")
            min_lat, min_lon = 32.0, -98.0
            max_lat, max_lon = 34.0, -96.0

        return convert_to_geojson_with_transform(
            polygons, height, width,
            min_lat=min_lat, min_lon=min_lon,
            max_lat=max_lat, max_lon=max_lon
        )

    except Exception as e:
        logging.error(f"Error in GeoJSON processing: {str(e)}")
        return {"type": "FeatureCollection", "features": []}