ROS2時間同步(python)
- 2022 年 10 月 28 日
- 筆記
最近1周一直研究ROS2的時間同步,翻越很多部落格,很少有人使用ROS2進行時間同步的程式碼,無奈不斷嘗試與源碼閱讀,終於將其搞定,
為此,本部落格將介紹基於python的ROS2的時間同步方法。
本部落格內容結構為話題發布程式碼,話題訂閱與時間同步程式碼,程式碼文件夾結構及結果顯示圖片。本部落格假設2個publisher和一個scribe,同步是在scibe中完成。
一.話題發布程式碼
發布1為第二個發布者,可理解為某感測器
publisher1程式碼如下:
#!/usr/bin/env python3 import rclpy from rclpy.node import Node from std_msgs.msg import String,Float32,Int32 import cv2 # from std_msgs.msg import Header import time class NodePublisher(Node): def __init__(self,name): super().__init__(name) self.get_logger().info("大家好,我是%s!" % name) self.num=0 self.command_publisher1 = self.create_publisher(Int32,"command1", 10) self.timer = self.create_timer(0.4, self.timer_callback) # # self.inputdata1() def inputdata1(self): msg = Int32() #String() period=0.5 print("publisher1-周期",period) self.get_logger().info(f'發布了指令:{msg.data}') #列印一下發布的數據 num=0 while True: num=num+1 msg.data = num #str(num) self.command_publisher_.publish(msg) # time.sleep(period) self.get_logger().info(f'發布了指令:{msg.data}') #列印一下發布的數據 def timer_callback(self): msg = Int32() #String() self.num+=1 msg.data = self.num #str(num) self.command_publisher1.publish(msg) self.get_logger().info(f'發布了指令:{msg.data}') #列印一下發布的數據 def main(args=None): rclpy.init(args=args) # 初始化rclpy node = NodePublisher("topic_publisher1") # 新建一個節點 rclpy.spin(node) # 保持節點運行,檢測是否收到退出指令(Ctrl+C) rclpy.shutdown() # 關閉rclpy
發布2為第二個發布者,可理解為某感測器
publisher2程式碼如下:
#!/usr/bin/env python3 import rclpy from rclpy.node import Node from std_msgs.msg import String,Float32,Int32 import time class NodePublisher(Node): def __init__(self,name): super().__init__(name) self.get_logger().info("大家好,我是%s!" % name) self.num=0 self.command_publisher2= self.create_publisher(Int32,"command2", 10) self.timer = self.create_timer(0.2, self.timer_callback) # def timer_callback(self): msg = Int32() #String() self.num+=1 msg.data = self.num #str(num) self.command_publisher2.publish(msg) self.get_logger().info(f'發布了指令:{msg.data}') #列印一下發布的數據 def main(args=None): rclpy.init(args=args) # 初始化rclpy node = NodePublisher("topic_publisher2") # 新建一個節點 rclpy.spin(node) # 保持節點運行,檢測是否收到退出指令(Ctrl+C) rclpy.shutdown() # 關閉rclpy
二.話題訂閱及時間同步程式碼
訂閱發布者資訊,並將其同步,可理解為同步不同感測器
scriabe程式碼如下:
#!/usr/bin/env python3 import rclpy from rclpy.node import Node import message_filters from std_msgs.msg import String,Float32,Int32 import message_filters from sensor_msgs.msg import Image, CameraInfo def callback(image_sub,info_sub): res=int(info_sub.data)-int(image_sub.data) print("publisher1:\t{}\tpubsher2:\t{}\t{}".format(str(image_sub.data),str(info_sub.data),res)) def main(args=None): rclpy.init(args=args) # 初始化rclpy scribe_node=Node('scribe_time') image_sub = message_filters.Subscriber(scribe_node, Int32,'command1') info_sub = message_filters.Subscriber(scribe_node, Int32,'command2') ts = message_filters.ApproximateTimeSynchronizer([image_sub, info_sub], 10, 0.1, allow_headerless=True) # allow_headerless=True,可以不使用時間戳 # ts = message_filters.TimeSynchronizer([image_sub, info_sub], 10) # 這個需要時間戳才可調用 ts.registerCallback(callback) rclpy.spin(scribe_node) rospy.spin()
三.參數配置及文件格式
setup.py設置如下:
from setuptools import setup package_name = 'topic_time' setup( name=package_name, version='0.0.0', packages=[package_name], data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='root', maintainer_email='[email protected]', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ "publisher1_node = topic_time.publisher1:main", "publisher2_node = topic_time.publisher2:main", "subscribe_node = topic_time.subscribe:main", "subscribe2_node = topic_time.subscribe2:main" ], }, )
文件格式如下:
通過以上程式碼將可看到同步的scribe中發布1時間無間隔,發布2時間間隔為4,恰好與設置周期同等,結果顯示如下: