import numpy as np import matplotlib.pyplot as plt class TrajectoryEvaluator: def __init__(self, progress_weight=5.0, ttc_weight=5.0, comfortable_weight=2.0, driving_direction_horizon=1.0, driving_direction_compliance_threshold=2.0, driving_direction_violation_threshold=6.0, stopped_speed_threshold=5e-03, progress_distance_threshold=0.1, lane_thres=2.0, eps=1e-6, time_step=0.1): self.progress_weight = progress_weight self.ttc_weight = ttc_weight self.comfortable_weight = comfortable_weight self.driving_direction_horizon = driving_direction_horizon self.driving_direction_compliance_threshold = driving_direction_compliance_threshold self.driving_direction_violation_threshold = driving_direction_violation_threshold self.stopped_speed_threshold = stopped_speed_threshold self.progress_distance_threshold = progress_distance_threshold self.lane_thres = lane_thres self.eps = eps self.time_step = time_step def calculate_distance(self, pos1, pos2): return np.linalg.norm(pos1 - pos2) def time_to_collision(self, v, v_lead, s): if v > v_lead: return s / (v - v_lead) else: return float('inf') def lazy_lane_keeping_evaluator(self, trajectory, lane_center_lines): num_frames = trajectory.shape[0] total_cost = 0 for point in trajectory: min_dist = float('inf') nearest_segment_dist = float('inf') for lane in lane_center_lines: for i in range(len(lane) - 1): segment_start = lane[i] segment_end = lane[i + 1] proj_point = self.project_point_to_segment(point, segment_start, segment_end) if self.is_point_on_segment(proj_point, segment_start, segment_end): dist = self.calculate_distance(point, proj_point) nearest_segment_dist = min(nearest_segment_dist, dist) min_dist = min(min_dist, dist) if nearest_segment_dist > self.lane_thres: total_cost += 1 / num_frames else: total_cost += (nearest_segment_dist / self.eps) ** 2 / num_frames if min_dist == float('inf'): total_cost += 1 return total_cost def project_point_to_segment(self, point, segment_start, segment_end): segment_vector = segment_end - segment_start point_vector = point - segment_start segment_length = np.dot(segment_vector, segment_vector) if segment_length == 0: return segment_start projection = np.dot(point_vector, segment_vector) / segment_length projection_point = segment_start + projection * segment_vector return projection_point def is_point_on_segment(self, point, segment_start, segment_end): cross_product = np.cross(segment_end - segment_start, point - segment_start) if np.abs(cross_product) > self.eps: return False dot_product = np.dot(point - segment_start, segment_end - segment_start) if dot_product < 0: return False squared_length_segment = np.dot(segment_end - segment_start, segment_end - segment_start) if dot_product > squared_length_segment: return False return True def evaluate_trajectories(self, trajectories, other_agents, lane_center_lines): num_trajectories = trajectories.shape[0] scores = np.zeros(num_trajectories) # Initialize sub-scores nc_scores = np.zeros(num_trajectories) dac_scores = np.zeros(num_trajectories) ddc_scores = np.zeros(num_trajectories) ttc_scores = np.zeros(num_trajectories) comfort_scores = np.zeros(num_trajectories) progress_scores = np.zeros(num_trajectories) lane_scores = np.zeros(num_trajectories) for i in range(num_trajectories): traj = trajectories[i] future_positions = traj[10:] # Future frames (30) initial_velocity = np.linalg.norm(traj[10] - traj[9]) / self.time_step progress = np.linalg.norm(future_positions[-1] - future_positions[0]) total_ttc = 0 total_comfort = 0 no_collision = True drivable_area_compliant = True driving_direction_compliant = True valid_ttc = True for t in range(1, future_positions.shape[0]): ego_position = future_positions[t] ego_velocity = np.linalg.norm(future_positions[t] - future_positions[t - 1]) / self.time_step closest_agent = None min_distance = float('inf') for agent in other_agents: agent_position = np.array([agent[0], agent[1], agent[2]]) distance = self.calculate_distance(ego_position, agent_position) if distance < min_distance: min_distance = distance closest_agent = agent if closest_agent is not None: s = min_distance - closest_agent[4] # Subtracting agent's half-width (assuming width is the second dimension, h) ttc = self.time_to_collision(ego_velocity, np.linalg.norm([closest_agent[6], closest_agent[7]]), s) if ttc < self.driving_direction_horizon: total_ttc += ttc valid_ttc = valid_ttc and (ttc > 0) if s < self.driving_direction_compliance_threshold: total_comfort += 1 elif s > self.driving_direction_violation_threshold: total_comfort -= 1 if min_distance < 0: no_collision = False # Check drivable area compliance and driving direction compliance if not (0 <= ego_position[0] <= 100 and 0 <= ego_position[1] <= 100): # Example area bounds drivable_area_compliant = False if not (-self.driving_direction_violation_threshold <= ego_position[1] <= self.driving_direction_violation_threshold): driving_direction_compliant = False lane_cost = self.lazy_lane_keeping_evaluator(future_positions, lane_center_lines) nc_scores[i] = 1 if no_collision else 0 dac_scores[i] = 1 if drivable_area_compliant else 0 ddc_scores[i] = 1 if driving_direction_compliant else (0.5 if driving_direction_compliant else 0) ttc_scores[i] = 1 if valid_ttc else 0 comfort_scores[i] = 1 if total_comfort >= 0 else 0 progress_scores[i] = progress / np.max(progress_scores) if np.max(progress_scores) > 0 else 1 lane_scores[i] = lane_cost scores[i] = ( self.progress_weight * progress_scores[i] + self.ttc_weight * ttc_scores[i] + self.comfortable_weight * comfort_scores[i] + lane_cost ) return scores, nc_scores, dac_scores, ddc_scores, ttc_scores, comfort_scores, progress_scores, lane_scores def example_usage(): ego_initial_position = [0.0, 0.0] ego_initial_velocity = 20.0 other_agents = [ [30.0, 0.0, 0.0, 2.0, 2.0, 5.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0], [50.0, 0.0, 0.0, 2.5, 2.5, 7.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0,]]