ROS透過
rosbag record
命令進行bag包的錄制,下面我們分析下ROS是如何進行bag包錄制的。
目錄
rosbag record
的程式碼位於
ros_comm\tools\rosbag\src\recorder.cpp
中。實作的類主要為
Recorder
。
首先我們看下Recorder的建構函式,可以看到透過RecorderOptions選項來構造Recorder。
Recorder
::
Recorder
(
RecorderOptions
const
&
options
)
:
options_
(
options
),
\\
讀取選項
num_subscribers_
(
0
),
\\
訂閱數
exit_code_
(
0
),
queue_size_
(
0
),
split_count_
(
0
),
writing_enabled_
(
true
)
{
}
再看下RecorderOptions選項有哪些?
RecorderOptions
::
RecorderOptions
()
:
trigger
(
false
),
// 是否觸發
record_all
(
false
),
// 錄制所有topic
regex
(
false
),
do_exclude
(
false
),
quiet
(
false
),
append_date
(
true
),
snapshot
(
false
),
verbose
(
false
),
publish
(
false
),
compression
(
compression
::
Uncompressed
),
// 壓縮方式
prefix
(
""
),
// 字首
name
(
""
),
exclude_regex
(),
buffer_size
(
1048576
*
256
),
// 緩沖大小
chunk_size
(
1024
*
768
),
// chunk大小
limit
(
0
),
split
(
false
),
// 是否拆分
max_size
(
0
),
max_splits
(
0
),
max_duration
(
-
1.0
),
// 最大沒有接收到訊息的間隔(超過則結束)
node
(
""
),
// 錄制的node
min_space
(
1024
*
1024
*
1024
),
// 最小空間1G
min_space_str
(
"1G"
)
{
}
介面
了解了Recorder的配置之後,我們接下來分析Recorder的介面,一共有4個。總的來說, 錄制bag包的流程分為2個部份,一個流程訂閱訊息,並且放入佇列,另一個流程從佇列中讀取訊息,並且保存到bag檔中 。
public
:
Recorder
(
RecorderOptions
const
&
options
);
// 建構函式
void
doTrigger
();
// 觸發
bool
isSubscribed
(
std
::
string
const
&
topic
)
const
;
// 是否訂閱了topic
boost
::
shared_ptr
<
ros
::
Subscriber
>
subscribe
(
std
::
string
const
&
topic
);
// 訂閱topic
int
run
();
// 開始執行
下面開始逐個分析這4個介面。
doTrigger
透過觸發空訊息std_msgs::Empty來保存當前快照
void
Recorder
::
doTrigger
()
{
ros
::
NodeHandle
nh
;
ros
::
Publisher
pub
=
nh
.
advertise
<
std_msgs
::
Empty
>
(
"snapshot_trigger"
,
1
,
true
);
pub
.
publish
(
std_msgs
::
Empty
());
// 發送空訊息,snapshotTrigger接收這個訊息並呼叫
// 定時關閉ros
ros
::
Timer
terminate_timer
=
nh
.
createTimer
(
ros
::
Duration
(
1.0
),
boost
::
bind
(
&
ros
::
shutdown
));
ros
::
spin
();
}
發送的訊息std_msgs::Empty透過以下函數接收,
void
Recorder
::
snapshotTrigger
(
std_msgs
::
Empty
::
ConstPtr
trigger
)
{
(
void
)
trigger
;
updateFilenames
();
ROS_INFO
(
"Triggered snapshot recording with name '%s'."
,
target_filename_
.
c_str
());
// 更新佇列
{
boost
::
mutex
::
scoped_lock
lock
(
queue_mutex_
);
queue_queue_
.
push
(
OutgoingQueue
(
target_filename_
,
queue_
,
Time
::
now
()));
queue_
=
new
std
::
queue
<
OutgoingMessage
>
;
queue_size_
=
0
;
}
// 觸發
queue_condition_
.
notify_all
();
}
並且透過queue_condition_喚醒保存
void
Recorder
::
doRecordSnapshotter
()
{
ros
::
NodeHandle
nh
;
while
(
nh
.
ok
()
||
!
queue_queue_
.
empty
())
{
boost
::
unique_lock
<
boost
::
mutex
>
lock
(
queue_mutex_
);
while
(
queue_queue_
.
empty
())
{
if
(
!
nh
.
ok
())
return
;
queue_condition_
.
wait
(
lock
);
// 等待喚醒
}
OutgoingQueue
out_queue
=
queue_queue_
.
front
();
queue_queue_
.
pop
();
lock
.
release
()
->
unlock
();
string
target_filename
=
out_queue
.
filename
;
string
write_filename
=
target_filename
+
string
(
".active"
);
try
{
bag_
.
open
(
write_filename
,
bagmode
::
Write
);
// 建立bag檔
}
catch
(
const
rosbag
::
BagException
&
ex
)
{
ROS_ERROR
(
"Error writing: %s"
,
ex
.
what
());
return
;
}
while
(
!
out_queue
.
queue
->
empty
())
{
OutgoingMessage
out
=
out_queue
.
queue
->
front
();
out_queue
.
queue
->
pop
();
// 將訊息寫入bag檔
bag_
.
write
(
out
.
topic
,
out
.
time
,
*
out
.
msg
);
}
// 關閉bag包
stopWriting
();
}
}
isSubscribed
判斷當前topic是否被訂閱,程式碼非常簡單,就是從已經訂閱的topic集合中尋找該topic,看是否能找到。
bool
Recorder
::
isSubscribed
(
string
const
&
topic
)
const
{
// 從set數據結構中尋找是否有這個topic
return
currently_recording_
.
find
(
topic
)
!=
currently_recording_
.
end
();
}
subscribe
訂閱指定的topic訊息,並且返回Subscriber
shared_ptr
<
ros
::
Subscriber
>
Recorder
::
subscribe
(
string
const
&
topic
)
{
ROS_INFO
(
"Subscribing to %s"
,
topic
.
c_str
());
ros
::
NodeHandle
nh
;
shared_ptr
<
int
>
count
(
boost
::
make_shared
<
int
>
(
options_
.
limit
));
shared_ptr
<
ros
::
Subscriber
>
sub
(
boost
::
make_shared
<
ros
::
Subscriber
>
());
// 訂閱當前topic
ros
::
SubscribeOptions
ops
;
ops
.
topic
=
topic
;
ops
.
queue_size
=
100
;
ops
.
md5sum
=
ros
::
message_traits
::
md5sum
<
topic_tools
::
ShapeShifter
>
();
ops
.
datatype
=
ros
::
message_traits
::
datatype
<
topic_tools
::
ShapeShifter
>
();
ops
.
helper
=
boost
::
make_shared
<
ros
::
SubscriptionCallbackHelperT
<
const
ros
::
MessageEvent
<
topic_tools
::
ShapeShifter
const
>
&>
>
(
boost
::
bind
(
&
Recorder
::
doQueue
,
this
,
boost
::
placeholders
::
_1
,
topic
,
sub
,
count
));
ops
.
transport_hints
=
options_
.
transport_hints
;
*
sub
=
nh
.
subscribe
(
ops
);
// 把當前topic放入訂閱集合,訂閱數量加1
currently_recording_
.
insert
(
topic
);
num_subscribers_
++
;
return
sub
;
}
doQueue
訂閱的訊息透過doQueue來放入佇列。
void
Recorder
::
doQueue
(
const
ros
::
MessageEvent
<
topic_tools
::
ShapeShifter
const
>&
msg_event
,
string
const
&
topic
,
shared_ptr
<
ros
::
Subscriber
>
subscriber
,
shared_ptr
<
int
>
count
)
{
Time
rectime
=
Time
::
now
();
if
(
options_
.
verbose
)
cout
<<
"Received message on topic "
<<
subscriber
->
getTopic
()
<<
endl
;
OutgoingMessage
out
(
topic
,
msg_event
.
getMessage
(),
msg_event
.
getConnectionHeaderPtr
(),
rectime
);
{
boost
::
mutex
::
scoped_lock
lock
(
queue_mutex_
);
// 將訊息放入佇列,並且增加大小
queue_
->
push
(
out
);
queue_size_
+=
out
.
msg
->
size
();
// 訊息是否需要重復寫入
if
(
options_
.
repeat_latched
)
{
ros
::
M_string
::
const_iterator
it
=
out
.
connection_header
->
find
(
"latching"
);
if
((
it
!=
out
.
connection_header
->
end
())
&&
(
it
->
second
==
"1"
))
{
ros
::
M_string
::
const_iterator
it2
=
out
.
connection_header
->
find
(
"callerid"
);
if
(
it2
!=
out
.
connection_header
->
end
())
{
latched_msgs_
.
insert
({{
subscriber
->
getTopic
(),
it2
->
second
},
out
});
}
}
}
// 檢查buffer是否滿了
while
(
options_
.
buffer_size
>
0
&&
queue_size_
>
options_
.
buffer_size
)
{
OutgoingMessage
drop
=
queue_
->
front
();
queue_
->
pop
();
queue_size_
-=
drop
.
msg
->
size
();
if
(
!
options_
.
snapshot
)
{
Time
now
=
Time
::
now
();
if
(
now
>
last_buffer_warn_
+
ros
::
Duration
(
5.0
))
{
ROS_WARN
(
"rosbag record buffer exceeded. Dropping oldest queued message."
);
last_buffer_warn_
=
now
;
}
}
}
}
// 喚醒bag寫入執行緒
if
(
!
options_
.
snapshot
)
queue_condition_
.
notify_all
();
// 超過option計數的訊息則關閉
if
((
*
count
)
>
0
)
{
(
*
count
)
--
;
if
((
*
count
)
==
0
)
{
subscriber
->
shutdown
();
num_subscribers_
--
;
if
(
num_subscribers_
==
0
)
ros
::
shutdown
();
}
}
}
run
主流程都集中在run中,啟動了另一個執行緒record_thread來進行錄制
int
Recorder
::
run
()
{
ros
::
NodeHandle
nh
;
if
(
!
nh
.
ok
())
return
0
;
// 記錄開始錄制
if
(
options_
.
publish
)
{
pub_begin_write
=
nh
.
advertise
<
std_msgs
::
String
>
(
"begin_write"
,
1
,
true
);
}
last_buffer_warn_
=
Time
();
queue_
=
new
std
::
queue
<
OutgoingMessage
>
;
// 訂閱options中的topic
if
(
!
options_
.
regex
)
{
for
(
string
const
&
topic
:
options_
.
topics
)
subscribe
(
topic
);
}
ros
::
Subscriber
trigger_sub
;
// Spin up a thread for writing to the file
boost
::
thread
record_thread
;
// 支持快照
if
(
options_
.
snapshot
)
{
record_thread
=
boost
::
thread
(
boost
::
bind
(
&
Recorder
::
doRecordSnapshotter
,
this
));
// Subscribe to the snapshot trigger
trigger_sub
=
nh
.
subscribe
<
std_msgs
::
Empty
>
(
"snapshot_trigger"
,
100
,
boost
::
bind
(
&
Recorder
::
snapshotTrigger
,
this
,
boost
::
placeholders
::
_1
));
}
else
// 不支持快照,呼叫doRecord
record_thread
=
boost
::
thread
(
boost
::
bind
(
&
Recorder
::
doRecord
,
this
));
// 檢查節點
ros
::
Timer
check_master_timer
;
if
(
options_
.
record_all
||
options_
.
regex
||
(
options_
.
node
!=
std
::
string
(
""
)))
{
// check for master first
doCheckMaster
(
ros
::
TimerEvent
(),
nh
);
check_master_timer
=
nh
.
createTimer
(
ros
::
Duration
(
1.0
),
boost
::
bind
(
&
Recorder
::
doCheckMaster
,
this
,
boost
::
placeholders
::
_1
,
boost
::
ref
(
nh
)));
}
ros
::
AsyncSpinner
s
(
10
);
s
.
start
();
record_thread
.
join
();
queue_condition_
.
notify_all
();
delete
queue_
;
return
exit_code_
;
}
doRecord
讀取訊息,並且寫入bag檔,其中會檢查磁盤和是否超過最大間隔還沒接收訊息。
void
Recorder
::
doRecord
()
{
// 開啟bag檔,等待寫入
startWriting
();
// Schedule the disk space check
warn_next_
=
ros
::
WallTime
();
// 檢查磁盤
try
{
checkDisk
();
}
catch
(
const
rosbag
::
BagException
&
ex
)
{
ROS_ERROR_STREAM
(
ex
.
what
());
exit_code_
=
1
;
stopWriting
();
return
;
}
check_disk_next_
=
ros
::
WallTime
::
now
()
+
ros
::
WallDuration
().
fromSec
(
20.0
);
// 迴圈讀取訊息並寫入,其中做了一些檢查
ros
::
NodeHandle
nh
;
while
(
nh
.
ok
()
||
!
queue_
->
empty
())
{
boost
::
unique_lock
<
boost
::
mutex
>
lock
(
queue_mutex_
);
bool
finished
=
false
;
while
(
queue_
->
empty
())
{
boost
::
xtime
xt
;
boost
::
xtime_get
(
&
xt
,
boost
::
TIME_UTC_
);
xt
.
nsec
+=
250000000
;
queue_condition_
.
timed_wait
(
lock
,
xt
);
// 超過最大間隔沒有訊息則結束
if
(
checkDuration
(
ros
::
Time
::
now
()))
{
finished
=
true
;
break
;
}
}
if
(
finished
)
break
;
OutgoingMessage
out
=
queue_
->
front
();
queue_
->
pop
();
queue_size_
-=
out
.
msg
->
size
();
lock
.
release
()
->
unlock
();
if
(
checkSize
())
break
;
if
(
checkDuration
(
out
.
time
))
break
;
try
{
// 檢查磁盤和日誌,寫入訊息到bag
if
(
scheduledCheckDisk
()
&&
checkLogging
())
bag_
.
write
(
out
.
topic
,
out
.
time
,
*
out
.
msg
,
out
.
connection_header
);
}
catch
(
const
rosbag
::
BagException
&
ex
)
{
ROS_ERROR_STREAM
(
ex
.
what
());
exit_code_
=
1
;
break
;
}
}
stopWriting
();
}
總結
ROS中透過
record.cpp
呼叫
Recorder
類來進行bag的保存。
ROS Bag分析
ROS 釋出訂閱模式
ROS導航模組