ROS2時間同步(python)

  • 2022 年 10 月 28 日
  • 筆記

最近1周一直研究ROS2的時間同步,翻越很多部落格,很少有人使用ROS2進行時間同步的程式碼,無奈不斷嘗試與源碼閱讀,終於將其搞定,

為此,本部落格將介紹基於python的ROS2的時間同步方法。

本部落格內容結構為話題發布程式碼,話題訂閱與時間同步程式碼,程式碼文件夾結構及結果顯示圖片。本部落格假設2個publisher和一個scribe,同步是在scibe中完成。

一.話題發布程式碼

發布1為第二個發布者,可理解為某感測器

publisher1程式碼如下:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String,Float32,Int32
import cv2
# from std_msgs.msg import Header
import time
class NodePublisher(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("大家好,我是%s!" % name)
        self.num=0
        self.command_publisher1 = self.create_publisher(Int32,"command1", 10) 
        self.timer = self.create_timer(0.4, self.timer_callback)  # 
        # self.inputdata1()
    def inputdata1(self):
        msg = Int32() #String()
        period=0.5
        print("publisher1-周期",period)
        self.get_logger().info(f'發布了指令:{msg.data}')    #列印一下發布的數據
        num=0
        while True:
            num=num+1
            msg.data = num #str(num)

            self.command_publisher_.publish(msg) 
            # time.sleep(period)
            self.get_logger().info(f'發布了指令:{msg.data}')    #列印一下發布的數據
    def timer_callback(self):
        msg = Int32() #String()
        self.num+=1
        msg.data = self.num #str(num)
        self.command_publisher1.publish(msg) 
        self.get_logger().info(f'發布了指令:{msg.data}')    #列印一下發布的數據
def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = NodePublisher("topic_publisher1")  # 新建一個節點
    rclpy.spin(node) # 保持節點運行,檢測是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 關閉rclpy

發布2為第二個發布者,可理解為某感測器

publisher2程式碼如下:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String,Float32,Int32
import time
class NodePublisher(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("大家好,我是%s!" % name)
        self.num=0
        self.command_publisher2= self.create_publisher(Int32,"command2", 10) 
        self.timer = self.create_timer(0.2, self.timer_callback)  #    
    def timer_callback(self):
        msg = Int32() #String()
        self.num+=1
        msg.data = self.num #str(num)
        self.command_publisher2.publish(msg) 
        self.get_logger().info(f'發布了指令:{msg.data}')    #列印一下發布的數據
def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = NodePublisher("topic_publisher2")  # 新建一個節點
    rclpy.spin(node) # 保持節點運行,檢測是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 關閉rclpy

二.話題訂閱及時間同步程式碼

訂閱發布者資訊,並將其同步,可理解為同步不同感測器

scriabe程式碼如下:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import message_filters
from std_msgs.msg import String,Float32,Int32
import message_filters
from sensor_msgs.msg import Image, CameraInfo
def callback(image_sub,info_sub):
    res=int(info_sub.data)-int(image_sub.data)
    print("publisher1:\t{}\tpubsher2:\t{}\t{}".format(str(image_sub.data),str(info_sub.data),res))
def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    scribe_node=Node('scribe_time')
 
    image_sub = message_filters.Subscriber(scribe_node, Int32,'command1')
    info_sub = message_filters.Subscriber(scribe_node, Int32,'command2')
    
    ts = message_filters.ApproximateTimeSynchronizer([image_sub, info_sub], 10, 0.1, allow_headerless=True) # allow_headerless=True,可以不使用時間戳
    # ts = message_filters.TimeSynchronizer([image_sub, info_sub], 10) # 這個需要時間戳才可調用 
    ts.registerCallback(callback)
    rclpy.spin(scribe_node)
    rospy.spin()   
    

 

三.參數配置及文件格式

setup.py設置如下:

from setuptools import setup
package_name = 'topic_time'
setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='root',
    maintainer_email='[email protected]',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
        "publisher1_node = topic_time.publisher1:main",
        "publisher2_node = topic_time.publisher2:main",
        "subscribe_node = topic_time.subscribe:main",
        "subscribe2_node = topic_time.subscribe2:main"

        ],
    },
)

 

文件格式如下:

 

 

 

通過以上程式碼將可看到同步的scribe中發布1時間無間隔,發布2時間間隔為4,恰好與設置周期同等,結果顯示如下: