当前位置: 华文星空 > 知识

rosbag record代码分析

2023-01-13知识

ROS通过 rosbag record 命令进行bag包的录制,下面我们分析下ROS是如何进行bag包录制的。

目录

rosbag record 的代码位于 ros_comm\tools\rosbag\src\recorder.cpp 中。实现的类主要为 Recorder

首先我们看下Recorder的构造函数,可以看到通过RecorderOptions选项来构造Recorder。

Recorder :: Recorder ( RecorderOptions const & options ) : options_ ( options ), \\ 读取选项 num_subscribers_ ( 0 ), \\ 订阅数 exit_code_ ( 0 ), queue_size_ ( 0 ), split_count_ ( 0 ), writing_enabled_ ( true ) { }

再看下RecorderOptions选项有哪些?

RecorderOptions :: RecorderOptions () : trigger ( false ), // 是否触发 record_all ( false ), // 录制所有topic regex ( false ), do_exclude ( false ), quiet ( false ), append_date ( true ), snapshot ( false ), verbose ( false ), publish ( false ), compression ( compression :: Uncompressed ), // 压缩方式 prefix ( "" ), // 前缀 name ( "" ), exclude_regex (), buffer_size ( 1048576 * 256 ), // 缓冲大小 chunk_size ( 1024 * 768 ), // chunk大小 limit ( 0 ), split ( false ), // 是否拆分 max_size ( 0 ), max_splits ( 0 ), max_duration ( - 1.0 ), // 最大没有接收到消息的间隔(超过则退出) node ( "" ), // 录制的node min_space ( 1024 * 1024 * 1024 ), // 最小空间1G min_space_str ( "1G" ) { }

接口

了解了Recorder的配置之后,我们接下来分析Recorder的接口,一共有4个。总的来说, 录制bag包的流程分为2个部分,一个流程订阅消息,并且放入队列,另一个流程从队列中读取消息,并且保存到bag文件中

public : Recorder ( RecorderOptions const & options ); // 构造函数 void doTrigger (); // 触发 bool isSubscribed ( std :: string const & topic ) const ; // 是否订阅了topic boost :: shared_ptr < ros :: Subscriber > subscribe ( std :: string const & topic ); // 订阅topic int run (); // 开始运行

下面开始逐个分析这4个接口。

doTrigger

通过触发空消息std_msgs::Empty来保存当前快照

void Recorder :: doTrigger () { ros :: NodeHandle nh ; ros :: Publisher pub = nh . advertise < std_msgs :: Empty > ( "snapshot_trigger" , 1 , true ); pub . publish ( std_msgs :: Empty ()); // 发送空消息,snapshotTrigger接收这个消息并调用 // 定时关闭ros ros :: Timer terminate_timer = nh . createTimer ( ros :: Duration ( 1.0 ), boost :: bind ( & ros :: shutdown )); ros :: spin (); }

发送的消息std_msgs::Empty通过以下函数接收,

void Recorder :: snapshotTrigger ( std_msgs :: Empty :: ConstPtr trigger ) { ( void ) trigger ; updateFilenames (); ROS_INFO ( "Triggered snapshot recording with name '%s'." , target_filename_ . c_str ()); // 更新队列 { boost :: mutex :: scoped_lock lock ( queue_mutex_ ); queue_queue_ . push ( OutgoingQueue ( target_filename_ , queue_ , Time :: now ())); queue_ = new std :: queue < OutgoingMessage > ; queue_size_ = 0 ; } // 触发 queue_condition_ . notify_all (); }

并且通过queue_condition_唤醒保存

void Recorder :: doRecordSnapshotter () { ros :: NodeHandle nh ; while ( nh . ok () || ! queue_queue_ . empty ()) { boost :: unique_lock < boost :: mutex > lock ( queue_mutex_ ); while ( queue_queue_ . empty ()) { if ( ! nh . ok ()) return ; queue_condition_ . wait ( lock ); // 等待唤醒 } OutgoingQueue out_queue = queue_queue_ . front (); queue_queue_ . pop (); lock . release () -> unlock (); string target_filename = out_queue . filename ; string write_filename = target_filename + string ( ".active" ); try { bag_ . open ( write_filename , bagmode :: Write ); // 创建bag文件 } catch ( const rosbag :: BagException & ex ) { ROS_ERROR ( "Error writing: %s" , ex . what ()); return ; } while ( ! out_queue . queue -> empty ()) { OutgoingMessage out = out_queue . queue -> front (); out_queue . queue -> pop (); // 将消息写入bag文件 bag_ . write ( out . topic , out . time , * out . msg ); } // 关闭bag包 stopWriting (); } }

isSubscribed

判断当前topic是否被订阅,代码非常简单,就是从已经订阅的topic集合中查找该topic,看是否能找到。

bool Recorder :: isSubscribed ( string const & topic ) const { // 从set数据结构中查找是否有这个topic return currently_recording_ . find ( topic ) != currently_recording_ . end (); }

subscribe

订阅指定的topic消息,并且返回Subscriber

shared_ptr < ros :: Subscriber > Recorder :: subscribe ( string const & topic ) { ROS_INFO ( "Subscribing to %s" , topic . c_str ()); ros :: NodeHandle nh ; shared_ptr < int > count ( boost :: make_shared < int > ( options_ . limit )); shared_ptr < ros :: Subscriber > sub ( boost :: make_shared < ros :: Subscriber > ()); // 订阅当前topic ros :: SubscribeOptions ops ; ops . topic = topic ; ops . queue_size = 100 ; ops . md5sum = ros :: message_traits :: md5sum < topic_tools :: ShapeShifter > (); ops . datatype = ros :: message_traits :: datatype < topic_tools :: ShapeShifter > (); ops . helper = boost :: make_shared < ros :: SubscriptionCallbackHelperT < const ros :: MessageEvent < topic_tools :: ShapeShifter const > &> > ( boost :: bind ( & Recorder :: doQueue , this , boost :: placeholders :: _1 , topic , sub , count )); ops . transport_hints = options_ . transport_hints ; * sub = nh . subscribe ( ops ); // 把当前topic放入订阅集合,订阅数量加1 currently_recording_ . insert ( topic ); num_subscribers_ ++ ; return sub ; }

doQueue

订阅的消息通过doQueue来放入队列。

void Recorder :: doQueue ( const ros :: MessageEvent < topic_tools :: ShapeShifter const >& msg_event , string const & topic , shared_ptr < ros :: Subscriber > subscriber , shared_ptr < int > count ) { Time rectime = Time :: now (); if ( options_ . verbose ) cout << "Received message on topic " << subscriber -> getTopic () << endl ; OutgoingMessage out ( topic , msg_event . getMessage (), msg_event . getConnectionHeaderPtr (), rectime ); { boost :: mutex :: scoped_lock lock ( queue_mutex_ ); // 将消息放入队列,并且增加大小 queue_ -> push ( out ); queue_size_ += out . msg -> size (); // 消息是否需要重复写入 if ( options_ . repeat_latched ) { ros :: M_string :: const_iterator it = out . connection_header -> find ( "latching" ); if (( it != out . connection_header -> end ()) && ( it -> second == "1" )) { ros :: M_string :: const_iterator it2 = out . connection_header -> find ( "callerid" ); if ( it2 != out . connection_header -> end ()) { latched_msgs_ . insert ({{ subscriber -> getTopic (), it2 -> second }, out }); } } } // 检查buffer是否满了 while ( options_ . buffer_size > 0 && queue_size_ > options_ . buffer_size ) { OutgoingMessage drop = queue_ -> front (); queue_ -> pop (); queue_size_ -= drop . msg -> size (); if ( ! options_ . snapshot ) { Time now = Time :: now (); if ( now > last_buffer_warn_ + ros :: Duration ( 5.0 )) { ROS_WARN ( "rosbag record buffer exceeded. Dropping oldest queued message." ); last_buffer_warn_ = now ; } } } } // 唤醒bag写入线程 if ( ! options_ . snapshot ) queue_condition_ . notify_all (); // 超过option计数的消息则关闭 if (( * count ) > 0 ) { ( * count ) -- ; if (( * count ) == 0 ) { subscriber -> shutdown (); num_subscribers_ -- ; if ( num_subscribers_ == 0 ) ros :: shutdown (); } } }

run

主流程都集中在run中,启动了另一个线程record_thread来进行录制

int Recorder :: run () { ros :: NodeHandle nh ; if ( ! nh . ok ()) return 0 ; // 记录开始录制 if ( options_ . publish ) { pub_begin_write = nh . advertise < std_msgs :: String > ( "begin_write" , 1 , true ); } last_buffer_warn_ = Time (); queue_ = new std :: queue < OutgoingMessage > ; // 订阅options中的topic if ( ! options_ . regex ) { for ( string const & topic : options_ . topics ) subscribe ( topic ); } ros :: Subscriber trigger_sub ; // Spin up a thread for writing to the file boost :: thread record_thread ; // 支持快照 if ( options_ . snapshot ) { record_thread = boost :: thread ( boost :: bind ( & Recorder :: doRecordSnapshotter , this )); // Subscribe to the snapshot trigger trigger_sub = nh . subscribe < std_msgs :: Empty > ( "snapshot_trigger" , 100 , boost :: bind ( & Recorder :: snapshotTrigger , this , boost :: placeholders :: _1 )); } else // 不支持快照,调用doRecord record_thread = boost :: thread ( boost :: bind ( & Recorder :: doRecord , this )); // 检查节点 ros :: Timer check_master_timer ; if ( options_ . record_all || options_ . regex || ( options_ . node != std :: string ( "" ))) { // check for master first doCheckMaster ( ros :: TimerEvent (), nh ); check_master_timer = nh . createTimer ( ros :: Duration ( 1.0 ), boost :: bind ( & Recorder :: doCheckMaster , this , boost :: placeholders :: _1 , boost :: ref ( nh ))); } ros :: AsyncSpinner s ( 10 ); s . start (); record_thread . join (); queue_condition_ . notify_all (); delete queue_ ; return exit_code_ ; }

doRecord

读取消息,并且写入bag文件,其中会检查磁盘和是否超过最大间隔还没接收消息。

void Recorder :: doRecord () { // 打开bag文件,等待写入 startWriting (); // Schedule the disk space check warn_next_ = ros :: WallTime (); // 检查磁盘 try { checkDisk (); } catch ( const rosbag :: BagException & ex ) { ROS_ERROR_STREAM ( ex . what ()); exit_code_ = 1 ; stopWriting (); return ; } check_disk_next_ = ros :: WallTime :: now () + ros :: WallDuration (). fromSec ( 20.0 ); // 循环读取消息并写入,其中做了一些检查 ros :: NodeHandle nh ; while ( nh . ok () || ! queue_ -> empty ()) { boost :: unique_lock < boost :: mutex > lock ( queue_mutex_ ); bool finished = false ; while ( queue_ -> empty ()) { boost :: xtime xt ; boost :: xtime_get ( & xt , boost :: TIME_UTC_ ); xt . nsec += 250000000 ; queue_condition_ . timed_wait ( lock , xt ); // 超过最大间隔没有消息则退出 if ( checkDuration ( ros :: Time :: now ())) { finished = true ; break ; } } if ( finished ) break ; OutgoingMessage out = queue_ -> front (); queue_ -> pop (); queue_size_ -= out . msg -> size (); lock . release () -> unlock (); if ( checkSize ()) break ; if ( checkDuration ( out . time )) break ; try { // 检查磁盘和日志,写入消息到bag if ( scheduledCheckDisk () && checkLogging ()) bag_ . write ( out . topic , out . time , * out . msg , out . connection_header ); } catch ( const rosbag :: BagException & ex ) { ROS_ERROR_STREAM ( ex . what ()); exit_code_ = 1 ; break ; } } stopWriting (); }

总结

ROS中通过 record.cpp 调用 Recorder 类来进行bag的保存。

ROS Bag分析

  • rosbag record代码分析
  • rosbag play代码分析
  • rosbag_storage总体介绍(一)
  • rosbag_storage bag实现(二)
  • rosbag_storage 查询bag包(三)
  • rosbag_storage 文件读写(四)
  • ROS 发布订阅模式

  • ROS发布订阅实现(二)
  • ROS导航模块

  • ROS navigation导航模块(一)