DDS編程示例
我們嘗試在代碼中配置DDS,以之前Hello World話題通信為例。
運行效果
啟動兩個終端,分別運行發布者和訂閱者節點:
$ ros2 run learning_qos qos_helloworld_pub
$ ros2 run learning_qos qos_helloworld_sub
可以看到兩個終端中的通信效果如下,和之前貌似并沒有太大區別。
看效果確實差不多,不過底層通信機理上可是有所不同的。
發布者代碼解析
我們看下在代碼中,如果加入QoS的配置。
learning_qos/qos_helloworld_pub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居(www.guyuehome.com)@說明: ROS2 QoS示例-發布“Hello World”話題"""import rclpy # ROS2 Python接口庫from rclpy.node import Node # ROS2 節點類from std_msgs.msg import String # 字符串消息類型from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS類"""創建一個發布者節點"""class PublisherNode(Node): def __init__(self, name): super().__init__(name) # ROS2節點父類初始化 qos_profile = QoSProfile( # 創建一個QoS原則 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.pub = self.create_publisher(String, "chatter", qos_profile) # 創建發布者對象(消息類型、話題名、QoS原則) self.timer = self.create_timer(0.5, self.timer_callback) # 創建一個定時器(單位為秒的周期,定時執行的回調函數) def timer_callback(self): # 創建定時器周期執行的回調函數 msg = String() # 創建一個String類型的消息對象 msg.data = 'Hello World' # 填充消息對象中的消息數據 self.pub.publish(msg) # 發布話題消息 self.get_logger().info('Publishing: "%s"' % msg.data)# 輸出日志信息,提示已經完成話題發布def main(args=None): # ROS2節點主入口main函數 rclpy.init(args=args) # ROS2 Python接口初始化 node = PublisherNode("qos_helloworld_pub") # 創建ROS2節點對象并進行初始化 rclpy.spin(node) # 循環等待ROS2退出 node.destroy_node() # 銷毀節點對象 rclpy.shutdown() # 關閉ROS2 Python接口
完成代碼的編寫后需要設置功能包的編譯選項,讓系統知道Python程序的入口,打開功能包的setup.py文件,加入如下入口點的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos.qos_helloworld_pub:main',},
訂閱者代碼解析
訂閱者中的QoS配置和發布者類似。
learning_qos/qos_helloworld_sub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居@說明: ROS2 QoS示例-訂閱“Hello World”話題消息"""import rclpy # ROS2 Python接口庫
from rclpy.node import Node # ROS2 節點類
from std_msgs.msg import String # ROS2標準定義的String消息from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS類"""創建一個訂閱者節點"""class SubscriberNode(Node): def __init__(self, name): super().__init__(name) # ROS2節點父類初始化 qos_profile = QoSProfile( # 創建一個QoS原則 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.sub = self.create_subscription( String, "chatter", self.listener_callback, qos_profile) # 創建訂閱者對象(消息類型、話題名、訂閱者回調函數、QoS原則)
def listener_callback(self, msg): # 創建回調函數,執行收到話題消息后對數據的處理 self.get_logger().info('I heard: "%s"' % msg.data) # 輸出日志信息,提示訂閱收到的話題消息def main(args=None): # ROS2節點主入口main函數 rclpy.init(args=args) # ROS2 Python接口初始化 node = SubscriberNode("qos_helloworld_sub") # 創建ROS2節點對象并進行初始化 rclpy.spin(node) # 循環等待ROS2退出
node.destroy_node() # 銷毀節點對象 rclpy.shutdown() # 關閉ROS2 Python接口
完成代碼的編寫后需要設置功能包的編譯選項,讓系統知道Python程序的入口,打開功能包的setup.py文件,加入如下入口點的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos
聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。
舉報投訴
-
機器人
+關注
關注
212文章
29422瀏覽量
211344 -
通信
+關注
關注
18文章
6166瀏覽量
137325 -
DDS
+關注
關注
22文章
671瀏覽量
153963 -
代碼
+關注
關注
30文章
4886瀏覽量
70209
發布評論請先 登錄
相關推薦
熱點推薦
如何在MCUxpresso IDE的外設中配置USB?
我必須在 MCUxpresso IDE 中使用 LPC54113 實現 USB 協議代碼。請提供以下詳細信息,1. 如何在 MCUxpresso IDE 的外設中配置 USB。2.US
發表于 04-04 06:22
評論