ROS應用層通信協議解析
參考://wiki.ros.org/ROS/Master_API
//wiki.ros.org/ROS/Connection Header
說明
ROS本質上就是一個松耦合的通信框架,通信模式包括:遠程調用(service-client)、訂閱發佈(topic)、持續通信(action)和全局參數(參數服務器),這四種模式基本已經能夠涵蓋百分之九十的應用場景了
本次針對訂閱發佈模式,探究一下ROS通信中的具體通信協議,讀完本文後,你可以在不依賴ROS的情況下和ROS通信
本次通信採用從機訂閱主機數據,通過wireshark抓包,得到具體xmlrpc協議數據內容,根據xmlrpc協議格式,找到對應代碼
(因為時間有限,部分協議可能有跳過的地方)
1、registerPublisher
從機創建節點的第一步
這個方法用於註冊一個發佈者的caller
request報文body:
<?xml version="1.0"?>
<methodCall>
<methodName>registerPublisher</methodName>
<params>
<param>
<value>/test_sub</value>
</param>
<param>
<value>/rosout</value>
</param>
<param>
<value>rosgraph_msgs/Log</value>
</param>
<param>
<value>//192.168.1.150:40209</value>
</param>
</params>
</methodCall>
response報文body:
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<array>
<data>
<value>
<int>1</int>
</value>
<value>
<string>Registered [/test_sub] as publisher of [/rosout]</string>
</value>
<value>
<array>
<data>
<value>
<string>//sherlock:35861/</string>
</value>
</data>
</array>
</value>
</data>
</array>
</value>
</param>
</params>
</methodResponse>
先說結論:
ROS有一個日誌相關的topic,名稱是 /rosout,所有節點的日誌信息都會通過這個 topic 發佈出來
ROS還有一個日誌相關的節點,名稱是 /rosout,負責訂閱 /rosout數據,然後使用名稱為 /rosout_agg 的topic發佈出來, /rosout_agg 的訂閱者是rqt等調試工具
所以,結合上面的xml內容,我們可以大致推斷,創建一個新的節點,那麼這個節點必定會發佈一個topic,就是/rosout,所以上面的XMLRPC協議內容,就是網rosmaster內註冊一個publisher,用於發佈/rosout
整體來說,就是調用接口
registerPublisher("/test_sub", "/rosout", "rosgraph_msgs/Log", "//192.168.1.150:40209")
返回值是:
1
Registered [/test_sub] as publisher of [/rosout]
//sherlock:35861/
1是固定數據
第二行是message
最後一個返回值表示訂閱者的uri列表,這裡因為只有一個訂閱者,所有隻有一個uri
再看代碼:
函數聲明如下:
registerPublisher(caller_id, topic, topic_type, caller_api)
Register the caller as a publisher the topic.
參數
caller_id (str)
ROS caller ID
topic (str)
Fully-qualified name of topic to register.
topic_type (str)
Datatype for topic. Must be a package-resource name, i.e. the .msg name.
caller_api (str)
API URI of publisher to register.
返回值(int, str, [str])
(code, statusMessage, subscriberApis)
List of current subscribers of topic in the form of XMLRPC URIs.
找到 registerPublisher 接接口,位於ros_comm/rosmaster 包中,文件為:master_api.py(ROS主從機制利用python實現,拿掉python則無法實現主從)
@apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
def registerPublisher(self, caller_id, topic, topic_type, caller_api):
"""
Register the caller as a publisher the topic.
@param caller_id: ROS caller id
@type caller_id: str
@param topic: Fully-qualified name of topic to register.
@type topic: str
@param topic_type: Datatype for topic. Must be a
package-resource name, i.e. the .msg name.
@type topic_type: str
@param caller_api str: ROS caller XML-RPC API URI
@type caller_api: str
@return: (code, statusMessage, subscriberApis).
List of current subscribers of topic in the form of XMLRPC URIs.
@rtype: (int, str, [str])
"""
#NOTE: we need topic_type for getPublishedTopics.
try:
self.ps_lock.acquire()
self.reg_manager.register_publisher(topic, caller_id, caller_api)
# don't let '*' type squash valid typing
if topic_type != rosgraph.names.ANYTYPE or not topic in self.topics_types:
self.topics_types[topic] = topic_type
pub_uris = self.publishers.get_apis(topic)
sub_uris = self.subscribers.get_apis(topic)
self._notify_topic_subscribers(topic, pub_uris, sub_uris)
mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
sub_uris = self.subscribers.get_apis(topic)
finally:
self.ps_lock.release()
return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris
registerPublisher 接口的注釋:Register the caller as a publisher the topic,將調用者註冊為一個topic發佈者
可以對應xmlrpc中對應參數,加上猜測:
caller_id:調用者,可以認為是節點,/test_sub,從及創建的節點
topic:發佈的topic name,/rosout
topic_type:發佈的topic數據類型,rosgraph_msgs/Log
caller_api:調用者發佈數據的API接口,//192.168.1.150:40209
總上,我們大概有幾點猜測:
- 接口在rosmaster中,接口是registerPublisher,表示,這是註冊節點的
- 告訴master節點,我創建了一個節點,節點名是/test_sub
- 告訴master,這個節點需要發佈topic,topic名是/rosout,數據類型是rosgraph_msgs/Log
registerPublisher 接口中有三個地方需要注意:
- register_publisher接口調用
- _notify_topic_subscribers接口調用,告知當前所有的subscriber,有新的publisher,他們需要再次到新的publisher中去訂閱數據
- return 內容,最後會拼接成xmlrpc的報文,response 回去,這也就順便解釋了第二條xmlrpc報文(response)
register_publisher
先看 register_publisher,代碼在rosmaster中的registrations.py文件中
def register_publisher(self, topic, caller_id, caller_api):
"""
Register topic publisher
@return: None
"""
self._register(self.publishers, topic, caller_id, caller_api)
_register 接口,這個節點做了三件事
- 調用內部接口保存節點信息
- 如果這個節點之前已經存在,就表明它是在更新,則發佈數據的接口改變,且之前已經有訂閱,則此時所有訂閱該接口的所有subscriber解除訂閱
- 調用register接口保存
def _register(self, r, key, caller_id, caller_api, service_api=None):
# update node information
node_ref, changed = self._register_node_api(caller_id, caller_api)
node_ref.add(r.type, key)
# update pub/sub/service indicies
if changed:
self.publishers.unregister_all(caller_id)
self.subscribers.unregister_all(caller_id)
self.services.unregister_all(caller_id)
self.param_subscribers.unregister_all(caller_id)
r.register(key, caller_id, caller_api, service_api)
_register_node_api 接口,我們可以看到,它主要做兩件事
- 更新master中節點信息(節點名、節點發佈數據的接口)
- 檢查這個節點是不是已經存在,如果是,則告訴調用者
def _register_node_api(self, caller_id, caller_api):
"""
@param caller_id: caller_id of provider
@type caller_id: str
@param caller_api: caller_api of provider
@type caller_api: str
@return: (registration_information, changed_registration). changed_registration is true if
caller_api is differet than the one registered with caller_id
@rtype: (NodeRef, bool)
"""
node_ref = self.nodes.get(caller_id, None)
bumped_api = None
if node_ref is not None:
if node_ref.api == caller_api:
return node_ref, False
else:
bumped_api = node_ref.api
self.thread_pool.queue_task(bumped_api, shutdown_node_task,
(bumped_api, caller_id, "new node registered with same name"))
node_ref = NodeRef(caller_id, caller_api)
self.nodes[caller_id] = node_ref
return (node_ref, bumped_api != None)
_notify_topic_subscribers
_notify_topic_subscribers 代碼,根據注釋說明,接口的作用就是通知所有的subscriber,有新的publisher
def _notify_topic_subscribers(self, topic, pub_uris, sub_uris):
"""
Notify subscribers with new publisher list
@param topic: name of topic
@type topic: str
@param pub_uris: list of URIs of publishers.
@type pub_uris: [str]
"""
self._notify(self.subscribers, publisher_update_task, topic, pub_uris, sub_uris)
_notify 代碼,將更新的通知任務(publisher_update_task)放進事件隊列中,等待執行:
def _notify(self, registrations, task, key, value, node_apis):
"""
Generic implementation of callback notification
@param registrations: Registrations
@type registrations: L{Registrations}
@param task: task to queue
@type task: fn
@param key: registration key
@type key: str
@param value: value to pass to task
@type value: Any
"""
# cache thread_pool for thread safety
thread_pool = self.thread_pool
if not thread_pool:
return
try:
for node_api in node_apis:
# use the api as a marker so that we limit one thread per subscriber
thread_pool.queue_task(node_api, task, (node_api, key, value))
except KeyError:
_logger.warn('subscriber data stale (key [%s], listener [%s]): node API unknown'%(key, s))
publisher_update_task 代碼,傳入的三個參數分別是:新節點的接口、topic名稱、訂閱者的接口:
def publisher_update_task(api, topic, pub_uris):
"""
Contact api.publisherUpdate with specified parameters
@param api: XML-RPC URI of node to contact
@type api: str
@param topic: Topic name to send to node
@type topic: str
@param pub_uris: list of publisher APIs to send to node
@type pub_uris: [str]
"""
msg = "publisherUpdate[%s] -> %s %s" % (topic, api, pub_uris)
mloginfo(msg)
start_sec = time.time()
try:
#TODO: check return value for errors so we can unsubscribe if stale
ret = xmlrpcapi(api).publisherUpdate('/master', topic, pub_uris)
msg_suffix = "result=%s" % ret
except Exception as ex:
msg_suffix = "exception=%s" % ex
raise
finally:
delta_sec = time.time() - start_sec
mloginfo("%s: sec=%0.2f, %s", msg, delta_sec, msg_suffix)
publisherUpdate 接口在 rospy 模塊的 masterslave.py 文件中,猜測是使用XMLRPC協議,通知所有的訂閱者節點,發佈者更新了
@apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
def publisherUpdate(self, caller_id, topic, publishers):
"""
Callback from master of current publisher list for specified topic.
@param caller_id: ROS caller id
@type caller_id: str
@param topic str: topic name
@type topic: str
@param publishers: list of current publishers for topic in the form of XMLRPC URIs
@type publishers: [str]
@return: [code, status, ignore]
@rtype: [int, str, int]
"""
if self.reg_man:
for uri in publishers:
self.reg_man.publisher_update(topic, publishers)
return 1, "", 0
2、hasParam
request報文body
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<array>
<data>
<value>
<int>1</int>
</value>
<value>
<string>/use_sim_time</string>
</value>
<value>
<boolean>0</boolean>
</value>
</data>
</array>
</value>
</param>
</params>
</methodResponse>
response報文body
<?xml version="1.0"?>
<methodCall>
<methodName>requestTopic</methodName>
<params>
<param>
<value>/rosout</value>
</param>
<param>
<value>/rosout</value>
</param>
<param>
<value>
<array>
<data>
<value>
<array>
<data>
<value>TCPROS</value>
</data>
</array>
</value>
</data>
</array>
</value>
</param>
</params>
</methodCall>
先說結論:
調用 hasParam 接口,檢查參數服務器是否有參數 /test_sub/use_sim_time
返回值是 [1, /use_sim_time, 0],表示沒有
再看代碼:
hasParam
有了上面的分析經驗,我們可以很輕鬆地的出結論,這是在調用 hasParam 接口
我們可以很輕鬆地找到這個方法的代碼,在rosmaster模塊的master_api.py文件中
@apivalidate(False, (non_empty_str('key'),))
def hasParam(self, caller_id, key):
"""
Check if parameter is stored on server.
@param caller_id str: ROS caller id
@type caller_id: str
@param key: parameter to check
@type key: str
@return: [code, statusMessage, hasParam]
@rtype: [int, str, bool]
"""
key = resolve_name(key, caller_id)
if self.param_server.has_param(key):
return 1, key, True
else:
return 1, key, False
根據協議
- caller_id 傳參是 /test_sub
- key 傳參是 /use_sim_time
根據注釋和代碼,我們可以確認,這個就口就是在檢查,參數服務器是否有參數 /test_sub/use_sim_time
resolve_name 接口接收兩個參數,根據調用:
- name是/use_sim_time
- namespace_是/test_sub
所以,才能確認上面的全局參數 /test_sub/use_sim_time
hasParam 接口的返回值有三個
- code,整型,這裡無論有沒有,都返回1,可以忽略
- key,這裡就是 /use_sim_time
- hasParam,表示是否有這個參數,True/False
根據 response 報文,這裡應該返回 [1, /use_sim_time, 0],表示沒有這個參數
use_sim_time
想要理解為什麼要調用這個接口,就要理解 use_sim_time 參數的作用
use_sim_time是一個重要的參數,它默認值為false,可以配合Rosbag使用,是一個很重要的離線調試工具
我們都知道,ROS 中的時間有兩種:
- ROS::Time()
- ROS::WallTime()
ROS::Time()和ROS::WallTime()
表示ROS網絡中的時間,如果當時在非仿真環境里運行,那它就是當前的時間。但是假設去回放當時的情況,那就需要把當時的時間錄下來
以控制為例,很多的數據處理需要知道當時某一個時刻發生了什麼。Wall Time可以理解為牆上時間,牆上掛着的時間沒有人改變的了,永遠在往前走;ROS Time可以被人為修改,你可以暫停它,可以加速,可以減速,但是Wall Time不可以。
在開啟一個Node之前,當把use_sim_time設置為true時,這個節點會從clock Topic獲得時間。所以操作這個clock的發佈者,可以實現一個讓Node中得到ROS Time暫停、加速、減速的效果。同時下面這些方面都是跟Node透明的,所以非常適合離線的調試方式。當把ROSbag記下來以後重新play出來時,加兩個橫杠,–clock,它就會發佈出這個消息
3、registerSubscriber
先看報文:
request報文body
<?xml version="1.0"?>
<methodCall>
<methodName>registerSubscriber</methodName>
<params>
<param>
<value>/test_sub</value>
</param>
<param>
<value>/ros_message</value>
</param>
<param>
<value>my_package/MessageDefine</value>
</param>
<param>
<value>//192.168.1.150:43597</value>
</param>
</params>
</methodCall>
response報文body
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<array>
<data>
<value>
<int>1</int>
</value>
<value>
<string>Subscribed to [/ros_message]</string>
</value>
<value>
<array>
<data>
<value>
<string>//sherlock:41689/</string>
</value>
</data>
</array>
</value>
</data>
</array>
</value>
</param>
</params>
</methodResponse>
registerSubscriber
registerSubscriber 代碼在rosmaster包中的master_api.py文件中,如下:
@apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
"""
Subscribe the caller to the specified topic. In addition to receiving
a list of current publishers, the subscriber will also receive notifications
of new publishers via the publisherUpdate API.
@param caller_id: ROS caller id
@type caller_id: str
@param topic str: Fully-qualified name of topic to subscribe to.
@param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name.
@type topic_type: str
@param caller_api: XML-RPC URI of caller node for new publisher notifications
@type caller_api: str
@return: (code, message, publishers). Publishers is a list of XMLRPC API URIs
for nodes currently publishing the specified topic.
@rtype: (int, str, [str])
"""
#NOTE: subscribers do not get to set topic type
try:
self.ps_lock.acquire()
self.reg_manager.register_subscriber(topic, caller_id, caller_api)
# ROS 1.1: subscriber can now set type if it is not already set
# - don't let '*' type squash valid typing
if not topic in self.topics_types and topic_type != rosgraph.names.ANYTYPE:
self.topics_types[topic] = topic_type
mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
pub_uris = self.publishers.get_apis(topic)
finally:
self.ps_lock.release()
return 1, "Subscribed to [%s]"%topic, pub_uris
根據協議往來,我們可以看到調用過程
registerSubscriber("/test_sub", "/ros_message", "my_package/MessageDefine", "//192.168.1.150:43597")
入參有4個:
- 訂閱節點名:/test_sub
- 需要訂閱的topic 名稱:/ros_message
- topic的數據類型:my_package/MessageDefine
- 訂閱節點自己的uri,即發佈者通知時的發送目標
返回值有3個:
-
code,這裡固定是1
-
message,這裡是 Subscribed to [/ros_message]
-
publisher 的訂閱URI 列表,因為這裡只有一個publisher,所以只有一個 //sherlock:41689/,首先,這可能是主機裏面某個幾點的uri,需要從機去訂閱
代碼說明:
和第一條,註冊publisher相反,這裡是註冊subscriber
有幾個關鍵代碼
register_subscriber 代碼,位於rosmaster包中的registerations.py文件中:
def register_subscriber(self, topic, caller_id, caller_api):
"""
Register topic subscriber
@return: None
"""
self._register(self.subscribers, topic, caller_id, caller_api)
_register 代碼,調用 _register_node_api 接口更新節點信息,如果之前有該節點的註冊信息,則先刪除:
def _register(self, r, key, caller_id, caller_api, service_api=None):
# update node information
node_ref, changed = self._register_node_api(caller_id, caller_api)
node_ref.add(r.type, key)
# update pub/sub/service indicies
if changed:
self.publishers.unregister_all(caller_id)
self.subscribers.unregister_all(caller_id)
self.services.unregister_all(caller_id)
self.param_subscribers.unregister_all(caller_id)
r.register(key, caller_id, caller_api, service_api)
_register_node_api 代碼:
def _register_node_api(self, caller_id, caller_api):
"""
@param caller_id: caller_id of provider
@type caller_id: str
@param caller_api: caller_api of provider
@type caller_api: str
@return: (registration_information, changed_registration). changed_registration is true if
caller_api is differet than the one registered with caller_id
@rtype: (NodeRef, bool)
"""
node_ref = self.nodes.get(caller_id, None)
bumped_api = None
if node_ref is not None:
if node_ref.api == caller_api:
return node_ref, False
else:
bumped_api = node_ref.api
self.thread_pool.queue_task(bumped_api, shutdown_node_task,
(bumped_api, caller_id, "new node registered with same name"))
node_ref = NodeRef(caller_id, caller_api)
self.nodes[caller_id] = node_ref
return (node_ref, bumped_api != None)
shutdown_node_task 代碼,如果訂閱節點退出了,則需要通知:
def shutdown_node_task(api, caller_id, reason):
"""
Method to shutdown another ROS node. Generally invoked within a
separate thread as this is used to cleanup hung nodes.
@param api: XML-RPC API of node to shutdown
@type api: str
@param caller_id: name of node being shutdown
@type caller_id: str
@param reason: human-readable reason why node is being shutdown
@type reason: str
"""
try:
xmlrpcapi(api).shutdown('/master', "[{}] Reason: {}".format(caller_id, reason))
except:
pass #expected in many common cases
remove_server_proxy(api)
4、requestTopic
request報文body,如下,我們可以看到,xmlrpc發送的host是sherlock:41689,是上一步驟,收到的publisher的uri,如果有多個publisher,要request多次
POST /RPC2 HTTP/1.1
Host: sherlock:41689
User-Agent: Go-http-client/1.1
Content-Length: 307
Content-Type: text/xml
Accept-Encoding: gzip
<?xml version="1.0"?>
<methodCall>
<methodName>requestTopic</methodName>
<params>
<param>
<value>/test_sub</value>
</param>
<param>
<value>/ros_message</value>
</param>
<param>
<value>
<array>
<data>
<value>
<array>
<data>
<value>TCPROS</value>
</data>
</array>
</value>
</data>
</array>
</value>
</param>
</params>
</methodCall>
response報文
<?xml version="1.0"?>
<methodResponse>
<params>
<param>
<value>
<array>
<data>
<value>
<i4>1</i4>
</value>
<value></value>
<value>
<array>
<data>
<value>TCPROS</value>
<value>sherlock</value>
<value>
<i4>33173</i4>
</value>
</data>
</array>
</value>
</data>
</array>
</value>
</param>
</params>
</methodResponse>
根據協議,調用過程如下:
requestTopic("/test_sub", "/ros_message", ["TCPROS"])
告訴publisher,subscriber準備好了,可以發數據了
requestTopic
_remap_table['requestTopic'] = [0] # remap topic
@apivalidate([], (is_topic('topic'), non_empty('protocols')))
def requestTopic(self, caller_id, topic, protocols):
"""
Publisher node API method called by a subscriber node.
Request that source allocate a channel for communication. Subscriber provides
a list of desired protocols for communication. Publisher returns the
selected protocol along with any additional params required for
establishing connection. For example, for a TCP/IP-based connection,
the source node may return a port number of TCP/IP server.
@param caller_id str: ROS caller id
@type caller_id: str
@param topic: topic name
@type topic: str
@param protocols: list of desired
protocols for communication in order of preference. Each
protocol is a list of the form [ProtocolName,
ProtocolParam1, ProtocolParam2...N]
@type protocols: [[str, XmlRpcLegalValue*]]
@return: [code, msg, protocolParams]. protocolParams may be an
empty list if there are no compatible protocols.
@rtype: [int, str, [str, XmlRpcLegalValue*]]
"""
if not get_topic_manager().has_publication(topic):
return -1, "Not a publisher of [%s]"%topic, []
for protocol in protocols: #simple for now: select first implementation
protocol_id = protocol[0]
for h in self.protocol_handlers:
if h.supports(protocol_id):
_logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id)
return h.init_publisher(topic, protocol)
return 0, "no supported protocol implementations", []
init_publisher 代碼
def init_publisher(self, resolved_name, protocol):
"""
Initialize this node to receive an inbound TCP connection,
i.e. startup a TCP server if one is not already running.
@param resolved_name: topic name
@type resolved__name: str
@param protocol: negotiated protocol
parameters. protocol[0] must be the string 'TCPROS'
@type protocol: [str, value*]
@return: (code, msg, [TCPROS, addr, port])
@rtype: (int, str, list)
"""
if protocol[0] != TCPROS:
return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
start_tcpros_server()
addr, port = get_tcpros_server_address()
return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
publisher 檢查,是否支持指定協議,如果不支持,則返回1,否則返回0
返回值的第二個參數有三個值,分別是協議類型、ip地址 和 端口
5、unregisterSubscriber
request報文body
<?xml version="1.0"?>
<methodCall>
<methodName>unregisterSubscriber</methodName>
<params>
<param>
<value>/test_sub</value>
</param>
<param>
<value>/ros_message</value>
</param>
<param>
<value>//192.168.1.150:43597</value>
</param>
</params>
</methodCall>
response報文body
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<array>
<data>
<value>
<int>1</int>
</value>
<value>
<string>Unregistered [/test_sub] as provider of [/ros_message]</string>
</value>
<value>
<int>1</int>
</value>
</data>
</array>
</value>
</param>
</params>
</methodResponse>
unregisterSubscriber
@apivalidate(0, (is_topic('topic'), is_api('caller_api')))
def unregisterSubscriber(self, caller_id, topic, caller_api):
"""
Unregister the caller as a subscriber of the topic.
@param caller_id: ROS caller id
@type caller_id: str
@param topic: Fully-qualified name of topic to unregister.
@type topic: str
@param caller_api: API URI of service to unregister. Unregistration will only occur if current
registration matches.
@type caller_api: str
@return: (code, statusMessage, numUnsubscribed).
If numUnsubscribed is zero it means that the caller was not registered as a subscriber.
The call still succeeds as the intended final state is reached.
@rtype: (int, str, int)
"""
try:
self.ps_lock.acquire()
retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api)
mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api)
return retval
finally:
self.ps_lock.release()
取消訂閱,固定返回1
6、TCP數據私有協議
首先保證主從機的數據類型一致,包括字段的順序,實際ROS框架內是通過md5檢測,保證數據類型一致的
數據傳輸前提:
- 數據類型一致
- 字段名一致
- 字段順序一致
數據傳輸模式:小端hex
數據包結構
數據域長度 | 數據域 |
---|---|
4 byte | n byte |
數據域長度固定4byte,長度不包括自身
數據域
數據域根據字段類型解析,ros 通信的內置數據類型有:
原始類型 | 位元組數 |
---|---|
bool | 1 |
int8 | 1 |
uint8 | 1 |
int16 | 2 |
uint16 | 2 |
int32 | 4 |
uint32 | 4 |
int64 | 8 |
uint64 | 8 |
float32 | 4 |
float64 | 8 |
string | n(n > 4) |
time | 8 |
duration | 8 |
數組 | n(n > 4) |
其中,除 string、time、duration 和 數組 類型外的其餘類型,直接根據位元組數讀取即可
string
字符串類型,也可認為是字符數組(則可以和數組類型復用),因為是不定長度,所以需要知道字符串的長度,ROS中使用uint32類型表示長度/數組元素數量,即4byte
所以,如果出現字符串類型,則數據域為:
字符串長度 | 字符 |
---|---|
4 byte | n byte |
數組
數組類型,因為是不定長度,所以需要知道數組的元素數量,和string同理,ROS 中使用uint32類型表示數組的元素數量,再結合數組元素的類型,即可得到總長度
所以,出現數組類型,則數據域為:
數組元素數量 | 數組數據 |
---|---|
4 byte | n byte |
如果數組類型是 int32,則數組數據占 4 * n byte,其餘類型以此類推
time
ROS 中把 time 單獨提取作為基本數據類型,對應 ROS 中的 ros::Time 類,因為我們可以認為是嵌套類型
ros::Time 有兩個字段:
- sec: uint32
- nsec: uint32
所以,time 類型在數據域佔8byte,如果出現 time 類型,則數據域為:
sec | nsec |
---|---|
4 byte | 4 byte |
duration
duration 類型和 time 相同,在 ROS 中對應 ros::Duration 類,可以認為是嵌套類型
ros::Duration 有兩個字段:
- sec: uint32
- nsec: uint32
所以,duration 類型在數據域中佔8byte,如果出現 duration 類型,則數據域為:
sec | nsec |
---|---|
4 byte | 4 byte |
嵌套類型
嵌套類型可以認為是數據域的組合,如果發現字段類型不是內置數據類型,則可認為是嵌套類型,嵌套類型按照類型的字段,遞歸處理即可
協議分析示例
示例1:
.msg 文件為:
int8 shutdown_time
string text
主機發出數據為:
shutdown_time = 123
text = abc
從機收到數據為:
08 00 00 00 7b 03 00 00 00 61 62 63
分析如下:
-
包頭4 byte表示數據與長度
08 00 00 00,表示數據域長度為8,即後續數據總長度為8
-
字段1為shutdown_time,類型是int8,1byte
7b轉10進制,為123
-
字段2為text,類型是字符串 (4+n)byte
4byte 長度:03 00 00 00,表示字符串長度為3,後面3byte 為字符串內容:61 62 63,ASCII轉換為:abc)
示例2:
.msg 文件為:
Header header
int8 shutdown_time
int32 shutdown_time2
string text
float32 num
string text2
int8[] data
int16[] data2
Header的數據類型為:
uint32 seq
time stamp
string frame_id
主機發出數據為:
//header一般由ROS系統自己處理,這裡寫出來是為了方便觀察
header.seq = 29;
header.time.sec = 0;
header.time.nsec = 0;
header.frame_id = "";
shutdown_time = 123;
shutdown_time2 = 987654;
text2 = "lmn";
text = "abc";
num = 23.4;
data = [1, 2, 4, 89];
data2 = [11, 22, 908]
從機收到的數據為:
39 00 00 00 1d 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 7b 06 12 0f 00 03 00 00 00 61 62 63 33 33 bb 41 03 00 00 00 6c 6d 6e 04 00 00 00 01 02 04 59 03 00 00 00 0b 00 16 00 8c 03
分析如下:
-
包頭4byte表示數據域長度
0x39 00 00 00,10進制57,表明後續數據域長度57byte
-
字段1,Header 類型,可以認為是嵌套類型,Header字段如下:
- 字段1,seq,uint32,4byte,數據為,0x1d 00 00 00,十進制29;
- 字段2,time類型,可以認為是嵌套類型,字段如下:
- 字段1,sec,uint32,4byte,數據為:0x00 00 00 00,十進制0;
- 字段2,nsec,uint32,4byte,數據為:0x00 00 00 00,十進制0;
- 字段3,frame_id,字符串類型,4byte 表示長度,00 00 00 00,表示長度為0,字符串為空
-
字段2,shutdown_time,int8,1byte,數據為:0x7b,十進制123;
-
字段3,shutdown_time2,int32,4byte,數據為:0x06 12 0f 00,十進制:987654;
-
字段4,text,字符串:
- 4byte 長度,數據為:0x03 00 00 00 ,表示字符產長度為3;
- 字符串內容,數據為:0x61 62 63 ,ASCII對應:abc;
-
字段5,num,flota32,4byte,數據為:33 33 bb 41,十進制:23.4;
-
字段6:text2,字符串:
- 4byte長度,數據為:0x03 00 00 00,表示字符串長度為3;
- 字符產內容,數據為:0x6c 6d 6e,ASCII對應lmn;
-
字段7,data,int8數組:
- 4byte表示數組元素數量,數據為:0x04 00 00 00,表示有4個int8元素:
- 數組內容:[0x01, 0x02, 0x04, 0x59,],表示:[1,2,4,89];
-
字段8,data2,int16數組:
- 4byte表示長度,數據為:0x03 00 00 00,表示有3個int16數據;
- 數組內容:[0x0b00, 0x1600, 0x8c03],表示:[11, 22, 908]
7、小結
宗上,整體的從機訂閱時序圖如下: