|
"""Reachy API Module. |
|
|
|
This module provides an API for interacting with a Reachy robot. |
|
It supports both real and fake robot modes, with comprehensive error handling and logging. |
|
""" |
|
|
|
import asyncio |
|
import logging |
|
import os |
|
|
|
import numpy as np |
|
import numpy.typing as npt |
|
from draw_input_schema import DrawInputSchema, Point |
|
|
|
try: |
|
from reachy2_sdk import ReachySDK |
|
from reachy2_sdk.utils.utils import get_pose_matrix |
|
from reachy2_emotions.replay_move import EmotionPlayer |
|
REACHY_SDK_MISSING = False |
|
except ImportError: |
|
class ReachySDK: |
|
pass |
|
class EmotionPlayer: |
|
pass |
|
REACHY_SDK_MISSING = True |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
class ReachyAPI: |
|
"""API for interacting with a Reachy robot. |
|
|
|
This class provides methods to control various aspects of the Reachy robot, |
|
including movement, and behaviors. It can operate in both real |
|
and fake robot modes. |
|
""" |
|
|
|
fake_robot: bool |
|
robot_ip: str |
|
emotion_recordings_folder: str |
|
async_loop: asyncio.BaseEventLoop |
|
|
|
reachy: ReachySDK |
|
emotion_player: EmotionPlayer |
|
emotion_names: list[str] |
|
is_drawing: bool |
|
drawn_points: list[Point] |
|
connected: bool |
|
|
|
anim_names: list[str] |
|
|
|
def __init__(self, |
|
fake_robot: bool, |
|
emotion_recordings_folder: str, |
|
robot_ip: str = "localhost") -> None: |
|
"""Initialize the ReachyAPI instance.""" |
|
|
|
self.fake_robot = fake_robot |
|
self.robot_ip = robot_ip |
|
self.emotion_recordings_folder = emotion_recordings_folder |
|
self.async_loop = None |
|
|
|
self.reachy = None |
|
self.emotion_player = None |
|
self.emotion_names = None |
|
self.is_drawing = False |
|
self.drawn_points = [] |
|
|
|
self.connected = False |
|
|
|
|
|
|
|
async def connect(self) -> bool: |
|
"""Connect to the Reachy robot. |
|
|
|
Returns: |
|
bool: True if connection successful, False otherwise |
|
""" |
|
logger.debug("Connecting to Reachy") |
|
self.connected = False |
|
self.async_loop = asyncio.get_event_loop() |
|
|
|
if self.fake_robot: |
|
logger.info("Using a fake robot") |
|
self._retrieve_fake_emotions() |
|
return True |
|
|
|
if REACHY_SDK_MISSING: |
|
logger.error("Could not import reachy2_sdk, will act as if reachy is in fake robot mode") |
|
self.fake_robot = True |
|
self._retrieve_fake_emotions() |
|
return True |
|
|
|
self.reachy = ReachySDK(host=self.robot_ip) |
|
self.emotion_player = EmotionPlayer(ip=self.robot_ip, |
|
audio_device=None, |
|
audio_offset=0.0, |
|
record_folder=self.emotion_recordings_folder, |
|
auto_start=True) |
|
self._retrieve_available_emotions() |
|
|
|
if not self.reachy.is_connected: |
|
logger.error("Reachy is not connected.") |
|
return False |
|
|
|
turned_on = await asyncio.to_thread(self.reachy.turn_on) |
|
if not turned_on: |
|
logger.error("Could not turn on Reachy") |
|
return False |
|
|
|
self.anim_names = list() |
|
|
|
self.connected = True |
|
logger.debug("Connected to Reachy") |
|
return True |
|
|
|
def _retrieve_available_emotions(self) -> None: |
|
""" |
|
List all available emotions based on the JSON files in the folder. |
|
The emotion name is the filename without the .json extension. |
|
""" |
|
if not os.path.exists(self.emotion_recordings_folder): |
|
logging.error("Emotion recordings folder %s does not exist.", self.emotion_recordings_folder) |
|
self.emotion_names = [] |
|
return |
|
|
|
emotions = [] |
|
for file in os.listdir(self.emotion_recordings_folder): |
|
if file.endswith(".json"): |
|
emotion = os.path.splitext(file)[0] |
|
emotions.append(emotion) |
|
self.emotion_names = sorted(emotions) |
|
|
|
def _retrieve_fake_emotions(self) -> None: |
|
""" |
|
Retrieve fake emotions. |
|
""" |
|
self.emotion_names = ["affection1", "amazed1", "anxiety1", "attentive1", "attentive2", "boredom1", "boredom2", "calming1", "cheerful1", "come1", "confused1", "contempt1", "curious1", "dance1", "dance2", "dance3", "disgusted1", "displeased1", "displeased2", "downcast1", "enthusiastic1", "enthusiastic2", "exhausted1", "fear1", "frustrated1", "furious1", "go_away1", "grateful1", "helpful1", "helpful2", "impatient1", "impatient2", "incomprehensible2", "indifferent1", "inquiring1", "inquiring2", "inquiring3", "irritated1", "irritated2", "laughing1", "laughing2", "listening1", "listening2", "lonely1", "lost1", "loving1", "macarena1", "mime1", "no1", "no_excited1", "no_sad1", "oops1", "oops2", "patient1", "proud1", "proud2", "proud3", "rage1", "relief1", "relief2", "reprimand1", "reprimand2", "reprimand3", "resigned1", "sad1", "sad2", "scared1", "serenity1", "shy1", "sleep1", "success1", "success2", "surprised1", "surprised2", "thoughtful1", "thoughtful2", "tired1", "uncertain1", "uncomfortable1", "understanding1", "understanding2", "welcoming1", "welcoming2", "yes1", "yes_excited1", "yes_sad1"] |
|
|
|
async def disconnect(self) -> None: |
|
"""Disconnect from the Reachy robot.""" |
|
logger.debug("Disconnecting from Reachy") |
|
await asyncio.to_thread(self.reachy.turn_off) |
|
self.connected = False |
|
logger.debug("Disconnected from Reachy") |
|
|
|
def _check_connection_for_return(self, function_name: str) -> tuple[bool, bool]: |
|
if self.fake_robot: |
|
return (True, True) |
|
if not self.connected: |
|
logger.error(f"{function_name} failed because not connected to Reachy") |
|
return (True, False) |
|
return (False, False) |
|
|
|
|
|
|
|
async def reset_arm(self, right_or_left: str, duration: float = 1.0) -> bool: |
|
"""Reset Reachy's arm to its default position. |
|
|
|
Args: |
|
right_or_left: The arm to reset. Can be "right" or "left". |
|
|
|
Returns: |
|
bool: True if the arm reset successfully, False otherwise |
|
""" |
|
|
|
logger.debug("Resetting %s arm", right_or_left) |
|
|
|
should_return, result = self._check_connection_for_return("reset_arm") |
|
if should_return: |
|
return result |
|
|
|
if right_or_left == "right": |
|
await asyncio.to_thread(self.reachy.r_arm.gripper.close) |
|
return await self.move_arm("right", [0.03, -0.36, -0.63], [0.0, 0.0, 0.0], duration) |
|
elif right_or_left == "left": |
|
await asyncio.to_thread(self.reachy.l_arm.gripper.close) |
|
return await self.move_arm("left", [0.03, 0.36, -0.63], [0.0, 0.0, 0.0], duration) |
|
|
|
async def close_gripper(self, right_or_left: str) -> bool: |
|
"""Close the gripper of the Reachy robot. |
|
|
|
Args: |
|
right_or_left: The arm to close the gripper. Can be "right" or "left". |
|
|
|
Returns: |
|
bool: True if the gripper closed successfully, False otherwise |
|
""" |
|
logger.debug("Closing gripper of %s arm", right_or_left) |
|
|
|
should_return, result = self._check_connection_for_return("close_gripper") |
|
if should_return: |
|
return result |
|
|
|
if right_or_left == "right": |
|
await asyncio.to_thread(self.reachy.r_arm.gripper.close) |
|
elif right_or_left == "left": |
|
await asyncio.to_thread(self.reachy.l_arm.gripper.close) |
|
return True |
|
|
|
|
|
async def move_arm(self, right_or_left: str, |
|
target_pos: list[float], |
|
target_rot: list[float], |
|
duration: float) -> bool: |
|
"""Move Reachy's right arm to a specific point. |
|
|
|
Args: |
|
target_pose: A tuple containing the x, y and z coordinates of a point. |
|
|
|
Returns: |
|
bool: True if the arm moved successfully, False otherwise |
|
""" |
|
logger.debug("Moving %s arm to pose %s with rotation %s", right_or_left, target_pos, target_rot) |
|
|
|
should_return, result = self._check_connection_for_return("move_arm") |
|
if should_return: |
|
await asyncio.sleep(duration + 0.2) |
|
return result |
|
|
|
return await asyncio.to_thread(self._move_arm_blocking, |
|
right_or_left, |
|
target_pos, target_rot, |
|
duration) |
|
|
|
def _move_arm_blocking(self, right_or_left: str, |
|
target_pos: list[float], |
|
target_rot: list[float], |
|
duration: float) -> bool: |
|
arm = self.reachy.r_arm if right_or_left == "right" else self.reachy.l_arm |
|
|
|
if arm is None: |
|
logger.error("Could not retrieve arm") |
|
return False |
|
|
|
try: |
|
target_pose = self._build_pose_matrix(target_pos, target_rot) |
|
ik = arm.inverse_kinematics(target_pose) |
|
arm.goto(ik, duration=duration, degrees=True, wait=True) |
|
return True |
|
except Exception as e: |
|
logger.error(f"Error moving arm: {e}") |
|
return False |
|
|
|
def _build_pose_matrix(self, tuple_pos: list[float], tuple_rot: list[float]) -> npt.NDArray[np.float64]: |
|
"""Build a 4x4 pose matrix for a given position in 3D space |
|
|
|
Args: |
|
tuple_pos: A tuple containing the x, y and z coordinates of a point. |
|
tuple_rot: A tuple containing the roll, pitch and yaw angles. |
|
|
|
Returns: |
|
A 4x4 NumPy array representing the pose matrix. |
|
""" |
|
return get_pose_matrix(tuple_pos, tuple_rot) |
|
|
|
|
|
def get_emotion_names(self) -> list[str]: |
|
"""Get the list of available emotions on the Reachy robot. |
|
|
|
Returns: |
|
list[str]: A list of emotion names. |
|
""" |
|
return self.emotion_names |
|
|
|
async def play_emotion(self, emotion_name: str) -> bool: |
|
"""Make Reachy play an emotion. |
|
|
|
Args: |
|
emotion_name: The name of the emotion to play. |
|
|
|
Returns: |
|
bool: True if the emotion played successfully, False otherwise |
|
""" |
|
logger.debug("Playing emotion %s", emotion_name) |
|
|
|
should_return, result = self._check_connection_for_return("play_emotion") |
|
if should_return: |
|
return result |
|
|
|
status = await asyncio.to_thread(self._play_emotion_blocking, emotion_name) |
|
reset_left = asyncio.create_task(self.reset_arm("left")) |
|
reset_right = asyncio.create_task(self.reset_arm("right")) |
|
await asyncio.gather(reset_left, reset_right) |
|
|
|
return status |
|
|
|
def _play_emotion_blocking(self, emotion_name: str) -> bool: |
|
self.emotion_player.play_emotion(emotion_name) |
|
if self.emotion_player.thread: |
|
self.emotion_player.thread.join() |
|
if self.emotion_player.idle_thread: |
|
self.emotion_player.idle_stop_event.set() |
|
self.emotion_player.idle_thread.join() |
|
self.emotion_player.idle_stop_event.clear() |
|
return True |
|
|
|
async def stop_current_emotion(self) -> bool: |
|
"""Stop the current emotion playing on Reachy. |
|
|
|
Returns: |
|
bool: True if the emotion stopped successfully, False otherwise |
|
""" |
|
logger.debug("Stopping current emotion") |
|
|
|
should_return, result = self._check_connection_for_return("stop_current_emotion") |
|
if should_return: |
|
return result |
|
|
|
return await asyncio.to_thread(self._stop_current_emotion_blocking) |
|
|
|
def _stop_current_emotion_blocking(self) -> bool: |
|
with self.emotion_player.lock: |
|
self.emotion_player.stop() |
|
if self.emotion_player.thread: |
|
self.emotion_player.thread.join() |
|
if self.emotion_player.idle_thread: |
|
self.emotion_player.idle_stop_event.set() |
|
self.emotion_player.idle_thread.join() |
|
self.emotion_player.idle_stop_event.clear() |
|
self.emotion_player.stop_event.clear() |
|
return True |
|
|
|
async def draw(self, |
|
max_x: float, max_y: float, |
|
draw_input: DrawInputSchema, |
|
duration_between_points: float = 0.2) -> bool: |
|
"""Make Reachy draw points on a canvas. |
|
|
|
Args: |
|
max_x: The maximum x coordinate of the drawing. |
|
max_y: The maximum y coordinate of the drawing. |
|
draw_input: A list of points using the DrawInputSchema. |
|
duration_between_points: The duration of the movement between 2 points. |
|
|
|
Returns: |
|
bool: True if the drawing was successful, False otherwise |
|
""" |
|
self.is_drawing = True |
|
self.drawn_points = [] |
|
await self.reset_arm("right") |
|
await self.close_gripper("right") |
|
|
|
first_point = True |
|
for point in draw_input.points: |
|
if not self.is_drawing: |
|
break |
|
reachy_pos, reachy_rot = self._from_drawing_point_to_right_arm_position(max_x, max_y, point) |
|
await self.move_arm("right", reachy_pos, reachy_rot, 1 if first_point else duration_between_points) |
|
if first_point: |
|
first_point = False |
|
self.drawn_points.append(point) |
|
|
|
|
|
if self.is_drawing: |
|
reachy_pos, reachy_rot = self._from_drawing_point_to_right_arm_position(max_x, max_y, self.drawn_points[0]) |
|
await self.move_arm("right", reachy_pos, reachy_rot, duration_between_points) |
|
self.drawn_points.append(point) |
|
|
|
await self.reset_arm("right") |
|
|
|
result = self.is_drawing |
|
self.is_drawing = False |
|
return result |
|
|
|
def stop_drawing(self) -> bool: |
|
"""Stop the drawing. |
|
|
|
Returns: |
|
bool: True if the drawing was stopped successfully, False otherwise |
|
""" |
|
self.is_drawing = False |
|
return True |
|
|
|
def get_drawn_positions(self) -> list[list[float]]: |
|
"""Get the positions of the points drawn on the canvas. |
|
|
|
Returns: |
|
list[list[float]]: A list of positions. |
|
""" |
|
return self.drawn_points.copy() |
|
|
|
def _from_drawing_point_to_right_arm_position(self, |
|
max_x: float, max_y: float, |
|
point: Point) -> tuple[list[float], list[float]]: |
|
"""Convert a drawing point to a right arm position. |
|
|
|
Args: |
|
max_x: The maximum x coordinate of the drawing. |
|
max_y: The maximum y coordinate of the drawing. |
|
point: a point using the DrawInputSchema. |
|
|
|
Returns: |
|
A tuple containing the x, y and z coordinates of a point and the roll, pitch and yaw angles. |
|
""" |
|
drawing_plane_x = 0.5 |
|
|
|
max_z_reachy = - 0.25 |
|
min_z_reachy = 0.0 |
|
max_y_reachy = -0.25 |
|
min_y_reachy = -0.5 |
|
|
|
y_reachy = point.x / max_y * (max_y_reachy - min_y_reachy) + min_y_reachy |
|
z_reachy = point.y / max_x * (max_z_reachy - min_z_reachy) + min_z_reachy |
|
|
|
return [drawing_plane_x, y_reachy, z_reachy], [0.0, -90.0, 0.0] |
|
|
|
|
|
|