lkllkl's picture
Upload folder using huggingface_hub
da2e2ac verified
raw
history blame
7.85 kB
import numpy as np
import matplotlib.pyplot as plt
class TrajectoryEvaluator:
def __init__(self,
progress_weight=5.0,
ttc_weight=5.0,
comfortable_weight=2.0,
driving_direction_horizon=1.0,
driving_direction_compliance_threshold=2.0,
driving_direction_violation_threshold=6.0,
stopped_speed_threshold=5e-03,
progress_distance_threshold=0.1,
lane_thres=2.0,
eps=1e-6,
time_step=0.1):
self.progress_weight = progress_weight
self.ttc_weight = ttc_weight
self.comfortable_weight = comfortable_weight
self.driving_direction_horizon = driving_direction_horizon
self.driving_direction_compliance_threshold = driving_direction_compliance_threshold
self.driving_direction_violation_threshold = driving_direction_violation_threshold
self.stopped_speed_threshold = stopped_speed_threshold
self.progress_distance_threshold = progress_distance_threshold
self.lane_thres = lane_thres
self.eps = eps
self.time_step = time_step
def calculate_distance(self, pos1, pos2):
return np.linalg.norm(pos1 - pos2)
def time_to_collision(self, v, v_lead, s):
if v > v_lead:
return s / (v - v_lead)
else:
return float('inf')
def lazy_lane_keeping_evaluator(self, trajectory, lane_center_lines):
num_frames = trajectory.shape[0]
total_cost = 0
for point in trajectory:
min_dist = float('inf')
nearest_segment_dist = float('inf')
for lane in lane_center_lines:
for i in range(len(lane) - 1):
segment_start = lane[i]
segment_end = lane[i + 1]
proj_point = self.project_point_to_segment(point, segment_start, segment_end)
if self.is_point_on_segment(proj_point, segment_start, segment_end):
dist = self.calculate_distance(point, proj_point)
nearest_segment_dist = min(nearest_segment_dist, dist)
min_dist = min(min_dist, dist)
if nearest_segment_dist > self.lane_thres:
total_cost += 1 / num_frames
else:
total_cost += (nearest_segment_dist / self.eps) ** 2 / num_frames
if min_dist == float('inf'):
total_cost += 1
return total_cost
def project_point_to_segment(self, point, segment_start, segment_end):
segment_vector = segment_end - segment_start
point_vector = point - segment_start
segment_length = np.dot(segment_vector, segment_vector)
if segment_length == 0:
return segment_start
projection = np.dot(point_vector, segment_vector) / segment_length
projection_point = segment_start + projection * segment_vector
return projection_point
def is_point_on_segment(self, point, segment_start, segment_end):
cross_product = np.cross(segment_end - segment_start, point - segment_start)
if np.abs(cross_product) > self.eps:
return False
dot_product = np.dot(point - segment_start, segment_end - segment_start)
if dot_product < 0:
return False
squared_length_segment = np.dot(segment_end - segment_start, segment_end - segment_start)
if dot_product > squared_length_segment:
return False
return True
def evaluate_trajectories(self, trajectories, other_agents, lane_center_lines):
num_trajectories = trajectories.shape[0]
scores = np.zeros(num_trajectories)
# Initialize sub-scores
nc_scores = np.zeros(num_trajectories)
dac_scores = np.zeros(num_trajectories)
ddc_scores = np.zeros(num_trajectories)
ttc_scores = np.zeros(num_trajectories)
comfort_scores = np.zeros(num_trajectories)
progress_scores = np.zeros(num_trajectories)
lane_scores = np.zeros(num_trajectories)
for i in range(num_trajectories):
traj = trajectories[i]
future_positions = traj[10:] # Future frames (30)
initial_velocity = np.linalg.norm(traj[10] - traj[9]) / self.time_step
progress = np.linalg.norm(future_positions[-1] - future_positions[0])
total_ttc = 0
total_comfort = 0
no_collision = True
drivable_area_compliant = True
driving_direction_compliant = True
valid_ttc = True
for t in range(1, future_positions.shape[0]):
ego_position = future_positions[t]
ego_velocity = np.linalg.norm(future_positions[t] - future_positions[t - 1]) / self.time_step
closest_agent = None
min_distance = float('inf')
for agent in other_agents:
agent_position = np.array([agent[0], agent[1], agent[2]])
distance = self.calculate_distance(ego_position, agent_position)
if distance < min_distance:
min_distance = distance
closest_agent = agent
if closest_agent is not None:
s = min_distance - closest_agent[4] # Subtracting agent's half-width (assuming width is the second dimension, h)
ttc = self.time_to_collision(ego_velocity, np.linalg.norm([closest_agent[6], closest_agent[7]]), s)
if ttc < self.driving_direction_horizon:
total_ttc += ttc
valid_ttc = valid_ttc and (ttc > 0)
if s < self.driving_direction_compliance_threshold:
total_comfort += 1
elif s > self.driving_direction_violation_threshold:
total_comfort -= 1
if min_distance < 0:
no_collision = False
# Check drivable area compliance and driving direction compliance
if not (0 <= ego_position[0] <= 100 and 0 <= ego_position[1] <= 100): # Example area bounds
drivable_area_compliant = False
if not (-self.driving_direction_violation_threshold <= ego_position[1] <= self.driving_direction_violation_threshold):
driving_direction_compliant = False
lane_cost = self.lazy_lane_keeping_evaluator(future_positions, lane_center_lines)
nc_scores[i] = 1 if no_collision else 0
dac_scores[i] = 1 if drivable_area_compliant else 0
ddc_scores[i] = 1 if driving_direction_compliant else (0.5 if driving_direction_compliant else 0)
ttc_scores[i] = 1 if valid_ttc else 0
comfort_scores[i] = 1 if total_comfort >= 0 else 0
progress_scores[i] = progress / np.max(progress_scores) if np.max(progress_scores) > 0 else 1
lane_scores[i] = lane_cost
scores[i] = (
self.progress_weight * progress_scores[i] +
self.ttc_weight * ttc_scores[i] +
self.comfortable_weight * comfort_scores[i] +
lane_cost
)
return scores, nc_scores, dac_scores, ddc_scores, ttc_scores, comfort_scores, progress_scores, lane_scores
def example_usage():
ego_initial_position = [0.0, 0.0]
ego_initial_velocity = 20.0
other_agents = [
[30.0, 0.0, 0.0, 2.0, 2.0, 5.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0],
[50.0, 0.0, 0.0, 2.5, 2.5, 7.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0,]]