基于Moveit2的人形机器人双臂规划4:封装服务与动作
在完成对上文提到的接口的封装之后,我们进一步把这些功能封装成动作和服务,这样就可以在python中调用这些功能,把moveit和视觉功能结合,虽然moveit本身有python接口,但是在我开始这个项目的时候moveit2的python接口还不完善,直接安装不带python接口,源码编译还有一堆问题,所以采用了自己封装的方法。
此外,从这篇文章开始需要读者具备ROS2基础,了解服务、动作、回调等概念,并且熟悉C++和python编程,能够自行创建功能包,编写ROS2节点,否则是没法直接通过复制本文的代码来实现一个可用的ROS2服务或动作。
- 这里有一个问题,就是动作服务器节点在多次调用后会丢包或者死掉,一直没有解决,换成服务暂时还没有出现这个问题,到底是这一个node里启动了太多action还是什么情况?给action回调分到不同的回调组、添加多线程,都没有解决这个问题
1.定义动作
创建一个功能包,创建动作文件.action,比如说MoveP.action,一个action类型定义由Goal、Result、Feedback三个部分组成,Goal是动作需要的参数,Result是返回的结果,Feedback是执行动作中可以发送的反馈,比如实时反馈动作做到哪一步。在CMakeLists中添加以下内容,注意需要引用你需要用到的消息类型1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# MoveP.action
# Goal
geometry_msgs/PoseStamped target_pose
string lor
string to_frame
string reference_frame
string planner
---
# Result
bool success
---
# Feedback
string feedback_message1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28cmake_minimum_required(VERSION 3.8)
project(control_interfaces)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(rosidl_generator_dds_idl REQUIRED)
find_package(action_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(trajectory_msgs REQUIRED)
set(
DEPENDENCY_LIST
action_msgs
geometry_msgs
sensor_msgs
trajectory_msgs
)
rosidl_generate_interfaces(${PROJECT_NAME}
"action/MoveP.action"
DEPENDENCIES ${DEPENDENCY_LIST}
)
ament_package()2.创建动作服务端
继承Node类创建一个新的节点类,创建动作服务端,类型为上面定义后编译出来的动作类型,其中actionoptions和callback_group这两个参数是用于多线程,可以不填,这里使用他们的原因后面再说,”move_p”是动作名称,可以自定义,之后调用就用这个名称。分别定义处理goal、cancel、accepted的回调函数,具体执行逻辑在accepted的回调函数中执行1
2
3
4
5
6
7
8
9
10
11
12
13rclcpp_action::Server<control_interfaces::action::MoveP>::SharedPtr move_p_action_server_;
this->callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
auto action_options = rcl_action_server_get_default_options();
move_p_action_server_ = rclcpp_action::create_server<control_interfaces::action::MoveP>(
this,
"move_p",
std::bind(&H1Gripper::handle_move_p_goal, this, std::placeholders::_1, std::placeholders::_2),
std::bind(&H1Gripper::handle_move_p_cancel, this, std::placeholders::_1),
std::bind(&H1Gripper::handle_move_p_accepted, this, std::placeholders::_1),
action_options,
callback_group_
);1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53rclcpp_action::GoalResponse H1Gripper::handle_move_p_goal(
const rclcpp_action::GoalUUID & uuid,
std::shared_ptr<const control_interfaces::action::MoveP::Goal> goal)
{
RCLCPP_INFO(this->get_logger(), "Received MoveP goal request.");
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
}
// 处理 MoveP 的取消请求
rclcpp_action::CancelResponse H1Gripper::handle_move_p_cancel(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<control_interfaces::action::MoveP>> goal_handle)
{
RCLCPP_INFO(this->get_logger(), "Cancel MoveP goal request.");
return rclcpp_action::CancelResponse::ACCEPT;
}
// 处理 MoveP 的接受请求并执行
void H1Gripper::handle_move_p_accepted(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<control_interfaces::action::MoveP>> goal_handle)
{
auto result = std::make_shared<control_interfaces::action::MoveP::Result>();
// 获取目标姿态并调用单手动作函数
const auto & goal = goal_handle->get_goal();
geometry_msgs::msg::PoseStamped target_pose = goal->target_pose;
// 调用单手动作函数 single_move_p
// 调用关节运动控制函数
try{
bool success = single_move_p(goal_handle->get_goal()->lor, target_pose, goal_handle->get_goal()->to_frame, goal_handle->get_goal()->reference_frame, goal_handle->get_goal()->planner);
// 在主线程中处理成功或失败的结果
result->success = success;
if (success) {
goal_handle->succeed(result); // 执行成功
RCLCPP_INFO(this->get_logger(), "MoveP Success.");
} else {
if (goal_handle->is_active()) {
goal_handle->abort(result);
} // 执行失败
RCLCPP_INFO(this->get_logger(), "MoveP Fail.");
}
}
catch (const std::exception &e) {
RCLCPP_ERROR(this->get_logger(), "Exception in MoveP thread: %s", e.what());
result->success = false;
if (goal_handle->is_active()) {
goal_handle->abort(result);
}
}
}3.创建动作客户端
接下来在Python中创建客户端,由于我有多个类似的动作,所以创建了一个基类,动作类型仍需要从定义动作的功能包中引用创建客户端示例后通过send_goal方法发送目标,并等待结果,但是由于直接通过send_goal方法发送目标,会立刻返回失败,所以这里我添加了一个status和一个success属性,用于判断状态是否成功1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71from control_interfaces.action import MoveP
class BaseMoveClient(Node):
def __init__(self, node_name, action_name):
super().__init__(node_name)
self._action_client = ActionClient(self, self._get_action_type(), action_name)
self.status = False
self.success = False
def _get_action_type(self):
# 抽象方法由子类实现
raise NotImplementedError
def send_goal(self, **kwargs):
goal_msg = self._create_goal_msg(**kwargs)
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def _create_goal_msg(self, **kwargs):
# 由子类实现具体goal构造逻辑
raise NotImplementedError
def feedback_callback(self, feedback_msg):
# 可提供默认反馈处理,子类可选重写
self.get_logger().debug(f'Feedback: {feedback_msg}')
def goal_response_callback(self, future):
# 统一响应处理(可复用)
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warn('Goal rejected')
return
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
# 统一结果处理模板
result = future.result().result
self.success = result.success
self.status = True
self._handle_specific_result(result) # 子类扩展点
def _handle_specific_result(self, result):
# 子类实现具体结果处理
pass
class MovePClient(BaseMoveClient):
def __init__(self):
super().__init__('move_p_client', 'move_p')
def _get_action_type(self):
return MoveP
def _create_goal_msg(self, target_pose, lor, to_frame, reference_frame, planner):
goal_msg = MoveP.Goal()
goal_msg.target_pose = target_pose
goal_msg.lor = lor
goal_msg.to_frame = to_frame
goal_msg.reference_frame = reference_frame
goal_msg.planner = planner
return goal_msg
def _handle_specific_result(self, result):
if self.success:
self.get_logger().info(f"MoveP成功")
进一步封装1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39def move_p(lor, pose, planner):
wait = 0
retry = 0
while True:
if lor == "left":
move_p_client.status = False
success = move_p_client.send_goal(
target_pose=pose,
lor="left",
to_frame="left_wrist_yaw_link",
reference_frame="torso_link",
planner=planner
)
else:
move_p_client.status = False
success = move_p_client.send_goal(
target_pose=pose,
lor="right",
to_frame="right_wrist_yaw_link",
reference_frame="torso_link",
planner=planner
)
# 等待移动完成
while True:
wait += 1
# print("wait%f", wait)
rclpy.spin_once(move_p_client, timeout_sec=0.1)
if move_p_client.status == True:
break
if wait >= 80:
# 可能丢包,重新发送请求
break
if move_p_client.success == True:
for i in range(10):
rclpy.spin_once(move_p_client, timeout_sec=0.1)
retry = 0
return True
else:
return False
如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !