|
|
|
import matplotlib.pyplot as plt |
|
|
|
from evaluation.viz import plot_example_single |
|
from dataset.torch import unbatch_to_device |
|
import matplotlib.pyplot as plt |
|
from typing import Optional, Tuple |
|
import cv2 |
|
import torch |
|
import numpy as np |
|
import time |
|
from logger import logger |
|
from evaluation.run import resolve_checkpoint_path, pretrained_models |
|
from models.maplocnet import MapLocNet |
|
from models.voting import fuse_gps, argmax_xyr |
|
|
|
from osm.raster import Canvas |
|
from utils.wrappers import Camera |
|
from utils.io import read_image |
|
from utils.geo import BoundaryBox, Projection |
|
from utils.exif import EXIF |
|
import requests |
|
from pathlib import Path |
|
from utils.exif import EXIF |
|
from dataset.image import resize_image, pad_image, rectify_image |
|
|
|
from dataset import UavMapDatasetModule |
|
import torchvision.transforms as tvf |
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
from sklearn.decomposition import PCA |
|
from PIL import Image |
|
|
|
|
|
from osm.tiling import TileManager |
|
from utils.viz_localization import ( |
|
likelihood_overlay, |
|
plot_dense_rotations, |
|
add_circle_inset, |
|
) |
|
|
|
from osm.viz import Colormap, plot_nodes |
|
from utils.viz_2d import plot_images |
|
|
|
from utils.viz_2d import features_to_RGB |
|
import random |
|
from geopy.distance import geodesic |
|
|
|
|
|
def vis_image_feature(F): |
|
def normalize(x): |
|
return x / np.linalg.norm(x, axis=-1, keepdims=True) |
|
|
|
|
|
F = F[:, 0:180, 0:180] |
|
flatten = [] |
|
c, h, w = F.shape |
|
print(F.shape) |
|
F = np.rollaxis(F, 0, 3) |
|
F_flat = F.reshape(-1, c) |
|
flatten.append(F_flat) |
|
flatten = normalize(flatten)[0] |
|
|
|
flatten = np.nan_to_num(flatten, nan=0) |
|
pca = PCA(n_components=3) |
|
|
|
print(flatten.shape) |
|
flatten = pca.fit_transform(flatten) |
|
flatten = (normalize(flatten) + 1) / 2 |
|
|
|
|
|
F_rgb, flatten = np.split(flatten, [h * w], axis=0) |
|
F_rgb = F_rgb.reshape((h, w, 3)) |
|
return F_rgb |
|
def distance(lat1, lon1, lat2, lon2): |
|
point1 = (lat1, lon1) |
|
point2 = (lat2, lon2) |
|
distance_km = geodesic(point1, point2).meters |
|
return distance_km |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def show_result(map_vis_image, pre_uv, pre_yaw): |
|
|
|
gray_mask = np.zeros_like(map_vis_image) |
|
gray_mask.fill(128) |
|
|
|
|
|
image = cv2.addWeighted(map_vis_image, 1, gray_mask, 0, 0) |
|
|
|
|
|
|
|
u, v = pre_uv |
|
x1, y1 = int(u), int(v) |
|
angle = pre_yaw - 90 |
|
|
|
length = 20 |
|
x2 = int(x1 + length * np.cos(np.radians(angle))) |
|
y2 = int(y1 + length * np.sin(np.radians(angle))) |
|
|
|
cv2.arrowedLine(image, (x1, y1), (x2, y2), (0, 0, 0), 2, 5, 0, 0.3) |
|
|
|
return image |
|
|
|
|
|
def xyz_to_latlon(x, y, z): |
|
|
|
wgs84 = pyproj.CRS('EPSG:4326') |
|
|
|
|
|
xyz = pyproj.CRS(f'+proj=geocent +datum=WGS84 +units=m +no_defs') |
|
|
|
|
|
transformer = pyproj.Transformer.from_crs(xyz, wgs84) |
|
|
|
|
|
lon, lat, _ = transformer.transform(x, y, z) |
|
|
|
return lat, lon |
|
|
|
|
|
class Demo: |
|
def __init__( |
|
self, |
|
experiment_or_path: Optional[str] = "OrienterNet_MGL", |
|
device=None, |
|
**kwargs |
|
): |
|
if experiment_or_path in pretrained_models: |
|
experiment_or_path, _ = pretrained_models[experiment_or_path] |
|
path = resolve_checkpoint_path(experiment_or_path) |
|
ckpt = torch.load(path, map_location=(lambda storage, loc: storage)) |
|
config = ckpt["hyper_parameters"] |
|
config.model.update(kwargs) |
|
config.model.image_encoder.backbone.pretrained = False |
|
|
|
model = MapLocNet(config.model).eval() |
|
state = {k[len("model."):]: v for k, v in ckpt["state_dict"].items()} |
|
model.load_state_dict(state, strict=True) |
|
if device is None: |
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
model = model.to(device) |
|
|
|
self.model = model |
|
self.config = config |
|
self.device = device |
|
|
|
def prepare_data( |
|
self, |
|
image: np.ndarray, |
|
camera: Camera, |
|
canvas: Canvas, |
|
roll_pitch: Optional[Tuple[float]] = None, |
|
): |
|
|
|
image = torch.from_numpy(image).permute(2, 0, 1).float().div_(255) |
|
|
|
return { |
|
'map': torch.from_numpy(canvas.raster).long(), |
|
'image': image, |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def localize(self, image: np.ndarray, camera: Camera, canvas: Canvas, **kwargs): |
|
|
|
data = self.prepare_data(image, camera, canvas, **kwargs) |
|
data_ = {k: v.to(self.device)[None] for k, v in data.items()} |
|
|
|
|
|
|
|
start = time.time() |
|
with torch.no_grad(): |
|
pred = self.model(data_) |
|
|
|
end = time.time() |
|
xy_gps = canvas.bbox.center |
|
uv_gps = torch.from_numpy(canvas.to_uv(xy_gps)) |
|
|
|
lp_xyr = pred["log_probs"].squeeze(0) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
xyr = argmax_xyr(lp_xyr).cpu() |
|
|
|
prob = lp_xyr.exp().cpu() |
|
neural_map = pred["map"]["map_features"][0].squeeze(0).cpu() |
|
print('total time:', start - end) |
|
return xyr[:2], xyr[2], prob, neural_map, data["image"], data_, pred |
|
|
|
|
|
def load_test_data( |
|
root: Path, |
|
city: str, |
|
index: int, |
|
): |
|
uav_image_path = root / city / 'uav' |
|
map_path = root / city / 'map' |
|
map_vis = root / city / 'map_vis' |
|
info_path = root / city / 'info.csv' |
|
osm_path = root / city / '{}.osm'.format(city) |
|
|
|
info = np.loadtxt(str(info_path), dtype=str, delimiter=",", skiprows=1) |
|
|
|
id, uav_name, map_name, \ |
|
uav_long, uav_lat, \ |
|
map_long, map_lat, \ |
|
tile_size_meters, pixel_per_meter, \ |
|
u, v, yaw, dis = info[index] |
|
print(info[index]) |
|
uav_image_rgb = cv2.imread(str(uav_image_path / uav_name)) |
|
uav_image_rgb = cv2.cvtColor(uav_image_rgb, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
map_vis_image = cv2.imread(str(map_vis / uav_name)) |
|
map_vis_image = cv2.cvtColor(map_vis_image, cv2.COLOR_BGR2RGB) |
|
|
|
map = np.load(str(map_path / map_name)) |
|
|
|
tfs = [] |
|
tfs.append(tvf.ToTensor()) |
|
tfs.append(tvf.Resize(256)) |
|
val_tfs = tvf.Compose(tfs) |
|
|
|
uav_image = val_tfs(uav_image_rgb) |
|
|
|
|
|
|
|
|
|
|
|
uav_path = str(uav_image_path / uav_name) |
|
return { |
|
'map': torch.from_numpy(np.ascontiguousarray(map)).long().unsqueeze(0), |
|
'image': torch.tensor(uav_image).unsqueeze(0), |
|
'roll_pitch_yaw': torch.tensor((0, 0, float(yaw))).float().unsqueeze(0), |
|
'pixels_per_meter': torch.tensor(float(pixel_per_meter)).float().unsqueeze(0), |
|
"uv": torch.tensor([float(u), float(v)]).float().unsqueeze(0), |
|
}, uav_image_rgb, map_vis_image, uav_path, [float(map_lat), float(map_long)] |
|
|
|
|
|
def crop_image(image, width, height): |
|
|
|
x = int((image.shape[1] - width) / 2) |
|
y = int((image.shape[0] - height) / 2) |
|
|
|
|
|
cropped_image = image[y:y + height, x:x + width] |
|
return cropped_image |
|
|
|
|
|
def crop_square(image): |
|
|
|
height, width = image.shape[:2] |
|
|
|
|
|
min_length = min(height, width) |
|
|
|
|
|
top = (height - min_length) // 2 |
|
bottom = top + min_length |
|
left = (width - min_length) // 2 |
|
right = left + min_length |
|
|
|
|
|
cropped_image = image[top:bottom, left:right] |
|
|
|
return cropped_image |
|
def read_input_image_test( |
|
image, |
|
prior_latlon, |
|
tile_size_meters, |
|
): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
image = cv2.resize(image,(256,256)) |
|
roll_pitch = None |
|
|
|
|
|
latlon = None |
|
if prior_latlon is not None: |
|
latlon = prior_latlon |
|
logger.info("Using prior latlon %s.", prior_latlon) |
|
|
|
if latlon is None: |
|
with open(image_path, "rb") as fid: |
|
exif = EXIF(fid, lambda: image.shape[:2]) |
|
geo = exif.extract_geo() |
|
if geo: |
|
alt = geo.get("altitude", 0) |
|
latlon = (geo["latitude"], geo["longitude"], alt) |
|
logger.info("Using prior location from EXIF.") |
|
|
|
else: |
|
logger.info("Could not find any prior location in the image EXIF metadata.") |
|
|
|
latlon = np.array(latlon) |
|
|
|
proj = Projection(*latlon) |
|
center = proj.project(latlon) |
|
bbox = BoundaryBox(center, center) + float(tile_size_meters) |
|
camera=None |
|
image=cv2.resize(image,(256,256)) |
|
return image, camera, roll_pitch, proj, bbox, latlon |
|
if __name__ == '__main__': |
|
experiment_or_path = "weight/last-step-checkpointing.ckpt" |
|
|
|
image_path='images/00000.jpg' |
|
prior_latlon=(37.75704325989902,-122.435941445631) |
|
tile_size_meters=128 |
|
demo = Demo(experiment_or_path=experiment_or_path, num_rotations=128, device='cpu') |
|
image, camera, gravity, proj, bbox, true_prior_latlon = read_input_image_test( |
|
image_path, |
|
prior_latlon=prior_latlon, |
|
tile_size_meters=tile_size_meters, |
|
) |
|
tiler = TileManager.from_bbox(projection=proj, bbox=bbox + 10,ppm=1, tile_size=tile_size_meters) |
|
|
|
canvas = tiler.query(bbox) |
|
uv, yaw, prob, neural_map, image_rectified, data_, pred = demo.localize( |
|
image, camera, canvas) |
|
prior_latlon_pred = proj.unproject(canvas.to_xy(uv)) |
|
pass |