当前位置: 华文星空 > 知识

rosbag play代码分析

2022-02-06知识

ROS通过 rosbag play 命令进行bag包的播放,下面我们分析下ROS是如何进行bag包播放的。

目录

rosbag play 的代码位于 ros_commtools\rosbag\src\player.cpp 中,实现的类主要为 Player

首先我们看下Player的构造函数,可以看到通过PlayerOptions选项来构造Player。

Player :: Player ( PlayerOptions const & options ) : options_ ( options ), // 选项 paused_ ( false ), // If we were given a list of topics to pause on, then go into that mode // by default (it can be toggled later via 't' from the keyboard). pause_for_topics_ ( options_ . pause_topics . size () > 0 ), pause_change_requested_ ( false ), requested_pause_state_ ( false ), terminal_modified_ ( false ) { // 订阅暂停服务 ros :: NodeHandle private_node_handle ( "~" ); pause_service_ = private_node_handle . advertiseService ( "pause_playback" , & Player :: pauseCallback , this ); }

Player的选项如下

PlayerOptions::PlayerOptions() : prefix(""), quiet(false), start_paused(false), at_once(false), bag_time(false), bag_time_frequency(0.0), time_scale(1.0), // 时间尺度 queue_size(0), // 队列大小 advertise_sleep(0.2), try_future(false), has_time(false), loop(false), // 循环播放 time(0.0f), has_duration(false), duration(0.0f), keep_alive(false), wait_for_subscribers(false), rate_control_topic(""), rate_control_max_delay(1.0f), skip_empty(ros::DURATION_MAX) { }

接口

Player的接口只有一个就是 publish 接口,由于函数较长,我们将分步骤分析。

读取bag包

根据options中的bag包名称,读取bag包

// 遍历输入的所有bag for ( string const & filename : options_ . bags ) { ROS_INFO ( "Opening %s" , filename . c_str ()); try { // 读取bag包 shared_ptr < Bag > bag ( boost :: make_shared < Bag > ()); bag -> open ( filename , bagmode :: Read ); bags_ . push_back ( bag ); } catch ( const BagUnindexedException & ex ) { std :: cerr << "Bag file " << filename << " is unindexed. Run rosbag reindex." << std :: endl ; return ; } }

注册发布

查找开始时间到结束时间内的消息并且注册发布。

// Publish all messages in the bags View full_view ; for ( shared_ptr < Bag >& bag : bags_ ) full_view . addQuery ( * bag ); const auto full_initial_time = full_view . getBeginTime (); const auto initial_time = full_initial_time + ros :: Duration ( options_ . time ); ros :: Time finish_time = ros :: TIME_MAX ; if ( options_ . has_duration ) { finish_time = initial_time + ros :: Duration ( options_ . duration ); } View view ; TopicQuery topics ( options_ . topics ); if ( options_ . topics . empty ()) { for ( shared_ptr < Bag >& bag : bags_ ) view . addQuery ( * bag , initial_time , finish_time ); } else { for ( shared_ptr < Bag >& bag : bags_ ) view . addQuery ( * bag , topics , initial_time , finish_time ); } // 添加topic到publishers_ for ( const ConnectionInfo * c : view . getConnections ()) { advertise ( c ); }

发布时间和消息

在循环中发布时间和消息。

while ( true ) { // 发布时间 time_translator_ . setTimeScale ( options_ . time_scale ); start_time_ = view . begin () -> getTime (); time_translator_ . setRealStartTime ( start_time_ ); bag_length_ = view . getEndTime () - view . getBeginTime (); // Set the last rate control to now, so the program doesn't start delayed. last_rate_control_ = start_time_ ; time_publisher_ . setTime ( start_time_ ); ros :: WallTime now_wt = ros :: WallTime :: now (); time_translator_ . setTranslatedStartTime ( ros :: Time ( now_wt . sec , now_wt . nsec )); time_publisher_ . setTimeScale ( options_ . time_scale ); if ( options_ . bag_time ) time_publisher_ . setPublishFrequency ( options_ . bag_time_frequency ); else time_publisher_ . setPublishFrequency ( - 1.0 ); paused_time_ = now_wt ; // 发布消息 for ( const MessageInstance & m : view ) { if ( ! node_handle_ . ok ()) break ; doPublish ( m ); } // 是否一致保存命令行 if ( options_ . keep_alive ) while ( node_handle_ . ok ()) doKeepAlive (); // 是否循环 if ( ! options_ . loop ) { std :: cout << std :: endl << "Done." << std :: endl ; break ; } }

发布消息

上一步中在循环所有的topic,通过doPublish发布消息。

while (( paused_ || delayed_ || ! time_publisher_ . horizonReached ()) && node_handle_ . ok ()) { bool charsleftorpaused = true ; while ( charsleftorpaused && node_handle_ . ok ()) { ros :: spinOnce (); if ( pause_change_requested_ ) { processPause ( requested_pause_state_ , horizon ); pause_change_requested_ = false ; } switch ( readCharFromStdin ()){ // 1. 空格暂停 case ' ' : processPause ( ! paused_ , horizon ); break ; // 2. 单步执行 case 's' : if ( paused_ ) { time_publisher_ . stepClock (); ros :: WallDuration shift = ros :: WallTime :: now () - horizon ; paused_time_ = ros :: WallTime :: now (); time_translator_ . shift ( ros :: Duration ( shift . sec , shift . nsec )); horizon += shift ; time_publisher_ . setWCHorizon ( horizon ); ( pub_iter -> second ). publish ( m ); printTime (); return ; } break ; case 't' : pause_for_topics_ = ! pause_for_topics_ ; break ; // 文件结尾,看是暂停还是延迟做时间补偿 case EOF : if ( paused_ ) { printTime (); time_publisher_ . runStalledClock ( ros :: WallDuration ( .1 )); ros :: spinOnce (); } else if ( delayed_ ) { printTime (); time_publisher_ . runStalledClock ( ros :: WallDuration ( .1 )); ros :: spinOnce (); // You need to check the rate here too. if ( rate_control_sub_ == NULL || ( time_publisher_ . getTime () - last_rate_control_ ). toSec () <= options_ . rate_control_max_delay ) { delayed_ = false ; // Make sure time doesn't shift after leaving delay. ros :: WallDuration shift = ros :: WallTime :: now () - paused_time_ ; paused_time_ = ros :: WallTime :: now (); time_translator_ . shift ( ros :: Duration ( shift . sec , shift . nsec )); horizon += shift ; time_publisher_ . setWCHorizon ( horizon ); } } else charsleftorpaused = false ; } } printTime (); time_publisher_ . runClock ( ros :: WallDuration ( .1 )); ros :: spinOnce (); }

总结

ROS中通过 play.cpp 调用 Player 类来读取bag包,并且发布其中的消息。

ROS Bag分析

  • rosbag record代码分析
  • rosbag play代码分析
  • rosbag_storage总体介绍(一)
  • rosbag_storage bag实现(二)
  • rosbag_storage 查询bag包(三)
  • rosbag_storage 文件读写(四)
  • ROS 发布订阅模式

  • ROS发布订阅实现(二)
  • ROS导航模块

  • ROS navigation导航模块(一)