MapItAnywhereModel / mapper /utils /generate_raycast_masks.py
hocherie
add files
4187c6f
raw
history blame
9.52 kB
from multiprocessing import Pool
from tqdm import tqdm
from pathlib import Path
import numpy as np
from collections import deque
import argparse
import cv2
def get_raycast_building_mask(building_grid):
laser_range = 200
num_laser = 100
robot_pos = (building_grid.shape[0] // 2-1, building_grid.shape[1] // 2 - 1)
unoccupied_pos = np.stack(np.where(building_grid != 1), axis=1)
if len(unoccupied_pos) == 0:
return None
l2_dist = unoccupied_pos - [robot_pos[0], robot_pos[1]]
closest = ((l2_dist ** 2).sum(1)**0.5).argmin()
robot_pos = (unoccupied_pos[closest][0], unoccupied_pos[closest][1])
free_points, hit_points, actual_hit_points = get_free_points_in_front(building_grid, robot_pos, laser_range=laser_range, num_laser=num_laser)
free_points[:, 0][free_points[:, 0] >= building_grid.shape[0]] = building_grid.shape[0] - 1
free_points[:, 1][free_points[:, 1] >= building_grid.shape[1]] = building_grid.shape[1] - 1
free_points[:, 0][free_points[:, 0] < 0] = 0
free_points[:, 1][free_points[:, 1] < 0] = 0
hit_points[:, 0][hit_points[:, 0] >= building_grid.shape[0]] = building_grid.shape[0] - 1
hit_points[:, 1][hit_points[:, 1] >= building_grid.shape[1]] = building_grid.shape[1] - 1
hit_points[:, 0][hit_points[:, 0] < 0] = 0
hit_points[:, 1][hit_points[:, 1] < 0] = 0
if len(free_points) > 0:
# Get vis mask by flood filling free space boundary
inited_flood_grid = init_flood_fill(robot_pos, hit_points, building_grid.shape)
inited_flood_grid = (inited_flood_grid * 255).astype(np.uint8).copy()
# pick a seed point from free points, that is not 0 in inited_flood_grid. We want it to be unknown
np.random.shuffle(free_points)
for i in range(len(free_points)):
seed_point = free_points[i]
if inited_flood_grid[seed_point[0], seed_point[1]] != 0:
break # Found a valid seed point, exit the loop
else:
print('Unable to find a valid seed point')
return None
num_filled, flooded_image, mask, bounding_box = cv2.floodFill(inited_flood_grid.copy(), None, seedPoint=(seed_point[1], seed_point[0]), newVal=0)
# name = names[batch_ind][-1]
return flooded_image
else:
print("No free points")
return None
def flood_fill_simple(center_point, occupancy_map):
"""
center_point: starting point (x,y) of fill
occupancy_map: occupancy map generated from Bresenham ray-tracing
"""
# Fill empty areas with queue method
occupancy_map = np.copy(occupancy_map)
sx, sy = occupancy_map.shape
fringe = deque()
fringe.appendleft(center_point)
while fringe:
n = fringe.pop()
nx, ny = n
unknown_val = 0.5
# West
if nx > 0:
if occupancy_map[nx - 1, ny] == unknown_val:
occupancy_map[nx - 1, ny] = 0
fringe.appendleft((nx - 1, ny))
# East
if nx < sx - 1:
if occupancy_map[nx + 1, ny] == unknown_val:
occupancy_map[nx + 1, ny] = 0
fringe.appendleft((nx + 1, ny))
# North
if ny > 0:
if occupancy_map[nx, ny - 1] == unknown_val:
occupancy_map[nx, ny - 1] = 0
fringe.appendleft((nx, ny - 1))
# South
if ny < sy - 1:
if occupancy_map[nx, ny + 1] == unknown_val:
occupancy_map[nx, ny + 1] = 0
fringe.appendleft((nx, ny + 1))
return occupancy_map
def init_flood_fill(robot_pos, obstacle_points, occ_grid_shape):
"""
center_point: center point
obstacle_points: detected obstacles points (x,y)
xy_points: (x,y) point pairs
"""
center_x, center_y = robot_pos
prev_ix, prev_iy = center_x, center_y
occupancy_map = (np.ones(occ_grid_shape)) * 0.5
# append first obstacle point to last
obstacle_points = np.vstack((obstacle_points, obstacle_points[0]))
for (x, y) in zip(obstacle_points[:,0], obstacle_points[:,1]):
# x coordinate of the the occupied area
ix = int(x)
# y coordinate of the the occupied area
iy = int(y)
free_area = bresenham((prev_ix, prev_iy), (ix, iy))
for fa in free_area:
occupancy_map[fa[0]][fa[1]] = 0 # free area 0.0
prev_ix = ix
prev_iy = iy
return occupancy_map
show_animation = False
def bresenham(start, end):
"""
Implementation of Bresenham's line drawing algorithm
See en.wikipedia.org/wiki/Bresenham's_line_algorithm
Bresenham's Line Algorithm
Produces a np.array from start and end (original from roguebasin.com)
>>> points1 = bresenham((4, 4), (6, 10))
>>> print(points1)
np.array([[4,4], [4,5], [5,6], [5,7], [5,8], [6,9], [6,10]])
"""
# setup initial conditions
x1, y1 = start
x2, y2 = end
dx = x2 - x1
dy = y2 - y1
is_steep = abs(dy) > abs(dx) # determine how steep the line is
if is_steep: # rotate line
x1, y1 = y1, x1
x2, y2 = y2, x2
# swap start and end points if necessary and store swap state
swapped = False
if x1 > x2:
x1, x2 = x2, x1
y1, y2 = y2, y1
swapped = True
dx = x2 - x1 # recalculate differentials
dy = y2 - y1 # recalculate differentials
error = int(dx / 2.0) # calculate error
y_step = 1 if y1 < y2 else -1
# iterate over bounding box generating points between start and end
y = y1
points = []
for x in range(x1, x2 + 1):
coord = [y, x] if is_steep else (x, y)
points.append(coord)
error -= abs(dy)
if error < 0:
y += y_step
error += dx
if swapped: # reverse the list if the coordinates were swapped
points.reverse()
points = np.array(points)
return points
def get_free_points_in_front(occupancy_grid, robot_pos, laser_range=10, num_laser=100):
"""
Assumes circular lidar
occupancy_grid: np.array (h x w)
robot_pos: (x, y)
Outputs:
free_points: np.array of hit points (x, y)
"""
free_points = []
hit_points = [] # actual hit points + last bresenham point (for some reason need this for flodding)
actual_hit_points = [] #
for orientation in np.linspace(np.pi/2, 3*np.pi/2, num_laser):
end_point = (round(robot_pos[0] + laser_range * np.cos(orientation)), round(robot_pos[1] + laser_range * np.sin(orientation)))
# Get index along ray to check
bresenham_points = (bresenham(robot_pos, end_point))
# Go through the points and see the first hit
# TODO: do a check if any first?
for i in range(len(bresenham_points)):
# if bresenham point is in the map
if bresenham_points[i,0] < 0 or bresenham_points[i,0] >= occupancy_grid.shape[0] or bresenham_points[i,1] < 0 or bresenham_points[i,1] >= occupancy_grid.shape[1]:
if i != 0:
hit_points.append(bresenham_points[i-1])
break # don't use this bresenham point
if occupancy_grid[bresenham_points[i,0], bresenham_points[i,1]] == 1: # hit if it is void or occupied #! THINK IF THIS IS A GOOD ASSUMPTION
for j in range(min(4, len(bresenham_points) - i - 1)): # add 4 points in front of hit
free_points.append(bresenham_points[i+j])
actual_hit_points.append(bresenham_points[i + j + 1])
hit_points.append(bresenham_points[i + j + 1])
break
else: # no hits
free_point = bresenham_points[i]
free_points.append(free_point)
if i == len(bresenham_points) - 1:
hit_points.append(end_point) # need to add this for proper flooding for vis mask
break
# Convert to np.array
free_points = np.array(free_points)
hit_points = np.array(hit_points)
actual_hit_points = np.array(actual_hit_points)
return free_points, hit_points, actual_hit_points
if __name__ == "__main__":
# Argparse
parser = argparse.ArgumentParser()
parser.add_argument("--dataset_folder", type=str, default="/path/to/raycast")
parser.add_argument("--class_idx_building", type=int, default=4)
parser.add_argument("--num_workers", type=int, default=60)
parser.add_argument("--location", type=str, default="los_angeles")
args = parser.parse_args()
dataset_folder = Path(args.dataset_folder)
bev_folder = dataset_folder / args.location / "semantic_masks"
output_folder = dataset_folder / args.location / "flood_fill"
output_folder.mkdir(exist_ok=True, parents=True)
def generate_mask(filepath):
mask = np.load(filepath)
building_grid = mask[..., args.class_idx_building]
try:
flooded_image = get_raycast_building_mask(building_grid)
except:
raise Exception(f"Error in {filepath}")
if flooded_image is not None:
output_file = output_folder / filepath.name
np.save(output_file, flooded_image)
else:
print("No flood fill generated")
bev_files = list(bev_folder.iterdir())
with Pool(args.num_workers) as p:
for _ in tqdm(p.imap_unordered(generate_mask, bev_files), total=len(bev_files)):
pass