MCAP 流式加载

MCAP 流式加载

MCAP 的流式加载需要添加 Chunk Index。利用 MCAP Summary 中的 Chunk Index,可以按 Topicmessage.log_time 时间范围定位 Chunk,再在命中的 Chunk 中逐条产出 Message。适合从大文件中读取少量 Topic 或小时间窗口。

索引定位的粒度是 Chunk,不是单条 Message。命中的 Chunk 将被读取和解压,然后 Reader 再筛选其中的 Message。


安装 CLI

可参考官方文档:CLI | MCAP

foxglove/mcap releases 下载对应的 mcap-linux-* 二进制文件,放入 PATH 中的目录。以下以 ~/.local/bin 为例:

mkdir -p ~/.local/bin
mv mcap-linux-amd64 ~/.local/bin/mcap
chmod +x ~/.local/bin/mcap

如果 ~/.local/bin 不在 PATH 中,按当前 Shell 添加配置:

# bash
echo 'export PATH="$HOME/.local/bin:$PATH"
' >> ~/.bashrc
source ~/.bashrc

# zsh
echo 'export PATH="$HOME/.local/bin:$PATH"' >> ~/.zshrc
source ~/.zshrc

接下来安装 Python 包:

python -m pip install mcap mcap-ros2-support

说明:

  • mcap 提供 MCAP Reader。
  • mcap-ros2-support 用于解码 ROS2 Message。

检查 MCAP 是否有 Chunk Index

方式 1 - 使用 mcap info 命令

mcap info <你的文件.mcap>

判断标准:

  • 有 Chunk Index:输出中显示 compressionchunks 部分,包含 [X/Y chunks] 信息。
  • 无 Chunk Index:输出中没有 chunks 相关信息。

方式 2 - 使用 Python 脚本

将以下脚本保存为 check_mcap_index.py

import argparse

from mcap.reader import make_reader


def has_chunk_index(mcap_path: str) -> bool:
    with open(mcap_path, "rb") as f:
        reader = make_reader(f)
        summary = reader.get_summary()
        return bool(summary and summary.chunk_indexes)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("mcap")
    args = parser.parse_args()

    if has_chunk_index(args.mcap):
        print("OK: MCAP has chunk index")
    else:
        print("ERROR: MCAP has no chunk index")

执行:

python check_mcap_index.py input.mcap
  • 输出 OK 表示可以基于索引按 Topic 或时间范围流式加载。
  • 输出 ERROR 则应生成带索引的新 MCAP 文件。

生成带索引 MCAP

使用 MCAP CLI 生成新的带索引文件:

mcap compress input.mcap -o input.indexed.mcap \
  --chunk-size 4194304 \
  --compression zstd

参数说明:

  • input.mcap:原始 MCAP 文件。
  • input.indexed.mcap:输出文件。
  • -chunk-size 4194304:按 4 MB 左右重新分 Chunk。这里使用 MCAP CLI 默认值作为示例。
  • -compression zstd:输出 Chunk 使用 zstd 压缩。

如果从损坏的 MCAP 中恢复数据,可以使用 recover

mcap recover input.mcap -o input.indexed.mcap \
  --chunk-size 4194304 \
  --compression zstd \
  --always-decode-chunk

重写、压缩、重新分 Chunk 时,优先使用 compressrecover 的语义是从可能损坏的 MCAP 文件中恢复数据。


基于索引的范围流式加载示例

基于索引的范围流式加载适合读取指定 Topic 或指定时间窗口。为让 Reader 利用 Chunk Index 跳过无关 Chunk,应将 topicsstart_timeend_time 直接传给 reader.iter_decoded_messages()

from collections.abc import Iterable

from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory

TOPICS = [
    "/robot/topic/rdap/front_color_sensor_camera/h264",
    "/robot/topic/rdap/h264_front_left_sensor_camera",
    "/robot/topic/rdap/h264_front_right_sensor_camera",
    "/teleop/sync_state",
]


def stream_indexed_range(
    mcap_path: str,
    topics: Iterable[str] = TOPICS,
    start_time_ns: int | None = None,
    end_time_ns: int | None = None,
) -> None:
    with open(mcap_path, "rb") as f:
        reader = make_reader(f, decoder_factories=[DecoderFactory()])

        # 不传 topics 时,默认读取全部 Topic;
        # 不传 start_time 时,从匹配条件的最早消息开始读。
        # 不传 end_time 时,一直读到匹配条件的最后消息。
        for schema, channel, message, ros_msg in reader.iter_decoded_messages(
            topics=topics,
            start_time=start_time_ns,
            end_time=end_time_ns,
            log_time_order=True,
        ):
            handle_message(schema, channel, message, ros_msg)


def handle_message(schema, channel, message, ros_msg) -> None:
    print(channel.topic, message.log_time, flush=True)


# stream_indexed_range("input.indexed.mcap", TOPICS, start_time_ns=1780106598349765958, end_time_ns=1780106698349765958)

说明:

  • start_timeend_time 使用 MCAP message.log_time,单位是纳秒。
  • end_time 是排他边界,message.log_time >= end_time 的消息不被产出。
  • log_time_order=True 将按 log_time 排序产出 Message;如果只需要顺序扫描,可使用 False