MCAP 流式加载
MCAP 流式加载
MCAP 的流式加载需要添加 Chunk Index。利用 MCAP Summary 中的 Chunk Index,可以按 Topic 和 message.log_time 时间范围定位 Chunk,再在命中的 Chunk 中逐条产出 Message。适合从大文件中读取少量 Topic 或小时间窗口。
索引定位的粒度是 Chunk,不是单条 Message。命中的 Chunk 将被读取和解压,然后 Reader 再筛选其中的 Message。
安装 CLI
可参考官方文档:CLI | MCAP。
从 foxglove/mcap releases 下载对应的 mcap-linux-* 二进制文件,放入 PATH 中的目录。以下以 ~/.local/bin 为例:
mkdir -p ~/.local/bin
mv mcap-linux-amd64 ~/.local/bin/mcap
chmod +x ~/.local/bin/mcap如果 ~/.local/bin 不在 PATH 中,按当前 Shell 添加配置:
# bash
echo 'export PATH="$HOME/.local/bin:$PATH"
' >> ~/.bashrc
source ~/.bashrc
# zsh
echo 'export PATH="$HOME/.local/bin:$PATH"' >> ~/.zshrc
source ~/.zshrc接下来安装 Python 包:
python -m pip install mcap mcap-ros2-support说明:
mcap提供 MCAP Reader。
mcap-ros2-support用于解码 ROS2 Message。
检查 MCAP 是否有 Chunk Index
方式 1 - 使用 mcap info 命令
mcap info <你的文件.mcap>判断标准:
- 有 Chunk Index:输出中显示
compression和chunks部分,包含[X/Y chunks]信息。
- 无 Chunk Index:输出中没有
chunks相关信息。
方式 2 - 使用 Python 脚本
将以下脚本保存为 check_mcap_index.py:
import argparse
from mcap.reader import make_reader
def has_chunk_index(mcap_path: str) -> bool:
with open(mcap_path, "rb") as f:
reader = make_reader(f)
summary = reader.get_summary()
return bool(summary and summary.chunk_indexes)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("mcap")
args = parser.parse_args()
if has_chunk_index(args.mcap):
print("OK: MCAP has chunk index")
else:
print("ERROR: MCAP has no chunk index")执行:
python check_mcap_index.py input.mcap- 输出
OK表示可以基于索引按 Topic 或时间范围流式加载。
- 输出
ERROR则应生成带索引的新 MCAP 文件。
生成带索引 MCAP
使用 MCAP CLI 生成新的带索引文件:
mcap compress input.mcap -o input.indexed.mcap \
--chunk-size 4194304 \
--compression zstd参数说明:
input.mcap:原始 MCAP 文件。
input.indexed.mcap:输出文件。
-chunk-size 4194304:按 4 MB 左右重新分 Chunk。这里使用 MCAP CLI 默认值作为示例。
-compression zstd:输出 Chunk 使用zstd压缩。
如果从损坏的 MCAP 中恢复数据,可以使用 recover:
mcap recover input.mcap -o input.indexed.mcap \
--chunk-size 4194304 \
--compression zstd \
--always-decode-chunk重写、压缩、重新分 Chunk 时,优先使用 compress。recover 的语义是从可能损坏的 MCAP 文件中恢复数据。
基于索引的范围流式加载示例
基于索引的范围流式加载适合读取指定 Topic 或指定时间窗口。为让 Reader 利用 Chunk Index 跳过无关 Chunk,应将 topics、start_time、end_time 直接传给 reader.iter_decoded_messages()。
from collections.abc import Iterable
from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory
TOPICS = [
"/robot/topic/rdap/front_color_sensor_camera/h264",
"/robot/topic/rdap/h264_front_left_sensor_camera",
"/robot/topic/rdap/h264_front_right_sensor_camera",
"/teleop/sync_state",
]
def stream_indexed_range(
mcap_path: str,
topics: Iterable[str] = TOPICS,
start_time_ns: int | None = None,
end_time_ns: int | None = None,
) -> None:
with open(mcap_path, "rb") as f:
reader = make_reader(f, decoder_factories=[DecoderFactory()])
# 不传 topics 时,默认读取全部 Topic;
# 不传 start_time 时,从匹配条件的最早消息开始读。
# 不传 end_time 时,一直读到匹配条件的最后消息。
for schema, channel, message, ros_msg in reader.iter_decoded_messages(
topics=topics,
start_time=start_time_ns,
end_time=end_time_ns,
log_time_order=True,
):
handle_message(schema, channel, message, ros_msg)
def handle_message(schema, channel, message, ros_msg) -> None:
print(channel.topic, message.log_time, flush=True)
# stream_indexed_range("input.indexed.mcap", TOPICS, start_time_ns=1780106598349765958, end_time_ns=1780106698349765958)说明:
start_time和end_time使用 MCAPmessage.log_time,单位是纳秒。
end_time是排他边界,message.log_time >= end_time的消息不被产出。
log_time_order=True将按log_time排序产出 Message;如果只需要顺序扫描,可使用False。