Spaces:
Running
on
Zero
Running
on
Zero
| ####################################################################### | |
| # Name: env.py | |
| # | |
| # - Reads and processes training and test maps (E.g. DungeonMaps) | |
| # - Processes rewards, new frontiers given action | |
| # - Updates a graph representation of environment for input into network | |
| ####################################################################### | |
| import sys | |
| import cv2 | |
| from matplotlib.colors import LogNorm, PowerNorm | |
| if sys.modules['TRAINING']: | |
| from parameter import * | |
| else: | |
| from test_parameter import * | |
| import copy | |
| import pandas as pd | |
| import rasterio | |
| from skimage import io | |
| import matplotlib.pyplot as plt | |
| import os | |
| from skimage.measure import block_reduce | |
| from sensor import * | |
| from graph_generator import * | |
| from node import * | |
| from scipy.ndimage import label, find_objects | |
| import matplotlib.image as mpimg | |
| from matplotlib.colors import Normalize | |
| # import matplotlib | |
| # matplotlib.use("Agg") # <-- key line to avoid tkinter dependency | |
| class Env(): | |
| def __init__(self, map_index, n_agent, k_size=20, plot=False, test=False, mask_index=None): | |
| self.n_agent = n_agent | |
| self.test = test | |
| self.map_dir = GRIDMAP_SET_DIR | |
| # Import environment gridmap | |
| self.map_list = os.listdir(self.map_dir) | |
| self.map_list.sort(reverse=True) | |
| # NEW: Import segmentation utility map | |
| self.seg_dir = MASK_SET_DIR | |
| self.segmentation_mask, self.target_positions, self.target_found_idxs = None, [], [] | |
| self.segmentation_mask_list = os.listdir(self.seg_dir) | |
| self.segmentation_mask_list.sort(reverse=True) | |
| # Import target maps (if relevant) | |
| if TARGETS_SET_DIR != "": | |
| self.targets_map_list = os.listdir(TARGETS_SET_DIR) | |
| self.targets_map_list.sort(reverse=True) | |
| # # NEW: Find common files in both directories | |
| # if TARGETS_SET_DIR != "": | |
| # common_files = [file for file in self.map_list if file in self.segmentation_mask_list and file in self.targets_map_list] | |
| # else: | |
| # common_files = [file for file in self.map_list if file in self.segmentation_mask_list] | |
| self.map_index = map_index % len(self.map_list) | |
| if mask_index is not None: | |
| self.mask_index = mask_index % len(self.segmentation_mask_list) | |
| else: | |
| self.mask_index = map_index % len(self.segmentation_mask_list) | |
| # self.common_map_file = common_files[self.map_index] | |
| # print("self.common_map_file: ", self.common_map_file) | |
| # Import ground truth and segmentation mask | |
| self.ground_truth, self.map_start_position = self.import_ground_truth( | |
| os.path.join(self.map_dir, self.map_list[self.map_index]))# self.common_map_file)) | |
| self.ground_truth_size = np.shape(self.ground_truth) # (480, 640) | |
| self.robot_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127 | |
| self.downsampled_belief = None | |
| self.old_robot_belief = copy.deepcopy(self.robot_belief) | |
| self.coverage_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127 | |
| # Import segmentation mask | |
| mask_filename = self.segmentation_mask_list[self.mask_index] | |
| self.segmentation_mask = self.import_segmentation_mask( | |
| os.path.join(self.seg_dir, mask_filename))# self.common_map_file)) | |
| # print("mask_filename: ", mask_filename) | |
| # Overwrite target positions if directory specified | |
| self.gt_segmentation_mask = None | |
| if self.test and TARGETS_SET_DIR != "": | |
| self.gt_segmentation_mask = self.import_segmentation_mask( | |
| os.path.join(TARGETS_SET_DIR, self.map_list[self.map_index])) # UNUSED - self.common_map_file)) | |
| # print("target_positions: ", self.target_positions) | |
| # print("np.unique(self.segmentation_mask): ", np.unique(self.segmentation_mask)) | |
| self.segmentation_info_mask = None | |
| self.gt_segmentation_info_mask = None | |
| self.segmentation_info_mask_unnormalized = None | |
| self.filtered_seg_info_mask = None | |
| self.num_targets_found = 0 | |
| self.num_new_targets_found = 0 | |
| # # Link score masks to raw image files | |
| # csv_file = pd.read_csv(RAW_IMG_PATH_DICT, header=None) | |
| # img_score_paths = csv_file.iloc[:,2].tolist() | |
| # raw_img_paths = csv_file.iloc[:,0].tolist() | |
| # self.score_to_img_dict = {os.path.basename(img_score_path): raw_img_path for img_score_path, raw_img_path in zip(img_score_paths, raw_img_paths)} | |
| self.resolution = 4 | |
| self.sensor_range = SENSOR_RANGE | |
| self.explored_rate = 0 | |
| self.targets_found_rate = 0 | |
| self.info_gain = 0 | |
| self.total_info = 0 | |
| self.graph_generator = Graph_generator(map_size=self.ground_truth_size, sensor_range=self.sensor_range, k_size=k_size, plot=plot) | |
| self.node_coords, self.graph, self.node_utility, self.guidepost = None, None, None, None | |
| self.frontiers = None | |
| self.start_positions = [] | |
| self.begin(self.map_start_position) | |
| self.plot = plot | |
| self.frame_files = [] | |
| def find_index_from_coords(self, position): | |
| index = np.argmin(np.linalg.norm(self.node_coords - position, axis=1)) | |
| return index | |
| def begin(self, start_position): | |
| # self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief, | |
| # self.ground_truth) | |
| self.robot_belief = self.ground_truth | |
| self.downsampled_belief = block_reduce(self.robot_belief.copy(), block_size=(self.resolution, self.resolution), | |
| func=np.min) | |
| self.frontiers = self.find_frontier() | |
| self.old_robot_belief = copy.deepcopy(self.robot_belief) | |
| self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.generate_graph( | |
| self.robot_belief, self.frontiers) | |
| # Find non-conflicting start positions | |
| if FIX_START_POSITION: | |
| coords_res_row = int(self.robot_belief.shape[0]/NUM_COORDS_HEIGHT) | |
| coords_res_col = int(self.robot_belief.shape[1]/NUM_COORDS_WIDTH) | |
| self.start_positions = [(int(self.robot_belief.shape[1]/2)-coords_res_col/2,int(self.robot_belief.shape[0]/2)-coords_res_row/2) for _ in range(self.n_agent)] # bottom-left corner | |
| else: | |
| nearby_coords = self.graph_generator.get_neighbors_grid_coords(start_position) | |
| itr = 0 | |
| for i in range(self.n_agent): | |
| if i == 0 or len(nearby_coords) == 0: | |
| self.start_positions.append(start_position) | |
| else: | |
| idx = min(itr, len(nearby_coords)-1) | |
| self.start_positions.append(nearby_coords[idx]) | |
| itr += 1 | |
| for i in range(len(self.start_positions)): | |
| self.start_positions[i] = self.node_coords[self.find_index_from_coords(self.start_positions[i])] | |
| self.coverage_belief = self.update_robot_belief(self.start_positions[i], self.sensor_range, self.coverage_belief, | |
| self.ground_truth) | |
| for start_position in self.start_positions: | |
| self.graph_generator.route_node.append(start_position) | |
| # Info map from ground truth | |
| rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
| rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
| self.segmentation_info_mask = np.zeros((len(self.node_coords), 1)) | |
| self.gt_segmentation_info_mask = np.zeros((len(self.node_coords), 1)) | |
| for i, node_coord in enumerate(self.node_coords): | |
| max_x = min(node_coord[0] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
| min_x = max(node_coord[0] - int(math.ceil(rng_x)), 0) | |
| max_y = min(node_coord[1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
| min_y = max(node_coord[1] - int(math.ceil(rng_y)), 0) | |
| # if np.any(self.segmentation_mask[min_y:max_y, min_x:max_x] == 255): | |
| # self.segmentation_info_mask[i] = 1.0 | |
| # else: | |
| # self.segmentation_info_mask[i] = 0.0 | |
| # self.segmentation_info_mask[i] = np.mean(self.segmentation_mask[min_y:max_y, min_x:max_x]) | |
| # self.segmentation_info_mask[i] = np.max(self.segmentation_mask[min_y:max_y, min_x:max_x]) | |
| if TARGETS_SET_DIR == "": # If targets combined with segmentation mask | |
| exclude = {208} # Exclude target positions | |
| else: | |
| exclude = {} | |
| self.segmentation_info_mask[i] = max(x for x in self.segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0 | |
| if self.gt_segmentation_mask is not None: | |
| self.gt_segmentation_info_mask[i] = max(x for x in self.gt_segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0 | |
| # print("np.unique(self.segmentation_info_mask): ", np.unique(self.segmentation_info_mask)) | |
| self.filtered_seg_info_mask = copy.deepcopy(self.segmentation_info_mask) | |
| # In case targets found at beginning... | |
| done, num_targets_found = self.check_done() | |
| self.num_targets_found = num_targets_found | |
| def multi_robot_step(self, next_position_list, dist_list, travel_dist_list): | |
| temp_frontiers = copy.deepcopy(self.frontiers) | |
| reward_list = [] | |
| for dist, robot_position in zip(dist_list, next_position_list): | |
| self.graph_generator.route_node.append(robot_position) | |
| next_node_index = self.find_index_from_coords(robot_position) | |
| self.graph_generator.nodes_list[next_node_index].set_visited() | |
| # self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief, | |
| # self.ground_truth) | |
| self.coverage_belief = self.update_robot_belief(robot_position, self.sensor_range, self.coverage_belief, | |
| self.ground_truth) | |
| self.robot_belief = self.ground_truth | |
| self.downsampled_belief = block_reduce(self.robot_belief.copy(), | |
| block_size=(self.resolution, self.resolution), | |
| func=np.min) | |
| frontiers = self.find_frontier() | |
| #num_observed_frontiers = self.calculate_num_observed_frontiers(temp_frontiers, frontiers) | |
| #temp_frontiers = frontiers | |
| num_observed_frontiers = self.node_utility[next_node_index] | |
| # individual_reward = num_observed_frontiers / 50 - dist / 64 | |
| individual_reward = -dist / 32 # 64 | |
| info_gain_reward = 0 | |
| robot_position_idx = self.find_index_from_coords(robot_position) | |
| # if self.segmentation_info_mask[robot_position_idx] == 1.0 and self.guidepost[robot_position_idx] == 0.0: | |
| # # print("High Info (Unvisited)") | |
| # info_gain_reward = (HIGH_INFO_REWARD_RATIO * 1.5) | |
| # elif self.segmentation_info_mask[robot_position_idx] == 0.0 and self.guidepost[robot_position_idx] == 0.0: | |
| # # print("Low Info (Unvisited)") | |
| # info_gain_reward = ((1-HIGH_INFO_REWARD_RATIO) * 1.5) | |
| info_gain_reward = self.filtered_seg_info_mask[robot_position_idx][0] * 1.5 | |
| if self.guidepost[robot_position_idx] == 0.0: | |
| info_gain_reward += 0.2 | |
| # print("info_gain_reward: ", info_gain_reward) | |
| individual_reward += info_gain_reward | |
| # print("dist / 64: ", dist / 64) | |
| # print("info gain reward: ", info_gain_reward) | |
| reward_list.append(individual_reward) | |
| self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.update_graph(self.robot_belief, self.old_robot_belief, frontiers, self.frontiers) | |
| self.old_robot_belief = copy.deepcopy(self.robot_belief) | |
| self.filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)] | |
| self.filtered_seg_info_mask = np.expand_dims(np.array(self.filtered_seg_info_mask), axis=1) | |
| num_observed_frontiers = self.calculate_num_observed_frontiers(self.frontiers, frontiers) | |
| self.frontiers = frontiers | |
| self.explored_rate = self.evaluate_exploration_rate() | |
| done, num_targets_found = self.check_done() | |
| self.num_new_targets_found = num_targets_found - self.num_targets_found | |
| # #team_reward = sum(reward_list) / len(reward_list) | |
| # # team_reward = num_observed_frontiers / 50 | |
| # team_reward = self.num_new_targets_found * 5.0 | |
| team_reward = 0.0 | |
| # # print("target found reward: ", self.num_new_targets_found * 5.0) | |
| self.num_targets_found = num_targets_found | |
| self.targets_found_rate = self.evaluate_targets_found_rate() | |
| self.info_gain, self.total_info = self.evaluate_info_gain() | |
| if done: | |
| # team_reward += np.sum(self.robot_belief == 255) / sum(travel_dist_list) | |
| team_reward += 40 # 20 | |
| for i in range(len(reward_list)): | |
| reward_list[i] += team_reward | |
| return reward_list, done | |
| def import_ground_truth(self, map_index): | |
| # occupied 1, free 255, unexplored 127 | |
| try: | |
| # ground_truth = (io.imread(map_index, 1) * 255).astype(int) | |
| ground_truth = (io.imread(map_index, 1)).astype(int) | |
| if np.all(ground_truth == 0): | |
| ground_truth = (io.imread(map_index, 1) * 255).astype(int) | |
| except: | |
| new_map_index = self.map_dir + '/' + self.map_list[0] | |
| ground_truth = (io.imread(new_map_index, 1)).astype(int) | |
| print('could not read the map_path ({}), hence skipping it and using ({}).'.format(map_index, new_map_index)) | |
| robot_location = np.nonzero(ground_truth == 208) | |
| # print("robot_location: ", robot_location) | |
| # print("np.array(robot_location)[1, 127]: ", np.array(robot_location)[1, 127]) | |
| robot_location = np.array([np.array(robot_location)[1, 127], np.array(robot_location)[0, 127]]) | |
| ground_truth = (ground_truth > 150) | |
| ground_truth = ground_truth * 254 + 1 | |
| return ground_truth, robot_location | |
| def import_segmentation_mask(self, map_index): | |
| # occupied 1, free 255, unexplored 127 | |
| # mask = (io.imread(map_index, 1) * 255).astype(int) # NOTE: Cannot work well with seg mask self-generated | |
| mask = cv2.imread(map_index).astype(int) | |
| # print("np.unique(segmentation_mask): ", np.unique(mask)) | |
| # NOTE: Could contain mutiple start positions | |
| # target_position = np.nonzero(mask == 208) | |
| # target_positions = self.find_target_locations(mask) | |
| # target_position = np.array([np.array(target_position)[1, 127], np.array(target_position)[0, 127]]) | |
| return mask #, target_positions | |
| def find_target_locations(self, image_array, grey_value=208): | |
| # Load the image | |
| # image = Image.open(image_path) | |
| # image_array = np.array(image) | |
| # Identify pixels equal to the grey value | |
| grey_pixels = np.where(image_array == grey_value) | |
| # Create a binary array where grey pixels are marked as True | |
| binary_array = np.zeros_like(image_array, dtype=bool) | |
| binary_array[grey_pixels] = True | |
| # Label connected components | |
| labeled_array, num_features = label(binary_array) | |
| # Find objects returns slices for each connected component | |
| slices = find_objects(labeled_array) | |
| # Calculate the center of each box | |
| centers = [] | |
| for slice in slices: | |
| row_center = (slice[0].start + slice[0].stop - 1) // 2 | |
| col_center = (slice[1].start + slice[1].stop - 1) // 2 | |
| centers.append((col_center, row_center)) # (y,x) | |
| return centers | |
| def free_cells(self): | |
| index = np.where(self.ground_truth == 255) | |
| free = np.asarray([index[1], index[0]]).T | |
| return free | |
| def update_robot_belief(self, robot_position, sensor_range, robot_belief, ground_truth): | |
| robot_belief = sensor_work(robot_position, sensor_range, robot_belief, ground_truth) | |
| return robot_belief | |
| def check_done(self, robot_id=0): | |
| """ All agnets to have explored most of the env map """ | |
| done = False | |
| # for idx in range(self.n_agent): | |
| # if np.sum(self.ground_truth == 255) - np.sum(self.all_robot_belief[idx][idx] == 255) > 40: | |
| # done = False | |
| # NEW: ADDITIONAL VLM SEARCH CRITERIA | |
| num_targets_found = 0 | |
| self.target_found_idxs = [] | |
| for i, target in enumerate(self.target_positions): | |
| if self.coverage_belief[target[1], target[0]] == 255: # 255: | |
| num_targets_found += 1 | |
| self.target_found_idxs.append(i) | |
| # free_cells_mask = self.all_robot_belief[robot_id][robot_id] == 255 | |
| # filtered_segmentation_mask = np.where(free_cells_mask, self.segmentation_mask, 0) | |
| # targets = self.find_target_locations(filtered_segmentation_mask) | |
| # print("num_targets_found: ", num_targets_found) | |
| if TERMINATE_ON_TGTS_FOUND and num_targets_found >= len(self.target_positions): | |
| done = True | |
| if not TERMINATE_ON_TGTS_FOUND and np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255) >= 0.99: | |
| done = True | |
| return done, num_targets_found | |
| def calculate_num_observed_frontiers(self, old_frontiers, frontiers): | |
| frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j | |
| pre_frontiers_to_check = old_frontiers[:, 0] + old_frontiers[:, 1] * 1j | |
| frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0] | |
| pre_frontiers_num = pre_frontiers_to_check.shape[0] | |
| delta_num = pre_frontiers_num - frontiers_num | |
| return delta_num | |
| def calculate_reward(self, dist, frontiers): | |
| reward = 0 | |
| reward -= dist / 64 | |
| frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j | |
| pre_frontiers_to_check = self.frontiers[:, 0] + self.frontiers[:, 1] * 1j | |
| frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0] | |
| pre_frontiers_num = pre_frontiers_to_check.shape[0] | |
| delta_num = pre_frontiers_num - frontiers_num | |
| reward += delta_num / 50 | |
| return reward | |
| def evaluate_exploration_rate(self): | |
| # rate = np.sum(self.robot_belief == 255) / np.sum(self.ground_truth == 255) | |
| rate = np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255) | |
| return rate | |
| def evaluate_targets_found_rate(self): | |
| if len(self.target_positions) == 0: | |
| return 0 | |
| else: | |
| rate = self.num_targets_found / len(self.target_positions) | |
| return rate | |
| def evaluate_info_gain(self): | |
| # print("self.segmentation_mask.shape: ", self.segmentation_mask.shape) | |
| # coverage_belief = (self.coverage_belief == 255) | |
| # print("coverage_belief.shape: ", coverage_belief.shape) | |
| # print("np.unique(coverage_belief): ", np.unique(coverage_belief)) | |
| # print("np.count_nonzero(coverage_belief): ", np.count_nonzero(coverage_belief)) | |
| # print("np.count_zero(coverage_belief): ", coverage_belief.size - np.count_nonzero(coverage_belief)) | |
| # print("self.segmentation_mask[self.coverage_belief == 255].shape: ", self.segmentation_mask[self.coverage_belief == 255].shape) | |
| if self.test and TARGETS_SET_DIR != "": | |
| info_gained = np.sum(self.gt_segmentation_mask[self.coverage_belief == 255]) / 100.0 | |
| total_info = np.sum(self.gt_segmentation_mask) / 100.0 | |
| else: | |
| info_gained = np.sum(self.segmentation_mask[self.coverage_belief == 255]) / 100.0 | |
| total_info = np.sum(self.segmentation_mask) / 100.0 | |
| return info_gained, total_info | |
| def calculate_new_free_area(self): | |
| old_free_area = self.old_robot_belief == 255 | |
| current_free_area = self.robot_belief == 255 | |
| new_free_area = (current_free_area.astype(np.int) - old_free_area.astype(np.int)) * 255 | |
| return new_free_area, np.sum(old_free_area) | |
| def calculate_dist_path(self, path): | |
| dist = 0 | |
| start = path[0] | |
| end = path[-1] | |
| for index in path: | |
| if index == end: | |
| break | |
| dist += np.linalg.norm(self.node_coords[start] - self.node_coords[index]) | |
| start = index | |
| return dist | |
| def find_frontier(self): | |
| y_len = self.downsampled_belief.shape[0] | |
| x_len = self.downsampled_belief.shape[1] | |
| mapping = self.downsampled_belief.copy() | |
| belief = self.downsampled_belief.copy() | |
| # 0-1 unknown area map | |
| mapping = (mapping == 127) * 1 | |
| mapping = np.lib.pad(mapping, ((1, 1), (1, 1)), 'constant', constant_values=0) | |
| fro_map = mapping[2:][:, 1:x_len + 1] + mapping[:y_len][:, 1:x_len + 1] + mapping[1:y_len + 1][:, 2:] + \ | |
| mapping[1:y_len + 1][:, :x_len] + mapping[:y_len][:, 2:] + mapping[2:][:, :x_len] + mapping[2:][:, | |
| 2:] + \ | |
| mapping[:y_len][:, :x_len] | |
| ind_free = np.where(belief.ravel(order='F') == 255)[0] | |
| ind_fron_1 = np.where(1 < fro_map.ravel(order='F'))[0] | |
| ind_fron_2 = np.where(fro_map.ravel(order='F') < 8)[0] | |
| ind_fron = np.intersect1d(ind_fron_1, ind_fron_2) | |
| ind_to = np.intersect1d(ind_free, ind_fron) | |
| map_x = x_len | |
| map_y = y_len | |
| x = np.linspace(0, map_x - 1, map_x) | |
| y = np.linspace(0, map_y - 1, map_y) | |
| t1, t2 = np.meshgrid(x, y) | |
| points = np.vstack([t1.T.ravel(), t2.T.ravel()]).T | |
| f = points[ind_to] | |
| f = f.astype(int) | |
| f = f * self.resolution | |
| return f | |
| def plot_env(self, n, path, step, travel_dist, robots_route, img_path_override=None, sat_path_override=None, msk_name_override=None, sound_id_override=None, colormap_mid_val=None): | |
| # # TEMP | |
| # if TAXABIND_TTA: | |
| # # Save self.segmentation_info_mask as .npy file in gifs_path | |
| # side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) | |
| # mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T | |
| # np.save(os.path.join(path, f"seg_mask_step{step}.npy"), mask_viz) | |
| plt.switch_backend('agg') | |
| # plt.ion() | |
| plt.cla() | |
| color_list = ["r", "g", "c", "m", "y", "k"] | |
| if TARGETS_SET_DIR == "" and not TAXABIND_TTA: | |
| fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5)) | |
| else: | |
| fig, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2, 3, figsize=(12, 8)) | |
| ### Fig1: Environment ### | |
| msk_name = "" | |
| if TAXABIND_TTA: | |
| image = mpimg.imread(sat_path_override) | |
| msk_name = msk_name_override | |
| # else: | |
| # plt.imshow(self.robot_belief, cmap='gray') | |
| # ax1.imshow(self.coverage_belief, cmap='gray') | |
| # image = mpimg.imread("Maps/real_maps/real/4259_masked_img_0.jpg") | |
| # msk_name = self.map_list[self.map_index] | |
| # raw_img_path = self.score_to_img_dict[msk_name] | |
| # if "flair" in raw_img_path: | |
| # with rasterio.open(raw_img_path) as src_img: | |
| # image = src_img.read([1,2,3]) | |
| # image = np.transpose(image, (1, 2, 0)) | |
| # else: | |
| # image = mpimg.imread(raw_img_path) | |
| ### Fig1: Environment ### | |
| ax = ax1 # if TAXABIND_TTA else ax1 | |
| ax.imshow(image) | |
| ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
| ax.set_title("Image") | |
| # if VIZ_GRAPH_EDGES: | |
| # for i in range(len(self.graph_generator.x)): | |
| # ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1) | |
| # # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5) | |
| # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5) | |
| # ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3) | |
| for i, route in enumerate(robots_route): | |
| robot_marker_color = color_list[i % len(color_list)] | |
| xPoints = route[0] | |
| yPoints = route[1] | |
| ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
| # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
| ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
| ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
| # Sensor range | |
| rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
| rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
| max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
| min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
| max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
| min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
| ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
| ### Fig2: Graph ### | |
| ax = ax4 if TAXABIND_TTA else ax1 | |
| # ax.imshow(image) | |
| ax.imshow(self.coverage_belief, cmap='gray') | |
| ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
| ax.set_title("Information Graph") | |
| if VIZ_GRAPH_EDGES: | |
| for i in range(len(self.graph_generator.x)): | |
| ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1) | |
| # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5) | |
| # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5) | |
| # filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)] | |
| ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.filtered_seg_info_mask, zorder=5, s=8) | |
| # ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3) | |
| for i, route in enumerate(robots_route): | |
| robot_marker_color = color_list[i % len(color_list)] | |
| xPoints = route[0] | |
| yPoints = route[1] | |
| ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
| # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
| ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
| ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
| # Sensor range | |
| rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
| rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
| max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
| min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
| max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
| min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
| ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
| # Plot target positions | |
| for target in self.target_positions: | |
| if self.coverage_belief[target[1], target[0]] == 255: | |
| # ax.plot(target[0], target[1], 'go', markersize=8, zorder=99) | |
| ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
| else: | |
| # ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99) | |
| ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
| # ax.pause(0.1) | |
| ### Fig3: Segmentation Mask ### | |
| ax = ax5 if TAXABIND_TTA else ax2 | |
| if TAXABIND_TTA and USE_CLIP_PREDS: | |
| side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) | |
| mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T | |
| scale_y = math.ceil(self.ground_truth_size[1] / side_dim) | |
| scale_x = math.ceil(self.ground_truth_size[0] / side_dim) | |
| upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only | |
| upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]] | |
| im = ax.imshow(upscaled_mask_viz, cmap="viridis") | |
| ax.axis("off") | |
| else: | |
| im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' | |
| ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
| ax.set_title(f"Predicted Mask (Normalized)") | |
| for i, route in enumerate(robots_route): | |
| robot_marker_color = color_list[i % len(color_list)] | |
| xPoints = route[0] | |
| yPoints = route[1] | |
| ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
| # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
| ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
| ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
| # Sensor range | |
| rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
| rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
| max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
| min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
| max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
| min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
| ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
| # Add a colorbar | |
| cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
| cbar.set_label("Normalized Probs") | |
| # ax.pause(0.1) | |
| ### Fig4: Segmentation Mask ### | |
| if TAXABIND_TTA and USE_CLIP_PREDS: | |
| ax = ax6 | |
| side_dim = int(np.sqrt(self.segmentation_info_mask_unnormalized.shape[0])) | |
| mask_viz = self.segmentation_info_mask_unnormalized.squeeze().reshape((side_dim, side_dim)).T | |
| scale_y = math.ceil(self.ground_truth_size[1] / side_dim) | |
| scale_x = math.ceil(self.ground_truth_size[0] / side_dim) | |
| upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only | |
| upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]] | |
| max_val = 0.15 # TO CHANGE | |
| mid_val = colormap_mid_val if colormap_mid_val is not None else 0.05 | |
| # mid_val = np.max(self.segmentation_info_mask_unnormalized) | |
| norm = CustomNorm(vmin=0.0, vmax=max_val, mid=mid_val, lower_portion=0.8) | |
| im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1)) | |
| # norm = PowerNorm(gamma=0.25, vmin=0.01, vmax=0.2) | |
| # norm=LogNorm(vmin=0.01, vmax=0.2) | |
| im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1)) | |
| ax.axis("off") | |
| # else: | |
| # im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' | |
| # ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
| ax.set_title(f"Predicted Mask (Unnormalized)") | |
| for i, route in enumerate(robots_route): | |
| robot_marker_color = color_list[i % len(color_list)] | |
| xPoints = route[0] | |
| yPoints = route[1] | |
| ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
| # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
| ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
| ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
| # Sensor range | |
| rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
| rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
| max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
| min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
| max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
| min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
| ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
| # Add a colorbar | |
| cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
| if TAXABIND_TTA and USE_CLIP_PREDS: | |
| cbar.set_ticks([0.0, mid_val, max_val]) | |
| cbar.set_label("Probs (Scaled by expectation)") | |
| # Fog5: GT Mask | |
| if TARGETS_SET_DIR != "": | |
| ax = ax2 | |
| im = ax.imshow(self.gt_segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' | |
| ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
| ax.set_title(f"Ground Truth Mask") | |
| for i, route in enumerate(robots_route): | |
| robot_marker_color = color_list[i % len(color_list)] | |
| xPoints = route[0] | |
| yPoints = route[1] | |
| ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
| # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
| ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
| ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
| # Sensor range | |
| rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
| rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
| max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
| min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
| max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
| min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
| ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
| # Add a colorbar | |
| cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
| cbar.set_label("Normalized Mask Value") | |
| # ax4.pause(0.1) | |
| ### Fig6: Segmentation Mask (GT) ### | |
| if TAXABIND_TTA: | |
| ax = ax3 | |
| image = mpimg.imread(img_path_override) | |
| ax.imshow(image) | |
| ax.set_title("Ground Image") | |
| ax.axis("off") | |
| sound_id = sound_id_override if sound_id_override is not None else "-1" | |
| plt.suptitle('Targets Found: {}/{} Coverage ratio: {:.4g} Travel Dist: {:.4g} Info Gain: {:.4g}% \n ({}) \n (Sound ID: {})'.format(self.num_targets_found, \ | |
| len(self.target_positions), self.explored_rate, travel_dist, (100*self.info_gain/self.total_info), msk_name, | |
| sound_id)) | |
| plt.tight_layout() | |
| plt.savefig('{}/{}_{}_samples.png'.format(path, n, step, dpi=100)) | |
| # plt.show() | |
| frame = '{}/{}_{}_samples.png'.format(path, n, step) | |
| self.frame_files.append(frame) | |
| plt.close() | |
| #################### | |
| # ADDED: For app.py | |
| #################### | |
| def plot_heatmap(self, save_dir, step, travel_dist, robots_route=None): | |
| """Plot only the segmentation heatmap and save it as ``{step}.png`` in | |
| ``save_dir``. This lightweight helper is meant for asynchronous | |
| streaming in the Gradio demo when full `plot_env` is too heavy. | |
| Parameters | |
| ---------- | |
| save_dir : str | |
| Directory to save the generated PNG file. | |
| step : int | |
| Current timestep; becomes the filename ``{step}.png``. | |
| robots_route : list | None | |
| Optional list of routes (xPoints, yPoints) to overlay. | |
| Returns | |
| ------- | |
| str | |
| Full path to the generated PNG file. | |
| """ | |
| import os | |
| plt.switch_backend('agg') | |
| plt.cla() | |
| color_list = ["r", "g", "c", "m", "y", "k"] | |
| fig, ax = plt.subplots(1, 1, figsize=(6, 6)) | |
| # Select the mask to visualise | |
| if TAXABIND_TTA and USE_CLIP_PREDS: | |
| side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) | |
| mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T | |
| scale_y = math.ceil(self.ground_truth_size[1] / side_dim) | |
| scale_x = math.ceil(self.ground_truth_size[0] / side_dim) | |
| upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) | |
| upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]] | |
| im = ax.imshow(upscaled_mask_viz, cmap="viridis") | |
| ax.axis("off") | |
| else: | |
| im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) | |
| ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
| # Optionally overlay robot paths | |
| if robots_route is not None: | |
| for i, route in enumerate(robots_route): | |
| robot_marker_color = color_list[i % len(color_list)] | |
| xPoints, yPoints = route | |
| ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
| ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
| ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
| # Plot target positions | |
| for target in self.target_positions: | |
| if self.coverage_belief[target[1], target[0]] == 255: | |
| # ax.plot(target[0], target[1], 'go', markersize=8, zorder=99) | |
| ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
| else: | |
| # ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99) | |
| ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
| # Sensor range | |
| rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
| rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
| max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
| min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
| max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
| min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
| ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
| ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
| # Color bar | |
| cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
| cbar.set_label("Normalized Probs") | |
| plt.suptitle('Targets Found: {}/{} Coverage ratio: {:.4g} Travel Dist: {:.4g}'.format(self.num_targets_found, \ | |
| len(self.target_positions), self.explored_rate, travel_dist)) | |
| os.makedirs(save_dir, exist_ok=True) | |
| out_path = os.path.join(save_dir, f"{step}.png") | |
| fig.savefig(out_path, dpi=100) | |
| plt.close(fig) | |
| return out_path | |
| #################### | |
| class CustomNorm(Normalize): | |
| """ | |
| A custom normalization that allocates a larger fraction of the colormap | |
| to the lower data range [vmin, mid] than to [mid, vmax]. | |
| Parameters | |
| ---------- | |
| vmin : float | |
| Minimum data value | |
| vmax : float | |
| Maximum data value | |
| mid : float | |
| Midpoint in data where we switch from 'lower' to 'upper' mapping | |
| lower_portion : float | |
| Fraction of the colormap to allocate for [vmin, mid]. | |
| For example, 0.8 => 80% of colors for [vmin, mid], 20% for [mid, vmax]. | |
| clip : bool | |
| Whether to clip data outside [vmin, vmax]. | |
| """ | |
| def __init__(self, vmin=None, vmax=None, mid=0.05, lower_portion=0.8, clip=False): | |
| self.mid = mid | |
| self.lower_portion = lower_portion | |
| super().__init__(vmin, vmax, clip) | |
| def __call__(self, value, clip=None): | |
| """Forward transform: data -> [0..1] color space.""" | |
| vmin, vmax, mid = self.vmin, self.vmax, self.mid | |
| lp = self.lower_portion | |
| value = np.asarray(value, dtype=np.float64) | |
| # Piecewise linear mapping: | |
| # [vmin..mid] => [0..lp] | |
| # [mid..vmax] => [lp..1] | |
| normed = np.where( | |
| value <= mid, | |
| lp * (value - vmin) / (mid - vmin), | |
| lp + (value - mid) / (vmax - mid) * (1 - lp) | |
| ) | |
| return np.clip(normed, 0, 1) | |
| def inverse(self, value): | |
| """ | |
| Inverse transform: [0..1] color space -> data space. | |
| Matplotlib's colorbar calls this to place ticks correctly. | |
| """ | |
| vmin, vmax, mid = self.vmin, self.vmax, self.mid | |
| lp = self.lower_portion | |
| value = np.asarray(value, dtype=np.float64) | |
| # For color space [0..lp], invert to [vmin..mid] | |
| # For color space [lp..1], invert to [mid..vmax] | |
| below = (value <= lp) | |
| above = ~below | |
| # Allocate array for results | |
| data = np.zeros_like(value, dtype=np.float64) | |
| # Invert lower segment | |
| data[below] = vmin + (value[below] / lp) * (mid - vmin) | |
| # Invert upper segment | |
| data[above] = mid + ((value[above] - lp) / (1 - lp)) * (vmax - mid) | |
| return data | |