ROS通过
rosbag record
命令进行bag包的录制,下面我们分析下ROS是如何进行bag包录制的。
目录
rosbag record
的代码位于
ros_comm\tools\rosbag\src\recorder.cpp
中。实现的类主要为
Recorder
。
首先我们看下Recorder的构造函数,可以看到通过RecorderOptions选项来构造Recorder。
Recorder
::
Recorder
(
RecorderOptions
const
&
options
)
:
options_
(
options
),
\\
读取选项
num_subscribers_
(
0
),
\\
订阅数
exit_code_
(
0
),
queue_size_
(
0
),
split_count_
(
0
),
writing_enabled_
(
true
)
{
}
再看下RecorderOptions选项有哪些?
RecorderOptions
::
RecorderOptions
()
:
trigger
(
false
),
// 是否触发
record_all
(
false
),
// 录制所有topic
regex
(
false
),
do_exclude
(
false
),
quiet
(
false
),
append_date
(
true
),
snapshot
(
false
),
verbose
(
false
),
publish
(
false
),
compression
(
compression
::
Uncompressed
),
// 压缩方式
prefix
(
""
),
// 前缀
name
(
""
),
exclude_regex
(),
buffer_size
(
1048576
*
256
),
// 缓冲大小
chunk_size
(
1024
*
768
),
// chunk大小
limit
(
0
),
split
(
false
),
// 是否拆分
max_size
(
0
),
max_splits
(
0
),
max_duration
(
-
1.0
),
// 最大没有接收到消息的间隔(超过则退出)
node
(
""
),
// 录制的node
min_space
(
1024
*
1024
*
1024
),
// 最小空间1G
min_space_str
(
"1G"
)
{
}
接口
了解了Recorder的配置之后,我们接下来分析Recorder的接口,一共有4个。总的来说, 录制bag包的流程分为2个部分,一个流程订阅消息,并且放入队列,另一个流程从队列中读取消息,并且保存到bag文件中 。
public
:
Recorder
(
RecorderOptions
const
&
options
);
// 构造函数
void
doTrigger
();
// 触发
bool
isSubscribed
(
std
::
string
const
&
topic
)
const
;
// 是否订阅了topic
boost
::
shared_ptr
<
ros
::
Subscriber
>
subscribe
(
std
::
string
const
&
topic
);
// 订阅topic
int
run
();
// 开始运行
下面开始逐个分析这4个接口。
doTrigger
通过触发空消息std_msgs::Empty来保存当前快照
void
Recorder
::
doTrigger
()
{
ros
::
NodeHandle
nh
;
ros
::
Publisher
pub
=
nh
.
advertise
<
std_msgs
::
Empty
>
(
"snapshot_trigger"
,
1
,
true
);
pub
.
publish
(
std_msgs
::
Empty
());
// 发送空消息,snapshotTrigger接收这个消息并调用
// 定时关闭ros
ros
::
Timer
terminate_timer
=
nh
.
createTimer
(
ros
::
Duration
(
1.0
),
boost
::
bind
(
&
ros
::
shutdown
));
ros
::
spin
();
}
发送的消息std_msgs::Empty通过以下函数接收,
void
Recorder
::
snapshotTrigger
(
std_msgs
::
Empty
::
ConstPtr
trigger
)
{
(
void
)
trigger
;
updateFilenames
();
ROS_INFO
(
"Triggered snapshot recording with name '%s'."
,
target_filename_
.
c_str
());
// 更新队列
{
boost
::
mutex
::
scoped_lock
lock
(
queue_mutex_
);
queue_queue_
.
push
(
OutgoingQueue
(
target_filename_
,
queue_
,
Time
::
now
()));
queue_
=
new
std
::
queue
<
OutgoingMessage
>
;
queue_size_
=
0
;
}
// 触发
queue_condition_
.
notify_all
();
}
并且通过queue_condition_唤醒保存
void
Recorder
::
doRecordSnapshotter
()
{
ros
::
NodeHandle
nh
;
while
(
nh
.
ok
()
||
!
queue_queue_
.
empty
())
{
boost
::
unique_lock
<
boost
::
mutex
>
lock
(
queue_mutex_
);
while
(
queue_queue_
.
empty
())
{
if
(
!
nh
.
ok
())
return
;
queue_condition_
.
wait
(
lock
);
// 等待唤醒
}
OutgoingQueue
out_queue
=
queue_queue_
.
front
();
queue_queue_
.
pop
();
lock
.
release
()
->
unlock
();
string
target_filename
=
out_queue
.
filename
;
string
write_filename
=
target_filename
+
string
(
".active"
);
try
{
bag_
.
open
(
write_filename
,
bagmode
::
Write
);
// 创建bag文件
}
catch
(
const
rosbag
::
BagException
&
ex
)
{
ROS_ERROR
(
"Error writing: %s"
,
ex
.
what
());
return
;
}
while
(
!
out_queue
.
queue
->
empty
())
{
OutgoingMessage
out
=
out_queue
.
queue
->
front
();
out_queue
.
queue
->
pop
();
// 将消息写入bag文件
bag_
.
write
(
out
.
topic
,
out
.
time
,
*
out
.
msg
);
}
// 关闭bag包
stopWriting
();
}
}
isSubscribed
判断当前topic是否被订阅,代码非常简单,就是从已经订阅的topic集合中查找该topic,看是否能找到。
bool
Recorder
::
isSubscribed
(
string
const
&
topic
)
const
{
// 从set数据结构中查找是否有这个topic
return
currently_recording_
.
find
(
topic
)
!=
currently_recording_
.
end
();
}
subscribe
订阅指定的topic消息,并且返回Subscriber
shared_ptr
<
ros
::
Subscriber
>
Recorder
::
subscribe
(
string
const
&
topic
)
{
ROS_INFO
(
"Subscribing to %s"
,
topic
.
c_str
());
ros
::
NodeHandle
nh
;
shared_ptr
<
int
>
count
(
boost
::
make_shared
<
int
>
(
options_
.
limit
));
shared_ptr
<
ros
::
Subscriber
>
sub
(
boost
::
make_shared
<
ros
::
Subscriber
>
());
// 订阅当前topic
ros
::
SubscribeOptions
ops
;
ops
.
topic
=
topic
;
ops
.
queue_size
=
100
;
ops
.
md5sum
=
ros
::
message_traits
::
md5sum
<
topic_tools
::
ShapeShifter
>
();
ops
.
datatype
=
ros
::
message_traits
::
datatype
<
topic_tools
::
ShapeShifter
>
();
ops
.
helper
=
boost
::
make_shared
<
ros
::
SubscriptionCallbackHelperT
<
const
ros
::
MessageEvent
<
topic_tools
::
ShapeShifter
const
>
&>
>
(
boost
::
bind
(
&
Recorder
::
doQueue
,
this
,
boost
::
placeholders
::
_1
,
topic
,
sub
,
count
));
ops
.
transport_hints
=
options_
.
transport_hints
;
*
sub
=
nh
.
subscribe
(
ops
);
// 把当前topic放入订阅集合,订阅数量加1
currently_recording_
.
insert
(
topic
);
num_subscribers_
++
;
return
sub
;
}
doQueue
订阅的消息通过doQueue来放入队列。
void
Recorder
::
doQueue
(
const
ros
::
MessageEvent
<
topic_tools
::
ShapeShifter
const
>&
msg_event
,
string
const
&
topic
,
shared_ptr
<
ros
::
Subscriber
>
subscriber
,
shared_ptr
<
int
>
count
)
{
Time
rectime
=
Time
::
now
();
if
(
options_
.
verbose
)
cout
<<
"Received message on topic "
<<
subscriber
->
getTopic
()
<<
endl
;
OutgoingMessage
out
(
topic
,
msg_event
.
getMessage
(),
msg_event
.
getConnectionHeaderPtr
(),
rectime
);
{
boost
::
mutex
::
scoped_lock
lock
(
queue_mutex_
);
// 将消息放入队列,并且增加大小
queue_
->
push
(
out
);
queue_size_
+=
out
.
msg
->
size
();
// 消息是否需要重复写入
if
(
options_
.
repeat_latched
)
{
ros
::
M_string
::
const_iterator
it
=
out
.
connection_header
->
find
(
"latching"
);
if
((
it
!=
out
.
connection_header
->
end
())
&&
(
it
->
second
==
"1"
))
{
ros
::
M_string
::
const_iterator
it2
=
out
.
connection_header
->
find
(
"callerid"
);
if
(
it2
!=
out
.
connection_header
->
end
())
{
latched_msgs_
.
insert
({{
subscriber
->
getTopic
(),
it2
->
second
},
out
});
}
}
}
// 检查buffer是否满了
while
(
options_
.
buffer_size
>
0
&&
queue_size_
>
options_
.
buffer_size
)
{
OutgoingMessage
drop
=
queue_
->
front
();
queue_
->
pop
();
queue_size_
-=
drop
.
msg
->
size
();
if
(
!
options_
.
snapshot
)
{
Time
now
=
Time
::
now
();
if
(
now
>
last_buffer_warn_
+
ros
::
Duration
(
5.0
))
{
ROS_WARN
(
"rosbag record buffer exceeded. Dropping oldest queued message."
);
last_buffer_warn_
=
now
;
}
}
}
}
// 唤醒bag写入线程
if
(
!
options_
.
snapshot
)
queue_condition_
.
notify_all
();
// 超过option计数的消息则关闭
if
((
*
count
)
>
0
)
{
(
*
count
)
--
;
if
((
*
count
)
==
0
)
{
subscriber
->
shutdown
();
num_subscribers_
--
;
if
(
num_subscribers_
==
0
)
ros
::
shutdown
();
}
}
}
run
主流程都集中在run中,启动了另一个线程record_thread来进行录制
int
Recorder
::
run
()
{
ros
::
NodeHandle
nh
;
if
(
!
nh
.
ok
())
return
0
;
// 记录开始录制
if
(
options_
.
publish
)
{
pub_begin_write
=
nh
.
advertise
<
std_msgs
::
String
>
(
"begin_write"
,
1
,
true
);
}
last_buffer_warn_
=
Time
();
queue_
=
new
std
::
queue
<
OutgoingMessage
>
;
// 订阅options中的topic
if
(
!
options_
.
regex
)
{
for
(
string
const
&
topic
:
options_
.
topics
)
subscribe
(
topic
);
}
ros
::
Subscriber
trigger_sub
;
// Spin up a thread for writing to the file
boost
::
thread
record_thread
;
// 支持快照
if
(
options_
.
snapshot
)
{
record_thread
=
boost
::
thread
(
boost
::
bind
(
&
Recorder
::
doRecordSnapshotter
,
this
));
// Subscribe to the snapshot trigger
trigger_sub
=
nh
.
subscribe
<
std_msgs
::
Empty
>
(
"snapshot_trigger"
,
100
,
boost
::
bind
(
&
Recorder
::
snapshotTrigger
,
this
,
boost
::
placeholders
::
_1
));
}
else
// 不支持快照,调用doRecord
record_thread
=
boost
::
thread
(
boost
::
bind
(
&
Recorder
::
doRecord
,
this
));
// 检查节点
ros
::
Timer
check_master_timer
;
if
(
options_
.
record_all
||
options_
.
regex
||
(
options_
.
node
!=
std
::
string
(
""
)))
{
// check for master first
doCheckMaster
(
ros
::
TimerEvent
(),
nh
);
check_master_timer
=
nh
.
createTimer
(
ros
::
Duration
(
1.0
),
boost
::
bind
(
&
Recorder
::
doCheckMaster
,
this
,
boost
::
placeholders
::
_1
,
boost
::
ref
(
nh
)));
}
ros
::
AsyncSpinner
s
(
10
);
s
.
start
();
record_thread
.
join
();
queue_condition_
.
notify_all
();
delete
queue_
;
return
exit_code_
;
}
doRecord
读取消息,并且写入bag文件,其中会检查磁盘和是否超过最大间隔还没接收消息。
void
Recorder
::
doRecord
()
{
// 打开bag文件,等待写入
startWriting
();
// Schedule the disk space check
warn_next_
=
ros
::
WallTime
();
// 检查磁盘
try
{
checkDisk
();
}
catch
(
const
rosbag
::
BagException
&
ex
)
{
ROS_ERROR_STREAM
(
ex
.
what
());
exit_code_
=
1
;
stopWriting
();
return
;
}
check_disk_next_
=
ros
::
WallTime
::
now
()
+
ros
::
WallDuration
().
fromSec
(
20.0
);
// 循环读取消息并写入,其中做了一些检查
ros
::
NodeHandle
nh
;
while
(
nh
.
ok
()
||
!
queue_
->
empty
())
{
boost
::
unique_lock
<
boost
::
mutex
>
lock
(
queue_mutex_
);
bool
finished
=
false
;
while
(
queue_
->
empty
())
{
boost
::
xtime
xt
;
boost
::
xtime_get
(
&
xt
,
boost
::
TIME_UTC_
);
xt
.
nsec
+=
250000000
;
queue_condition_
.
timed_wait
(
lock
,
xt
);
// 超过最大间隔没有消息则退出
if
(
checkDuration
(
ros
::
Time
::
now
()))
{
finished
=
true
;
break
;
}
}
if
(
finished
)
break
;
OutgoingMessage
out
=
queue_
->
front
();
queue_
->
pop
();
queue_size_
-=
out
.
msg
->
size
();
lock
.
release
()
->
unlock
();
if
(
checkSize
())
break
;
if
(
checkDuration
(
out
.
time
))
break
;
try
{
// 检查磁盘和日志,写入消息到bag
if
(
scheduledCheckDisk
()
&&
checkLogging
())
bag_
.
write
(
out
.
topic
,
out
.
time
,
*
out
.
msg
,
out
.
connection_header
);
}
catch
(
const
rosbag
::
BagException
&
ex
)
{
ROS_ERROR_STREAM
(
ex
.
what
());
exit_code_
=
1
;
break
;
}
}
stopWriting
();
}
总结
ROS中通过
record.cpp
调用
Recorder
类来进行bag的保存。
ROS Bag分析
ROS 发布订阅模式
ROS导航模块