Spaces:
Sleeping
Sleeping
import streamlit as st | |
import torch | |
import numpy as np | |
from PIL import Image | |
import rasterio | |
from rasterio.windows import Window | |
from tqdm.auto import tqdm | |
import io | |
import zipfile | |
import os | |
import albumentations as albu | |
import segmentation_models_pytorch as smp | |
from albumentations.pytorch.transforms import ToTensorV2 | |
import geopandas as gpd | |
from shapely.geometry import shape | |
from shapely.ops import unary_union | |
from rasterio.features import shapes | |
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
ENCODER = 'se_resnext50_32x4d' | |
ENCODER_WEIGHTS = 'imagenet' | |
# Load and prepare the model | |
def load_model(): | |
model = torch.load(r'C:\Users\MOUAD\Documents\PV panels\application\deeplabv3+ v15.pth', map_location=DEVICE) | |
model.eval().float() | |
return model | |
best_model = load_model() | |
def to_tensor(x, **kwargs): | |
return x.astype('float32') | |
# Preprocessing | |
preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS) | |
def get_preprocessing(): | |
_transform = [ | |
albu.Resize(512, 512), | |
albu.Lambda(image=preprocessing_fn), | |
albu.Lambda(image=to_tensor, mask=to_tensor), | |
ToTensorV2(), | |
] | |
return albu.Compose(_transform) | |
preprocess = get_preprocessing() | |
def process_and_predict(image, model): | |
if isinstance(image, Image.Image): | |
image = np.array(image) | |
if image.ndim == 2: | |
image = np.stack([image] * 3, axis=-1) | |
elif image.shape[2] == 4: | |
image = image[:, :, :3] | |
preprocessed = preprocess(image=image)['image'] | |
input_tensor = preprocessed.unsqueeze(0).to(DEVICE) | |
mask = model(input_tensor) | |
mask = torch.sigmoid(mask) | |
mask = (mask > 0.6).float() | |
mask_image = Image.fromarray((mask.squeeze().cpu().numpy() * 255).astype(np.uint8)) | |
return mask_image | |
def extract_tiles(map_file, model, tile_size=512, overlap=0, batch_size=4): | |
tiles = [] | |
with rasterio.open(map_file) as src: | |
height = src.height | |
width = src.width | |
effective_tile_size = tile_size - overlap | |
for y in tqdm(range(0, height, effective_tile_size)): | |
for x in range(0, width, effective_tile_size): | |
batch_images = [] | |
batch_metas = [] | |
for i in range(batch_size): | |
curr_y = y + (i * effective_tile_size) | |
if curr_y >= height: | |
break | |
window = Window(x, curr_y, tile_size, tile_size) | |
out_image = src.read(window=window) | |
if out_image.shape[0] == 1: | |
out_image = np.repeat(out_image, 3, axis=0) | |
elif out_image.shape[0] != 3: | |
raise ValueError("The number of channels in the image is not supported") | |
out_image = np.transpose(out_image, (1, 2, 0)) | |
tile_image = Image.fromarray(out_image.astype(np.uint8)) | |
out_meta = src.meta.copy() | |
out_meta.update({ | |
"driver": "GTiff", | |
"height": tile_size, | |
"width": tile_size, | |
"transform": rasterio.windows.transform(window, src.transform) | |
}) | |
tile_image = np.array(tile_image) | |
preprocessed_tile = preprocess(image=tile_image)['image'] | |
batch_images.append(preprocessed_tile) | |
batch_metas.append(out_meta) | |
if not batch_images: | |
break | |
batch_tensor = torch.cat([img.unsqueeze(0).to(DEVICE) for img in batch_images], dim=0) | |
with torch.no_grad(): | |
batch_masks = model(batch_tensor.to(DEVICE)) | |
batch_masks = torch.sigmoid(batch_masks) | |
batch_masks = (batch_masks > 0.6).float() | |
for j, mask_tensor in enumerate(batch_masks): | |
mask_resized = torch.nn.functional.interpolate(mask_tensor.unsqueeze(0), | |
size=(tile_size, tile_size), mode='bilinear', | |
align_corners=False).squeeze(0) | |
mask_array = mask_resized.squeeze().cpu().numpy() | |
if mask_array.any() == 1: | |
tiles.append([mask_array, batch_metas[j]]) | |
return tiles | |
def create_vector_mask(tiles, output_path): | |
all_polygons = [] | |
for mask_array, meta in tiles: | |
# Ensure mask is binary | |
mask_array = (mask_array > 0).astype(np.uint8) | |
# Get shapes from the mask | |
mask_shapes = list(shapes(mask_array, mask=mask_array, transform=meta['transform'])) | |
# Convert shapes to Shapely polygons | |
polygons = [shape(geom) for geom, value in mask_shapes if value == 1] | |
all_polygons.extend(polygons) | |
# Perform union of all polygons | |
union_polygon = unary_union(all_polygons) | |
# Create a GeoDataFrame | |
gdf = gpd.GeoDataFrame({'geometry': [union_polygon]}, crs=meta['crs']) | |
# Save to file | |
gdf.to_file(output_path) | |
# Calculate area in square meters | |
area_m2 = gdf.to_crs(epsg=3857).area.sum() | |
# Convert to hectares | |
area_ha = area_m2 / 10000 | |
return gdf, area_ha | |
def main(): | |
st.title("TIF File Processor") | |
uploaded_file = st.file_uploader("Choose a TIF file", type="tif") | |
if uploaded_file is not None: | |
st.write("File uploaded successfully!") | |
if st.button("Process File"): | |
st.write("Processing...") | |
with open("temp.tif", "wb") as f: | |
f.write(uploaded_file.getbuffer()) | |
best_model.float() | |
tiles = extract_tiles("temp.tif", best_model, tile_size=512, overlap=15, batch_size=4) | |
st.write("Processing complete!") | |
output_path = "output_mask.shp" | |
result_gdf, area_ha = create_vector_mask(tiles, output_path) | |
st.write("Vector mask created successfully!") | |
st.write(f"Total area occupied by polygons: {area_ha:.2f} hectares") | |
# Offer the shapefile for download | |
shp_files = [f for f in os.listdir() if | |
f.startswith("output_mask") and f.endswith((".shp", ".shx", ".dbf", ".prj"))] | |
with io.BytesIO() as zip_buffer: | |
with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file: | |
for file in shp_files: | |
zip_file.write(file) | |
zip_buffer.seek(0) | |
st.download_button( | |
label="Download shapefile", | |
data=zip_buffer, | |
file_name="output_mask.zip", | |
mime="application/zip" | |
) | |
# Clean up temporary files | |
os.remove("temp.tif") | |
for file in shp_files: | |
os.remove(file) | |
if __name__ == "__main__": | |
main() |