|
""" |
|
Segmentation utilities for image processing inspired by CLIPSeg techniques. |
|
This is a simplified version that does not require the full transformers library. |
|
""" |
|
|
|
import os |
|
import logging |
|
import numpy as np |
|
import cv2 |
|
from PIL import Image |
|
from utils.geospatial import extract_contours, simplify_polygons, regularize_polygons, merge_nearby_polygons |
|
|
|
def segment_by_color_threshold(image_path, output_path=None, |
|
threshold=127, color_channel=1, |
|
smoothing_sigma=1.0): |
|
""" |
|
Segment an image based on color thresholding. |
|
This is a simple segmentation inspired by more complex models like CLIPSeg. |
|
|
|
Args: |
|
image_path (str): Path to the input image |
|
output_path (str, optional): Path to save the segmentation mask |
|
threshold (int): Pixel intensity threshold (0-255) |
|
color_channel (int): Color channel to use for thresholding (0=R, 1=G, 2=B) |
|
smoothing_sigma (float): Gaussian smoothing sigma |
|
|
|
Returns: |
|
numpy.ndarray: Segmentation mask |
|
""" |
|
try: |
|
|
|
img = cv2.imread(image_path) |
|
if img is None: |
|
|
|
pil_img = Image.open(image_path).convert('RGB') |
|
img = np.array(pil_img) |
|
img = img[:, :, ::-1] |
|
|
|
|
|
b, g, r = cv2.split(img) |
|
channels = [r, g, b] |
|
|
|
if 0 <= color_channel < 3: |
|
channel = channels[color_channel] |
|
else: |
|
|
|
channel = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
if smoothing_sigma > 0: |
|
channel = cv2.GaussianBlur(channel, (0, 0), smoothing_sigma) |
|
|
|
|
|
_, mask = cv2.threshold(channel, threshold, 255, cv2.THRESH_BINARY) |
|
|
|
|
|
if output_path: |
|
cv2.imwrite(output_path, mask) |
|
logging.info(f"Saved segmentation mask to {output_path}") |
|
|
|
return mask |
|
|
|
except Exception as e: |
|
logging.error(f"Error in segmentation: {str(e)}") |
|
return None |
|
|
|
def segment_by_adaptive_threshold(image_path, output_path=None, |
|
block_size=11, c=2, |
|
smoothing_sigma=1.0): |
|
""" |
|
Segment an image using adaptive thresholding for better handling of |
|
lighting variations. |
|
|
|
Args: |
|
image_path (str): Path to the input image |
|
output_path (str, optional): Path to save the segmentation mask |
|
block_size (int): Size of the pixel neighborhood for threshold calculation |
|
c (int): Constant subtracted from the mean |
|
smoothing_sigma (float): Gaussian smoothing sigma |
|
|
|
Returns: |
|
numpy.ndarray: Segmentation mask |
|
""" |
|
try: |
|
|
|
img = cv2.imread(image_path) |
|
if img is None: |
|
|
|
pil_img = Image.open(image_path).convert('RGB') |
|
img = np.array(pil_img) |
|
img = img[:, :, ::-1] |
|
|
|
|
|
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
if smoothing_sigma > 0: |
|
gray = cv2.GaussianBlur(gray, (0, 0), smoothing_sigma) |
|
|
|
|
|
mask = cv2.adaptiveThreshold( |
|
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, |
|
cv2.THRESH_BINARY, block_size, c |
|
) |
|
|
|
|
|
if output_path: |
|
cv2.imwrite(output_path, mask) |
|
logging.info(f"Saved segmentation mask to {output_path}") |
|
|
|
return mask |
|
|
|
except Exception as e: |
|
logging.error(f"Error in segmentation: {str(e)}") |
|
return None |
|
|
|
def segment_by_otsu(image_path, output_path=None, smoothing_sigma=1.0): |
|
""" |
|
Segment an image using Otsu's automatic thresholding method. |
|
|
|
Args: |
|
image_path (str): Path to the input image |
|
output_path (str, optional): Path to save the segmentation mask |
|
smoothing_sigma (float): Gaussian smoothing sigma |
|
|
|
Returns: |
|
numpy.ndarray: Segmentation mask |
|
""" |
|
try: |
|
|
|
img = cv2.imread(image_path) |
|
if img is None: |
|
|
|
pil_img = Image.open(image_path).convert('RGB') |
|
img = np.array(pil_img) |
|
img = img[:, :, ::-1] |
|
|
|
|
|
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
if smoothing_sigma > 0: |
|
gray = cv2.GaussianBlur(gray, (0, 0), smoothing_sigma) |
|
|
|
|
|
_, mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) |
|
|
|
|
|
if output_path: |
|
cv2.imwrite(output_path, mask) |
|
logging.info(f"Saved segmentation mask to {output_path}") |
|
|
|
return mask |
|
|
|
except Exception as e: |
|
logging.error(f"Error in segmentation: {str(e)}") |
|
return None |
|
|
|
def segment_and_extract_features(image_path, output_mask_path=None, |
|
feature_type="buildings", |
|
min_area=50, simplify_tolerance=2.0, |
|
merge_distance=5.0): |
|
""" |
|
Complete pipeline for segmentation and feature extraction. |
|
|
|
Args: |
|
image_path (str): Path to the input image |
|
output_mask_path (str, optional): Path to save the segmentation mask |
|
feature_type (str): Type of features to extract ("buildings", "trees", "water", "roads") |
|
min_area (int): Minimum feature area to keep |
|
simplify_tolerance (float): Tolerance for polygon simplification |
|
merge_distance (float): Distance for merging nearby polygons |
|
|
|
Returns: |
|
tuple: (mask, polygons) - Segmentation mask and list of simplified Shapely polygons |
|
""" |
|
|
|
if feature_type.lower() == "buildings": |
|
|
|
mask = segment_by_adaptive_threshold( |
|
image_path, output_mask_path, |
|
block_size=15, c=2, smoothing_sigma=1.0 |
|
) |
|
elif feature_type.lower() == "trees" or feature_type.lower() == "vegetation": |
|
|
|
mask = segment_by_color_threshold( |
|
image_path, output_mask_path, |
|
threshold=140, color_channel=1, smoothing_sigma=1.5 |
|
) |
|
elif feature_type.lower() == "water": |
|
|
|
mask = segment_by_color_threshold( |
|
image_path, output_mask_path, |
|
threshold=120, color_channel=0, smoothing_sigma=2.0 |
|
) |
|
else: |
|
|
|
mask = segment_by_otsu( |
|
image_path, output_mask_path, smoothing_sigma=1.0 |
|
) |
|
|
|
if mask is None: |
|
logging.error("Segmentation failed") |
|
return None, [] |
|
|
|
|
|
temp_mask_path = None |
|
if not output_mask_path: |
|
temp_mask_path = os.path.join( |
|
os.path.dirname(image_path), |
|
f"{os.path.splitext(os.path.basename(image_path))[0]}_mask.png" |
|
) |
|
cv2.imwrite(temp_mask_path, mask) |
|
mask_path = temp_mask_path |
|
else: |
|
mask_path = output_mask_path |
|
|
|
|
|
polygons = extract_contours(mask_path, min_area=min_area) |
|
logging.info(f"Extracted {len(polygons)} initial polygons") |
|
|
|
|
|
if temp_mask_path and os.path.exists(temp_mask_path): |
|
os.remove(temp_mask_path) |
|
|
|
|
|
polygons = simplify_polygons(polygons, tolerance=simplify_tolerance) |
|
|
|
|
|
if feature_type.lower() == "buildings": |
|
polygons = regularize_polygons(polygons) |
|
|
|
|
|
polygons = merge_nearby_polygons(polygons, distance_threshold=merge_distance) |
|
logging.info(f"After processing: {len(polygons)} polygons") |
|
|
|
return mask, polygons |