基于Moveit2的人形机器人双臂规划4:封装服务与动作

Posted by banwf on 2025-12-21
Estimated Reading Time 7 Minutes
Words 1.5k In Total
Viewed Times

基于Moveit2的人形机器人双臂规划4:封装服务与动作

在完成对上文提到的接口的封装之后,我们进一步把这些功能封装成动作和服务,这样就可以在python中调用这些功能,把moveit和视觉功能结合,虽然moveit本身有python接口,但是在我开始这个项目的时候moveit2的python接口还不完善,直接安装不带python接口,源码编译还有一堆问题,所以采用了自己封装的方法。
此外,从这篇文章开始需要读者具备ROS2基础,了解服务、动作、回调等概念,并且熟悉C++和python编程,能够自行创建功能包,编写ROS2节点,否则是没法直接通过复制本文的代码来实现一个可用的ROS2服务或动作。

  • 这里有一个问题,就是动作服务器节点在多次调用后会丢包或者死掉,一直没有解决,换成服务暂时还没有出现这个问题,到底是这一个node里启动了太多action还是什么情况?给action回调分到不同的回调组、添加多线程,都没有解决这个问题

    1.定义动作

    创建一个功能包,创建动作文件.action,比如说MoveP.action,一个action类型定义由Goal、Result、Feedback三个部分组成,Goal是动作需要的参数,Result是返回的结果,Feedback是执行动作中可以发送的反馈,比如实时反馈动作做到哪一步。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # MoveP.action
    # Goal
    geometry_msgs/PoseStamped target_pose
    string lor
    string to_frame
    string reference_frame
    string planner

    ---
    # Result
    bool success

    ---
    # Feedback
    string feedback_message
    在CMakeLists中添加以下内容,注意需要引用你需要用到的消息类型
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    cmake_minimum_required(VERSION 3.8)
    project(control_interfaces)

    if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
    add_compile_options(-Wall -Wextra -Wpedantic)
    endif()

    # find dependencies
    find_package(ament_cmake REQUIRED)
    find_package(rosidl_default_generators REQUIRED)
    find_package(rosidl_generator_dds_idl REQUIRED)
    find_package(action_msgs REQUIRED)
    find_package(geometry_msgs REQUIRED)
    find_package(sensor_msgs REQUIRED)
    find_package(trajectory_msgs REQUIRED)
    set(
    DEPENDENCY_LIST
    action_msgs
    geometry_msgs
    sensor_msgs
    trajectory_msgs
    )
    rosidl_generate_interfaces(${PROJECT_NAME}
    "action/MoveP.action"
    DEPENDENCIES ${DEPENDENCY_LIST}
    )

    ament_package()

    2.创建动作服务端

    继承Node类创建一个新的节点类,创建动作服务端,类型为上面定义后编译出来的动作类型,其中actionoptions和callback_group这两个参数是用于多线程,可以不填,这里使用他们的原因后面再说,”move_p”是动作名称,可以自定义,之后调用就用这个名称。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    rclcpp_action::Server<control_interfaces::action::MoveP>::SharedPtr move_p_action_server_;

    this->callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
    auto action_options = rcl_action_server_get_default_options();
    move_p_action_server_ = rclcpp_action::create_server<control_interfaces::action::MoveP>(
    this,
    "move_p",
    std::bind(&H1Gripper::handle_move_p_goal, this, std::placeholders::_1, std::placeholders::_2),
    std::bind(&H1Gripper::handle_move_p_cancel, this, std::placeholders::_1),
    std::bind(&H1Gripper::handle_move_p_accepted, this, std::placeholders::_1),
    action_options,
    callback_group_
    );
    分别定义处理goal、cancel、accepted的回调函数,具体执行逻辑在accepted的回调函数中执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    rclcpp_action::GoalResponse H1Gripper::handle_move_p_goal(
    const rclcpp_action::GoalUUID & uuid,
    std::shared_ptr<const control_interfaces::action::MoveP::Goal> goal)
    {
    RCLCPP_INFO(this->get_logger(), "Received MoveP goal request.");
    return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
    }

    // 处理 MoveP 的取消请求
    rclcpp_action::CancelResponse H1Gripper::handle_move_p_cancel(
    const std::shared_ptr<rclcpp_action::ServerGoalHandle<control_interfaces::action::MoveP>> goal_handle)
    {
    RCLCPP_INFO(this->get_logger(), "Cancel MoveP goal request.");
    return rclcpp_action::CancelResponse::ACCEPT;
    }

    // 处理 MoveP 的接受请求并执行
    void H1Gripper::handle_move_p_accepted(
    const std::shared_ptr<rclcpp_action::ServerGoalHandle<control_interfaces::action::MoveP>> goal_handle)
    {
    auto result = std::make_shared<control_interfaces::action::MoveP::Result>();

    // 获取目标姿态并调用单手动作函数
    const auto & goal = goal_handle->get_goal();
    geometry_msgs::msg::PoseStamped target_pose = goal->target_pose;

    // 调用单手动作函数 single_move_p
    // 调用关节运动控制函数
    try{
    bool success = single_move_p(goal_handle->get_goal()->lor, target_pose, goal_handle->get_goal()->to_frame, goal_handle->get_goal()->reference_frame, goal_handle->get_goal()->planner);

    // 在主线程中处理成功或失败的结果

    result->success = success;
    if (success) {
    goal_handle->succeed(result); // 执行成功
    RCLCPP_INFO(this->get_logger(), "MoveP Success.");
    } else {
    if (goal_handle->is_active()) {
    goal_handle->abort(result);
    } // 执行失败
    RCLCPP_INFO(this->get_logger(), "MoveP Fail.");
    }
    }
    catch (const std::exception &e) {
    RCLCPP_ERROR(this->get_logger(), "Exception in MoveP thread: %s", e.what());
    result->success = false;
    if (goal_handle->is_active()) {
    goal_handle->abort(result);
    }
    }
    }

    3.创建动作客户端

    接下来在Python中创建客户端,由于我有多个类似的动作,所以创建了一个基类,动作类型仍需要从定义动作的功能包中引用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    from control_interfaces.action import MoveP
    class BaseMoveClient(Node):
    def __init__(self, node_name, action_name):
    super().__init__(node_name)
    self._action_client = ActionClient(self, self._get_action_type(), action_name)
    self.status = False
    self.success = False

    def _get_action_type(self):
    # 抽象方法由子类实现
    raise NotImplementedError

    def send_goal(self, **kwargs):
    goal_msg = self._create_goal_msg(**kwargs)
    self._action_client.wait_for_server()

    self._send_goal_future = self._action_client.send_goal_async(
    goal_msg,
    feedback_callback=self.feedback_callback
    )
    self._send_goal_future.add_done_callback(self.goal_response_callback)

    def _create_goal_msg(self, **kwargs):
    # 由子类实现具体goal构造逻辑
    raise NotImplementedError

    def feedback_callback(self, feedback_msg):
    # 可提供默认反馈处理,子类可选重写
    self.get_logger().debug(f'Feedback: {feedback_msg}')

    def goal_response_callback(self, future):
    # 统一响应处理(可复用)
    goal_handle = future.result()
    if not goal_handle.accepted:
    self.get_logger().warn('Goal rejected')
    return

    self._get_result_future = goal_handle.get_result_async()
    self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
    # 统一结果处理模板
    result = future.result().result
    self.success = result.success
    self.status = True
    self._handle_specific_result(result) # 子类扩展点

    def _handle_specific_result(self, result):
    # 子类实现具体结果处理
    pass


    class MovePClient(BaseMoveClient):
    def __init__(self):
    super().__init__('move_p_client', 'move_p')

    def _get_action_type(self):
    return MoveP

    def _create_goal_msg(self, target_pose, lor, to_frame, reference_frame, planner):
    goal_msg = MoveP.Goal()
    goal_msg.target_pose = target_pose
    goal_msg.lor = lor
    goal_msg.to_frame = to_frame
    goal_msg.reference_frame = reference_frame
    goal_msg.planner = planner
    return goal_msg

    def _handle_specific_result(self, result):
    if self.success:
    self.get_logger().info(f"MoveP成功")
    创建客户端示例后通过send_goal方法发送目标,并等待结果,但是由于直接通过send_goal方法发送目标,会立刻返回失败,所以这里我添加了一个status和一个success属性,用于判断状态是否成功
    进一步封装
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    def move_p(lor, pose, planner):
    wait = 0
    retry = 0
    while True:
    if lor == "left":
    move_p_client.status = False
    success = move_p_client.send_goal(
    target_pose=pose,
    lor="left",
    to_frame="left_wrist_yaw_link",
    reference_frame="torso_link",
    planner=planner
    )
    else:
    move_p_client.status = False
    success = move_p_client.send_goal(
    target_pose=pose,
    lor="right",
    to_frame="right_wrist_yaw_link",
    reference_frame="torso_link",
    planner=planner
    )
    # 等待移动完成
    while True:
    wait += 1
    # print("wait%f", wait)
    rclpy.spin_once(move_p_client, timeout_sec=0.1)
    if move_p_client.status == True:
    break
    if wait >= 80:
    # 可能丢包,重新发送请求
    break
    if move_p_client.success == True:
    for i in range(10):
    rclpy.spin_once(move_p_client, timeout_sec=0.1)
    retry = 0
    return True
    else:
    return False

如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !