當前位置: 華文星空 > 知識

rosbag record代分碼析

2023-01-13知識

ROS透過 rosbag record 命令進行bag包的錄制,下面我們分析下ROS是如何進行bag包錄制的。

目錄

rosbag record 的程式碼位於 ros_comm\tools\rosbag\src\recorder.cpp 中。實作的類主要為 Recorder

首先我們看下Recorder的建構函式,可以看到透過RecorderOptions選項來構造Recorder。

Recorder :: Recorder ( RecorderOptions const & options ) : options_ ( options ), \\ 讀取選項 num_subscribers_ ( 0 ), \\ 訂閱數 exit_code_ ( 0 ), queue_size_ ( 0 ), split_count_ ( 0 ), writing_enabled_ ( true ) { }

再看下RecorderOptions選項有哪些?

RecorderOptions :: RecorderOptions () : trigger ( false ), // 是否觸發 record_all ( false ), // 錄制所有topic regex ( false ), do_exclude ( false ), quiet ( false ), append_date ( true ), snapshot ( false ), verbose ( false ), publish ( false ), compression ( compression :: Uncompressed ), // 壓縮方式 prefix ( "" ), // 字首 name ( "" ), exclude_regex (), buffer_size ( 1048576 * 256 ), // 緩沖大小 chunk_size ( 1024 * 768 ), // chunk大小 limit ( 0 ), split ( false ), // 是否拆分 max_size ( 0 ), max_splits ( 0 ), max_duration ( - 1.0 ), // 最大沒有接收到訊息的間隔(超過則結束) node ( "" ), // 錄制的node min_space ( 1024 * 1024 * 1024 ), // 最小空間1G min_space_str ( "1G" ) { }

介面

了解了Recorder的配置之後,我們接下來分析Recorder的介面,一共有4個。總的來說, 錄制bag包的流程分為2個部份,一個流程訂閱訊息,並且放入佇列,另一個流程從佇列中讀取訊息,並且保存到bag檔中

public : Recorder ( RecorderOptions const & options ); // 建構函式 void doTrigger (); // 觸發 bool isSubscribed ( std :: string const & topic ) const ; // 是否訂閱了topic boost :: shared_ptr < ros :: Subscriber > subscribe ( std :: string const & topic ); // 訂閱topic int run (); // 開始執行

下面開始逐個分析這4個介面。

doTrigger

透過觸發空訊息std_msgs::Empty來保存當前快照

void Recorder :: doTrigger () { ros :: NodeHandle nh ; ros :: Publisher pub = nh . advertise < std_msgs :: Empty > ( "snapshot_trigger" , 1 , true ); pub . publish ( std_msgs :: Empty ()); // 發送空訊息,snapshotTrigger接收這個訊息並呼叫 // 定時關閉ros ros :: Timer terminate_timer = nh . createTimer ( ros :: Duration ( 1.0 ), boost :: bind ( & ros :: shutdown )); ros :: spin (); }

發送的訊息std_msgs::Empty透過以下函式接收,

void Recorder :: snapshotTrigger ( std_msgs :: Empty :: ConstPtr trigger ) { ( void ) trigger ; updateFilenames (); ROS_INFO ( "Triggered snapshot recording with name '%s'." , target_filename_ . c_str ()); // 更新佇列 { boost :: mutex :: scoped_lock lock ( queue_mutex_ ); queue_queue_ . push ( OutgoingQueue ( target_filename_ , queue_ , Time :: now ())); queue_ = new std :: queue < OutgoingMessage > ; queue_size_ = 0 ; } // 觸發 queue_condition_ . notify_all (); }

並且透過queue_condition_喚醒保存

void Recorder :: doRecordSnapshotter () { ros :: NodeHandle nh ; while ( nh . ok () || ! queue_queue_ . empty ()) { boost :: unique_lock < boost :: mutex > lock ( queue_mutex_ ); while ( queue_queue_ . empty ()) { if ( ! nh . ok ()) return ; queue_condition_ . wait ( lock ); // 等待喚醒 } OutgoingQueue out_queue = queue_queue_ . front (); queue_queue_ . pop (); lock . release () -> unlock (); string target_filename = out_queue . filename ; string write_filename = target_filename + string ( ".active" ); try { bag_ . open ( write_filename , bagmode :: Write ); // 建立bag檔 } catch ( const rosbag :: BagException & ex ) { ROS_ERROR ( "Error writing: %s" , ex . what ()); return ; } while ( ! out_queue . queue -> empty ()) { OutgoingMessage out = out_queue . queue -> front (); out_queue . queue -> pop (); // 將訊息寫入bag檔 bag_ . write ( out . topic , out . time , * out . msg ); } // 關閉bag包 stopWriting (); } }

isSubscribed

判斷當前topic是否被訂閱,程式碼非常簡單,就是從已經訂閱的topic集合中尋找該topic,看是否能找到。

bool Recorder :: isSubscribed ( string const & topic ) const { // 從set數據結構中尋找是否有這個topic return currently_recording_ . find ( topic ) != currently_recording_ . end (); }

subscribe

訂閱指定的topic訊息,並且返回Subscriber

shared_ptr < ros :: Subscriber > Recorder :: subscribe ( string const & topic ) { ROS_INFO ( "Subscribing to %s" , topic . c_str ()); ros :: NodeHandle nh ; shared_ptr < int > count ( boost :: make_shared < int > ( options_ . limit )); shared_ptr < ros :: Subscriber > sub ( boost :: make_shared < ros :: Subscriber > ()); // 訂閱當前topic ros :: SubscribeOptions ops ; ops . topic = topic ; ops . queue_size = 100 ; ops . md5sum = ros :: message_traits :: md5sum < topic_tools :: ShapeShifter > (); ops . datatype = ros :: message_traits :: datatype < topic_tools :: ShapeShifter > (); ops . helper = boost :: make_shared < ros :: SubscriptionCallbackHelperT < const ros :: MessageEvent < topic_tools :: ShapeShifter const > &> > ( boost :: bind ( & Recorder :: doQueue , this , boost :: placeholders :: _1 , topic , sub , count )); ops . transport_hints = options_ . transport_hints ; * sub = nh . subscribe ( ops ); // 把當前topic放入訂閱集合,訂閱數量加1 currently_recording_ . insert ( topic ); num_subscribers_ ++ ; return sub ; }

doQueue

訂閱的訊息透過doQueue來放入佇列。

void Recorder :: doQueue ( const ros :: MessageEvent < topic_tools :: ShapeShifter const >& msg_event , string const & topic , shared_ptr < ros :: Subscriber > subscriber , shared_ptr < int > count ) { Time rectime = Time :: now (); if ( options_ . verbose ) cout << "Received message on topic " << subscriber -> getTopic () << endl ; OutgoingMessage out ( topic , msg_event . getMessage (), msg_event . getConnectionHeaderPtr (), rectime ); { boost :: mutex :: scoped_lock lock ( queue_mutex_ ); // 將訊息放入佇列,並且增加大小 queue_ -> push ( out ); queue_size_ += out . msg -> size (); // 訊息是否需要重復寫入 if ( options_ . repeat_latched ) { ros :: M_string :: const_iterator it = out . connection_header -> find ( "latching" ); if (( it != out . connection_header -> end ()) && ( it -> second == "1" )) { ros :: M_string :: const_iterator it2 = out . connection_header -> find ( "callerid" ); if ( it2 != out . connection_header -> end ()) { latched_msgs_ . insert ({{ subscriber -> getTopic (), it2 -> second }, out }); } } } // 檢查buffer是否滿了 while ( options_ . buffer_size > 0 && queue_size_ > options_ . buffer_size ) { OutgoingMessage drop = queue_ -> front (); queue_ -> pop (); queue_size_ -= drop . msg -> size (); if ( ! options_ . snapshot ) { Time now = Time :: now (); if ( now > last_buffer_warn_ + ros :: Duration ( 5.0 )) { ROS_WARN ( "rosbag record buffer exceeded. Dropping oldest queued message." ); last_buffer_warn_ = now ; } } } } // 喚醒bag寫入執行緒 if ( ! options_ . snapshot ) queue_condition_ . notify_all (); // 超過option計數的訊息則關閉 if (( * count ) > 0 ) { ( * count ) -- ; if (( * count ) == 0 ) { subscriber -> shutdown (); num_subscribers_ -- ; if ( num_subscribers_ == 0 ) ros :: shutdown (); } } }

run

主流程都集中在run中,啟動了另一個執行緒record_thread來進行錄制

int Recorder :: run () { ros :: NodeHandle nh ; if ( ! nh . ok ()) return 0 ; // 記錄開始錄制 if ( options_ . publish ) { pub_begin_write = nh . advertise < std_msgs :: String > ( "begin_write" , 1 , true ); } last_buffer_warn_ = Time (); queue_ = new std :: queue < OutgoingMessage > ; // 訂閱options中的topic if ( ! options_ . regex ) { for ( string const & topic : options_ . topics ) subscribe ( topic ); } ros :: Subscriber trigger_sub ; // Spin up a thread for writing to the file boost :: thread record_thread ; // 支持快照 if ( options_ . snapshot ) { record_thread = boost :: thread ( boost :: bind ( & Recorder :: doRecordSnapshotter , this )); // Subscribe to the snapshot trigger trigger_sub = nh . subscribe < std_msgs :: Empty > ( "snapshot_trigger" , 100 , boost :: bind ( & Recorder :: snapshotTrigger , this , boost :: placeholders :: _1 )); } else // 不支持快照,呼叫doRecord record_thread = boost :: thread ( boost :: bind ( & Recorder :: doRecord , this )); // 檢查節點 ros :: Timer check_master_timer ; if ( options_ . record_all || options_ . regex || ( options_ . node != std :: string ( "" ))) { // check for master first doCheckMaster ( ros :: TimerEvent (), nh ); check_master_timer = nh . createTimer ( ros :: Duration ( 1.0 ), boost :: bind ( & Recorder :: doCheckMaster , this , boost :: placeholders :: _1 , boost :: ref ( nh ))); } ros :: AsyncSpinner s ( 10 ); s . start (); record_thread . join (); queue_condition_ . notify_all (); delete queue_ ; return exit_code_ ; }

doRecord

讀取訊息,並且寫入bag檔,其中會檢查磁盤和是否超過最大間隔還沒接收訊息。

void Recorder :: doRecord () { // 開啟bag檔,等待寫入 startWriting (); // Schedule the disk space check warn_next_ = ros :: WallTime (); // 檢查磁盤 try { checkDisk (); } catch ( const rosbag :: BagException & ex ) { ROS_ERROR_STREAM ( ex . what ()); exit_code_ = 1 ; stopWriting (); return ; } check_disk_next_ = ros :: WallTime :: now () + ros :: WallDuration (). fromSec ( 20.0 ); // 迴圈讀取訊息並寫入,其中做了一些檢查 ros :: NodeHandle nh ; while ( nh . ok () || ! queue_ -> empty ()) { boost :: unique_lock < boost :: mutex > lock ( queue_mutex_ ); bool finished = false ; while ( queue_ -> empty ()) { boost :: xtime xt ; boost :: xtime_get ( & xt , boost :: TIME_UTC_ ); xt . nsec += 250000000 ; queue_condition_ . timed_wait ( lock , xt ); // 超過最大間隔沒有訊息則結束 if ( checkDuration ( ros :: Time :: now ())) { finished = true ; break ; } } if ( finished ) break ; OutgoingMessage out = queue_ -> front (); queue_ -> pop (); queue_size_ -= out . msg -> size (); lock . release () -> unlock (); if ( checkSize ()) break ; if ( checkDuration ( out . time )) break ; try { // 檢查磁盤和日誌,寫入訊息到bag if ( scheduledCheckDisk () && checkLogging ()) bag_ . write ( out . topic , out . time , * out . msg , out . connection_header ); } catch ( const rosbag :: BagException & ex ) { ROS_ERROR_STREAM ( ex . what ()); exit_code_ = 1 ; break ; } } stopWriting (); }

總結

ROS中透過 record.cpp 呼叫 Recorder 類來進行bag的保存。

ROS Bag分析

  • rosbag record代分碼析
  • rosbag play代分碼析
  • rosbag_storage總體介紹(一)
  • rosbag_storage bag實作(二)
  • rosbag_storage 查詢bag包(三)
  • rosbag_storage 檔讀寫(四)
  • ROS 釋出訂閱模式

  • ROS釋出訂閱實作(二)
  • ROS導航模組

  • ROS navigation導航模組(一)