|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
|
|
class TrajectoryEvaluator: |
|
def __init__(self, |
|
progress_weight=5.0, |
|
ttc_weight=5.0, |
|
comfortable_weight=2.0, |
|
driving_direction_horizon=1.0, |
|
driving_direction_compliance_threshold=2.0, |
|
driving_direction_violation_threshold=6.0, |
|
stopped_speed_threshold=5e-03, |
|
progress_distance_threshold=0.1, |
|
lane_thres=2.0, |
|
eps=1e-6, |
|
time_step=0.1): |
|
|
|
self.progress_weight = progress_weight |
|
self.ttc_weight = ttc_weight |
|
self.comfortable_weight = comfortable_weight |
|
self.driving_direction_horizon = driving_direction_horizon |
|
self.driving_direction_compliance_threshold = driving_direction_compliance_threshold |
|
self.driving_direction_violation_threshold = driving_direction_violation_threshold |
|
self.stopped_speed_threshold = stopped_speed_threshold |
|
self.progress_distance_threshold = progress_distance_threshold |
|
self.lane_thres = lane_thres |
|
self.eps = eps |
|
self.time_step = time_step |
|
|
|
def calculate_distance(self, pos1, pos2): |
|
return np.linalg.norm(pos1 - pos2) |
|
|
|
def time_to_collision(self, v, v_lead, s): |
|
if v > v_lead: |
|
return s / (v - v_lead) |
|
else: |
|
return float('inf') |
|
|
|
def lazy_lane_keeping_evaluator(self, trajectory, lane_center_lines): |
|
num_frames = trajectory.shape[0] |
|
total_cost = 0 |
|
for point in trajectory: |
|
min_dist = float('inf') |
|
nearest_segment_dist = float('inf') |
|
for lane in lane_center_lines: |
|
for i in range(len(lane) - 1): |
|
segment_start = lane[i] |
|
segment_end = lane[i + 1] |
|
proj_point = self.project_point_to_segment(point, segment_start, segment_end) |
|
if self.is_point_on_segment(proj_point, segment_start, segment_end): |
|
dist = self.calculate_distance(point, proj_point) |
|
nearest_segment_dist = min(nearest_segment_dist, dist) |
|
min_dist = min(min_dist, dist) |
|
|
|
if nearest_segment_dist > self.lane_thres: |
|
total_cost += 1 / num_frames |
|
else: |
|
total_cost += (nearest_segment_dist / self.eps) ** 2 / num_frames |
|
|
|
if min_dist == float('inf'): |
|
total_cost += 1 |
|
|
|
return total_cost |
|
|
|
def project_point_to_segment(self, point, segment_start, segment_end): |
|
segment_vector = segment_end - segment_start |
|
point_vector = point - segment_start |
|
segment_length = np.dot(segment_vector, segment_vector) |
|
if segment_length == 0: |
|
return segment_start |
|
projection = np.dot(point_vector, segment_vector) / segment_length |
|
projection_point = segment_start + projection * segment_vector |
|
return projection_point |
|
|
|
def is_point_on_segment(self, point, segment_start, segment_end): |
|
cross_product = np.cross(segment_end - segment_start, point - segment_start) |
|
if np.abs(cross_product) > self.eps: |
|
return False |
|
dot_product = np.dot(point - segment_start, segment_end - segment_start) |
|
if dot_product < 0: |
|
return False |
|
squared_length_segment = np.dot(segment_end - segment_start, segment_end - segment_start) |
|
if dot_product > squared_length_segment: |
|
return False |
|
return True |
|
|
|
def evaluate_trajectories(self, trajectories, other_agents, lane_center_lines): |
|
num_trajectories = trajectories.shape[0] |
|
scores = np.zeros(num_trajectories) |
|
|
|
|
|
nc_scores = np.zeros(num_trajectories) |
|
dac_scores = np.zeros(num_trajectories) |
|
ddc_scores = np.zeros(num_trajectories) |
|
ttc_scores = np.zeros(num_trajectories) |
|
comfort_scores = np.zeros(num_trajectories) |
|
progress_scores = np.zeros(num_trajectories) |
|
lane_scores = np.zeros(num_trajectories) |
|
|
|
for i in range(num_trajectories): |
|
traj = trajectories[i] |
|
future_positions = traj[10:] |
|
initial_velocity = np.linalg.norm(traj[10] - traj[9]) / self.time_step |
|
|
|
progress = np.linalg.norm(future_positions[-1] - future_positions[0]) |
|
total_ttc = 0 |
|
total_comfort = 0 |
|
|
|
no_collision = True |
|
drivable_area_compliant = True |
|
driving_direction_compliant = True |
|
valid_ttc = True |
|
|
|
for t in range(1, future_positions.shape[0]): |
|
ego_position = future_positions[t] |
|
ego_velocity = np.linalg.norm(future_positions[t] - future_positions[t - 1]) / self.time_step |
|
|
|
closest_agent = None |
|
min_distance = float('inf') |
|
|
|
for agent in other_agents: |
|
agent_position = np.array([agent[0], agent[1], agent[2]]) |
|
distance = self.calculate_distance(ego_position, agent_position) |
|
if distance < min_distance: |
|
min_distance = distance |
|
closest_agent = agent |
|
|
|
if closest_agent is not None: |
|
s = min_distance - closest_agent[4] |
|
ttc = self.time_to_collision(ego_velocity, np.linalg.norm([closest_agent[6], closest_agent[7]]), s) |
|
if ttc < self.driving_direction_horizon: |
|
total_ttc += ttc |
|
valid_ttc = valid_ttc and (ttc > 0) |
|
|
|
if s < self.driving_direction_compliance_threshold: |
|
total_comfort += 1 |
|
elif s > self.driving_direction_violation_threshold: |
|
total_comfort -= 1 |
|
|
|
if min_distance < 0: |
|
no_collision = False |
|
|
|
|
|
if not (0 <= ego_position[0] <= 100 and 0 <= ego_position[1] <= 100): |
|
drivable_area_compliant = False |
|
if not (-self.driving_direction_violation_threshold <= ego_position[1] <= self.driving_direction_violation_threshold): |
|
driving_direction_compliant = False |
|
|
|
lane_cost = self.lazy_lane_keeping_evaluator(future_positions, lane_center_lines) |
|
|
|
nc_scores[i] = 1 if no_collision else 0 |
|
dac_scores[i] = 1 if drivable_area_compliant else 0 |
|
ddc_scores[i] = 1 if driving_direction_compliant else (0.5 if driving_direction_compliant else 0) |
|
ttc_scores[i] = 1 if valid_ttc else 0 |
|
comfort_scores[i] = 1 if total_comfort >= 0 else 0 |
|
progress_scores[i] = progress / np.max(progress_scores) if np.max(progress_scores) > 0 else 1 |
|
lane_scores[i] = lane_cost |
|
|
|
scores[i] = ( |
|
self.progress_weight * progress_scores[i] + |
|
self.ttc_weight * ttc_scores[i] + |
|
self.comfortable_weight * comfort_scores[i] + |
|
lane_cost |
|
) |
|
|
|
return scores, nc_scores, dac_scores, ddc_scores, ttc_scores, comfort_scores, progress_scores, lane_scores |
|
|
|
def example_usage(): |
|
ego_initial_position = [0.0, 0.0] |
|
ego_initial_velocity = 20.0 |
|
other_agents = [ |
|
[30.0, 0.0, 0.0, 2.0, 2.0, 5.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0], |
|
[50.0, 0.0, 0.0, 2.5, 2.5, 7.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0,]] |
|
|