from typing import Dict, List, Optional
import numpy as np
import numpy.typing as npt


from nuplan.common.actor_state.tracked_objects_types import (
    TrackedObjectType,
    AGENT_TYPES,
)

from nuplan.common.actor_state.agent import Agent
from nuplan.common.actor_state.static_object import StaticObject

from nuplan.common.actor_state.oriented_box import OrientedBox
from nuplan.common.actor_state.scene_object import SceneObjectMetadata
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.common.actor_state.state_representation import StateSE2, StateVector2D
from nuplan.common.actor_state.tracked_objects import TrackedObjects, TrackedObject
from nuplan.planning.simulation.observation.observation_type import DetectionsTracks
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling

from navsim.common.dataclasses import Annotations

# TODO: Refactor this file

# TODO: should be available somewhere in the nuplan-devkit
# Maps annotation category names (as stored in navsim `Annotations.names`)
# to the corresponding nuPlan `TrackedObjectType` enum members.
tracked_object_types: Dict[str, TrackedObjectType] = {
    "vehicle": TrackedObjectType.VEHICLE,
    "pedestrian": TrackedObjectType.PEDESTRIAN,
    "bicycle": TrackedObjectType.BICYCLE,
    "traffic_cone": TrackedObjectType.TRAFFIC_CONE,
    "barrier": TrackedObjectType.BARRIER,
    "czone_sign": TrackedObjectType.CZONE_SIGN,
    "generic_object": TrackedObjectType.GENERIC_OBJECT,
    "ego": TrackedObjectType.EGO,
}


def normalize_angle(angle: float) -> float:
    """
    Wrap an angle into the range [-pi, pi].

    Works element-wise on numpy arrays as well as on scalars, since it
    only uses `np.sin`/`np.cos`/`np.arctan2`.

    :param angle: angle in radians
    :return: equivalent angle wrapped to [-pi, pi]
    """
    return np.arctan2(np.sin(angle), np.cos(angle))


def annotations_to_detection_tracks(annotations: Annotations, ego_state: EgoState) -> DetectionsTracks:
    """
    Convert navsim annotations into nuPlan `DetectionsTracks` in the global frame.

    :param annotations: navsim annotations holding boxes, category names, tokens and
        velocities in the ego (rear-axle) frame
    :param ego_state: ego state whose rear-axle pose defines the local-to-global transform
    :return: detection tracks wrapping the converted agents and static objects
    """
    detection_tracks: List[TrackedObject] = []

    time_point = ego_state.time_point
    # Boxes arrive in the ego frame; transform them to global coordinates first.
    track_boxes = gt_boxes_oriented_box(annotations.boxes, ego_state)

    for track_idx, track_box in enumerate(track_boxes):
        track_type = tracked_object_types[annotations.names[track_idx]]
        track_metadata = SceneObjectMetadata(
            time_point.time_us,
            token=annotations.instance_tokens[track_idx],
            track_id=None,
            track_token=annotations.track_tokens[track_idx],
        )

        if track_type in AGENT_TYPES:
            # Dynamic objects carry a planar velocity, which must also be
            # rotated from the ego frame into the global frame.
            vx, vy = (
                annotations.velocity_3d[track_idx][0],
                annotations.velocity_3d[track_idx][1],
            )
            velocity = StateVector2D(vx, vy)

            detection_track: TrackedObject = Agent(
                tracked_object_type=track_type,
                oriented_box=track_box,
                velocity=rotate_vector(velocity, ego_state.rear_axle.heading),
                metadata=track_metadata,
            )
        else:
            # Non-agent categories (cones, barriers, ...) have no velocity.
            detection_track = StaticObject(
                tracked_object_type=track_type,
                oriented_box=track_box,
                metadata=track_metadata,
            )

        detection_tracks.append(detection_track)

    return DetectionsTracks(TrackedObjects(detection_tracks))


def gt_boxes_oriented_box(
    gt_boxes: List[npt.NDArray[np.float32]], ego_state: EgoState
) -> List[OrientedBox]:
    """
    Transform ground-truth boxes from the ego frame into global `OrientedBox`es.

    :param gt_boxes: list of box arrays laid out as (x, y, z, length, width, height, yaw)
    :param ego_state: ego state providing the rear-axle pose for the transform
    :return: oriented boxes expressed in global coordinates
    """
    rear_axle = ego_state.rear_axle
    oriented_boxes: List[OrientedBox] = []

    for box in gt_boxes:
        # Rotate the local pose by the ego heading (yaw is the last element).
        rotated = rotate_state_se2(
            StateSE2(box[0], box[1], box[-1]),
            angle=rear_axle.heading,
        )
        # Translate by the rear-axle position and wrap the heading to [-pi, pi].
        center = StateSE2(
            rotated.x + rear_axle.x,
            rotated.y + rear_axle.y,
            normalize_angle(rotated.heading),
        )
        oriented_boxes.append(OrientedBox(center, box[3], box[4], box[5]))

    return oriented_boxes


def rotate_state_se2(state_se2: StateSE2, angle: float = 0.0) -> StateSE2:
    """
    Rotate an SE2 state counter-clockwise about the origin.

    :param state_se2: pose (x, y, heading) to rotate
    :param angle: rotation angle in radians, defaults to 0.0
        (was `np.deg2rad(0)` — identical value, written plainly)
    :return: rotated pose with heading wrapped to [-pi, pi]
    """
    sin, cos = np.sin(angle), np.cos(angle)
    x_rotated = state_se2.x * cos - state_se2.y * sin
    y_rotated = state_se2.x * sin + state_se2.y * cos
    heading_rotated = normalize_angle(state_se2.heading + angle)

    return StateSE2(x_rotated, y_rotated, heading_rotated)


def rotate_vector(vector: StateVector2D, angle: float) -> StateVector2D:
    """
    Rotate a 2D vector counter-clockwise by `angle` radians.

    :param vector: vector to rotate
    :param angle: rotation angle in radians
    :return: rotated vector
    """
    cos_a = np.cos(angle)
    sin_a = np.sin(angle)
    return StateVector2D(
        vector.x * cos_a - vector.y * sin_a,
        vector.x * sin_a + vector.y * cos_a,
    )


def sample_future_indices(
    future_sampling: TrajectorySampling,
    iteration: int,
    time_horizon: float,
    num_samples: Optional[int],
) -> List[int]:
    """
    Compute the iteration indices at which to sample a future horizon.

    :param future_sampling: sampling specification; only `interval_length` is read
    :param iteration: starting iteration index
    :param time_horizon: horizon to cover [s], must be positive and >= the interval
    :param num_samples: desired number of samples; falls back to
        `time_horizon / interval_length` when None (or 0, preserved from original)
    :return: list of integer indices from `iteration` up to the horizon
    :raises ValueError: on an invalid horizon/interval combination, or when more
        samples are requested than there are intervals in the horizon
    """
    time_interval = future_sampling.interval_length
    if time_horizon <= 0.0 or time_interval <= 0.0 or time_horizon < time_interval:
        raise ValueError(
            f"Time horizon {time_horizon} must be greater or equal than target time interval {time_interval}"
            " and both must be positive."
        )

    # Falsy check kept on purpose: num_samples=0 also falls back to the derived count.
    num_samples = num_samples if num_samples else int(time_horizon / time_interval)

    num_intervals = int(time_horizon / time_interval) + 1
    step_size = num_intervals // num_samples
    # Original code wrapped np.arange in a bare `except: assert None`, which both
    # swallowed unrelated errors and vanished under `python -O`. The only real
    # failure mode is a zero step; raise an explicit, descriptive error instead.
    if step_size < 1:
        raise ValueError(
            f"Cannot draw {num_samples} samples from only {num_intervals} intervals."
        )
    time_idcs = np.arange(iteration, num_intervals, step_size)

    # .tolist() yields Python ints, matching the declared List[int] return type
    # (previously the list held np.int64 values).
    return time_idcs.tolist()