# Exported from a hosted Space file view (status: Building); file size 9,524 bytes, revision 4187c6f.
from multiprocessing import Pool
from tqdm import tqdm
from pathlib import Path
import numpy as np
from collections import deque
import argparse
import cv2
def get_raycast_building_mask(building_grid, laser_range=200, num_laser=100):
    """Ray-cast from the middle of a building grid and flood-fill the swept area.

    The sensor is placed on the cell with value != 1 that lies closest to
    the grid centre.  Rays are cast with ``get_free_points_in_front``; the
    closed outline through the ray end points is rasterised by
    ``init_flood_fill`` and the area is then filled with ``cv2.floodFill``,
    seeded at a randomly chosen traversed cell.

    Args:
        building_grid: 2-D array; cells equal to 1 are treated as occupied.
        laser_range: ray length in cells (defaults to the previously
            hard-coded 200).
        num_laser: number of rays (defaults to the previously hard-coded 100).

    Returns:
        A ``uint8`` array with the shape of ``building_grid`` in which the
        rasterised outline and the flood-filled region are 0 and every other
        cell keeps the "unknown" value 127 (0.5 * 255 truncated), or ``None``
        when the grid has no free cell, the ray cast produced no points, or
        no usable seed cell exists.
    """
    rows, cols = building_grid.shape[0], building_grid.shape[1]

    free_cells = np.stack(np.where(building_grid != 1), axis=1)
    if len(free_cells) == 0:
        return None

    # Snap the sensor to the free cell nearest the grid centre.  Squared
    # distances are enough for an argmin.
    centre = np.array([rows // 2 - 1, cols // 2 - 1])
    nearest = ((free_cells - centre) ** 2).sum(axis=1).argmin()
    robot_pos = (free_cells[nearest][0], free_cells[nearest][1])

    free_points, hit_points, _ = get_free_points_in_front(
        building_grid, robot_pos, laser_range=laser_range, num_laser=num_laser
    )

    # These checks must run before any column indexing: an empty result
    # cannot be indexed with [:, 0].
    if len(free_points) == 0:
        print("No free points")
        return None
    if len(hit_points) == 0:
        print("No hit points")
        return None

    # Ray end points and the cells added behind a hit may lie outside the
    # grid; pull them back onto the border.
    for points in (free_points, hit_points):
        points[:, 0] = np.clip(points[:, 0], 0, rows - 1)
        points[:, 1] = np.clip(points[:, 1], 0, cols - 1)

    # Get vis mask by flood filling free space boundary
    flood_grid = init_flood_fill(robot_pos, hit_points, building_grid.shape)
    flood_grid = (flood_grid * 255).astype(np.uint8)

    # The seed must still be "unknown" (non-zero); cells on the rasterised
    # outline are already 0 and would make the fill a no-op.
    np.random.shuffle(free_points)
    for seed_point in free_points:
        if flood_grid[seed_point[0], seed_point[1]] != 0:
            break
    else:
        print('Unable to find a valid seed point')
        return None

    # OpenCV takes the seed as (column, row); plain ints avoid argument
    # parsing problems with NumPy scalar types.
    _, flooded_image, _, _ = cv2.floodFill(
        flood_grid.copy(),
        None,
        seedPoint=(int(seed_point[1]), int(seed_point[0])),
        newVal=0,
    )
    return flooded_image
def flood_fill_simple(center_point, occupancy_map):
    """Clear the unknown cells 4-connected to ``center_point``.

    Args:
        center_point: starting point (x, y) of the fill, indexing
            ``occupancy_map[x, y]``.
        occupancy_map: 2-D occupancy map generated from Bresenham
            ray-tracing, where 0.5 marks an unknown cell.

    Returns:
        A copy of ``occupancy_map`` in which every unknown (0.5) cell
        reachable from the start through unknown cells is set to 0.  The
        input is not modified.  The start cell itself is only cleared when
        the fill reaches it again from one of its neighbours.
    """
    unknown_val = 0.5
    # West, east, north, south.
    offsets = ((-1, 0), (1, 0), (0, -1), (0, 1))

    filled = np.copy(occupancy_map)
    size_x, size_y = filled.shape

    # Fill empty areas with queue method
    fringe = deque([center_point])
    while fringe:
        cx, cy = fringe.pop()
        for dx, dy in offsets:
            nx, ny = cx + dx, cy + dy
            if not (0 <= nx < size_x and 0 <= ny < size_y):
                continue
            if filled[nx, ny] == unknown_val:
                # Mark before queueing so a cell is never queued twice.
                filled[nx, ny] = 0
                fringe.appendleft((nx, ny))
    return filled
def init_flood_fill(robot_pos, obstacle_points, occ_grid_shape):
    """Rasterise the closed outline through the obstacle points.

    Starting at ``robot_pos``, a Bresenham line is drawn to each obstacle
    point in turn and finally back to the first obstacle point.

    Args:
        robot_pos: (x, y) cell the first line segment starts from.
        obstacle_points: sequence of detected obstacle points (x, y); a
            2-column array or anything ``np.asarray`` turns into one.
        occ_grid_shape: shape of the map to create.

    Returns:
        Float array of shape ``occ_grid_shape`` holding 0.5 (unknown)
        everywhere except on the drawn lines, which are 0 (free).  With no
        obstacle points the map is returned entirely unknown.
    """
    occupancy_map = np.full(occ_grid_shape, 0.5)

    obstacle_points = np.asarray(obstacle_points)
    if len(obstacle_points) == 0:
        # Nothing to draw; avoids indexing obstacle_points[0] below.
        return occupancy_map

    # append first obstacle point to last so the outline is closed
    closed_outline = np.vstack((obstacle_points, obstacle_points[0]))

    prev_ix, prev_iy = robot_pos
    for x, y in closed_outline:
        ix, iy = int(x), int(y)
        free_area = bresenham((prev_ix, prev_iy), (ix, iy))
        occupancy_map[free_area[:, 0], free_area[:, 1]] = 0  # free area 0.0
        prev_ix, prev_iy = ix, iy
    return occupancy_map
# Not referenced by any code shown here; presumably left over from the script this was adapted from.
show_animation = False
def bresenham(start, end):
    """Return the grid cells on the line from ``start`` to ``end``.

    Implementation of Bresenham's line drawing algorithm
    (see en.wikipedia.org/wiki/Bresenham's_line_algorithm; original from
    roguebasin.com).

    Args:
        start: (x, y) integer coordinates of the first cell.
        end: (x, y) integer coordinates of the last cell.

    Returns:
        ``np.array`` of shape (n, 2) listing the cells in order from
        ``start`` to ``end``, both included.

    >>> bresenham((4, 4), (6, 10)).tolist()
    [[4, 4], [4, 5], [5, 6], [5, 7], [5, 8], [6, 9], [6, 10]]
    """
    x1, y1 = start
    x2, y2 = end

    # A steep line is walked along y instead of x: transpose the axes now
    # and transpose each emitted cell back below.
    is_steep = abs(y2 - y1) > abs(x2 - x1)
    if is_steep:
        x1, y1 = y1, x1
        x2, y2 = y2, x2

    # Always walk in increasing x; remember whether the result has to be
    # flipped so that it still runs from start to end.
    swapped = x1 > x2
    if swapped:
        x1, x2 = x2, x1
        y1, y2 = y2, y1

    dx = x2 - x1
    abs_dy = abs(y2 - y1)
    y_step = 1 if y1 < y2 else -1
    error = int(dx / 2.0)

    y = y1
    points = []
    for x in range(x1, x2 + 1):
        points.append((y, x) if is_steep else (x, y))
        error -= abs_dy
        if error < 0:
            y += y_step
            error += dx

    if swapped:
        points.reverse()
    return np.array(points)
def get_free_points_in_front(occupancy_grid, robot_pos, laser_range=10, num_laser=100):
    """Cast a fan of rays from ``robot_pos`` and collect traversed and hit cells.

    The rays sweep the half circle from pi/2 to 3*pi/2 (inclusive) in
    ``num_laser`` evenly spaced directions.  Each ray is walked cell by cell
    with ``bresenham`` until it leaves the grid, meets a cell equal to 1, or
    reaches its end point.

    Args:
        occupancy_grid: np.array (h x w); cells equal to 1 stop a ray.
        robot_pos: (x, y) cell the rays start from.
        laser_range: ray length in cells.
        num_laser: number of rays.

    Returns:
        Tuple ``(free_points, hit_points, actual_hit_points)`` of integer
        arrays of shape (n, 2); n is 0 for a list that received no points.

        * free_points: cells traversed before a stop, plus the hit cell and
          up to three cells after it along the ray.
        * hit_points: where each ray stopped - the last in-grid cell when
          the ray left the grid, the ray end point when nothing was hit, or
          the up to four cells following a hit cell.  Needed to close the
          outline that is flood-filled afterwards.
        * actual_hit_points: only the cells following a hit cell.

        Points recorded after a hit and ray end points may lie outside the
        grid; callers are expected to clamp them.
    """
    rows, cols = occupancy_grid.shape[0], occupancy_grid.shape[1]

    free_points = []
    hit_points = []  # actual hit points + last bresenham point (needed for flooding)
    actual_hit_points = []

    for orientation in np.linspace(np.pi / 2, 3 * np.pi / 2, num_laser):
        # int() keeps the end point a plain integer pair regardless of what
        # round() returns for NumPy scalars.
        end_point = (
            int(round(robot_pos[0] + laser_range * np.cos(orientation))),
            int(round(robot_pos[1] + laser_range * np.sin(orientation))),
        )
        # Get index along ray to check
        ray = bresenham(robot_pos, end_point)
        last_index = len(ray) - 1

        # Go through the points and see the first hit
        for i, (px, py) in enumerate(ray):
            if not (0 <= px < rows and 0 <= py < cols):
                # Left the map: the previous cell (if any) ends this ray.
                if i != 0:
                    hit_points.append(ray[i - 1])
                break

            if occupancy_grid[px, py] == 1:
                # Hit: keep walking up to 4 cells so the front of the
                # obstacle ends up inside the swept area.
                for j in range(min(4, last_index - i)):
                    free_points.append(ray[i + j])
                    actual_hit_points.append(ray[i + j + 1])
                    hit_points.append(ray[i + j + 1])
                break

            # no hit on this cell
            free_points.append(ray[i])
            if i == last_index:
                # need to add this for proper flooding for vis mask
                hit_points.append(end_point)

    # reshape(-1, 2) gives empty results shape (0, 2) instead of (0,), so
    # callers can always index columns.
    free_points, hit_points, actual_hit_points = (
        np.array(points, dtype=int).reshape(-1, 2)
        for points in (free_points, hit_points, actual_hit_points)
    )
    return free_points, hit_points, actual_hit_points
def generate_mask(filepath, class_idx_building, output_folder):
    """Compute and save the flood-fill mask for one semantic-mask file.

    Defined at module level (not inside the ``__main__`` guard) so worker
    processes can import it under the ``spawn`` start method as well.

    Args:
        filepath: ``Path`` of a ``.npy`` semantic mask whose last axis
            indexes classes.
        class_idx_building: index of the building channel on the last axis.
        output_folder: ``Path`` of the directory the result is saved to,
            under the same file name as the input.

    Raises:
        Exception: if the mask computation fails; the message names the
            file and the original error is chained as the cause.
    """
    mask = np.load(filepath)
    building_grid = mask[..., class_idx_building]
    try:
        flooded_image = get_raycast_building_mask(building_grid)
    except Exception as err:
        raise Exception(f"Error in {filepath}") from err

    if flooded_image is None:
        print("No flood fill generated")
        return
    np.save(output_folder / filepath.name, flooded_image)


def main():
    """Parse the command line and process every semantic mask in parallel."""
    # Local import: only this entry point needs it.
    from functools import partial

    # Argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_folder", type=str, default="/path/to/raycast")
    parser.add_argument("--class_idx_building", type=int, default=4)
    parser.add_argument("--num_workers", type=int, default=60)
    parser.add_argument("--location", type=str, default="los_angeles")
    args = parser.parse_args()

    dataset_folder = Path(args.dataset_folder)
    bev_folder = dataset_folder / args.location / "semantic_masks"
    output_folder = dataset_folder / args.location / "flood_fill"
    output_folder.mkdir(exist_ok=True, parents=True)

    # Bind the per-run settings so the worker receives them explicitly
    # rather than reading globals that only exist in the parent process.
    worker = partial(
        generate_mask,
        class_idx_building=args.class_idx_building,
        output_folder=output_folder,
    )

    bev_files = list(bev_folder.iterdir())
    with Pool(args.num_workers) as pool:
        # Results are None; iterating only drives the pool and the progress bar.
        for _ in tqdm(pool.imap_unordered(worker, bev_files), total=len(bev_files)):
            pass


if __name__ == "__main__":
    main()
# (stray table delimiter from the page export; not part of the program)