高性能消息通信库, 类似 socket,但比 socket 高层 不需要 broker(不像 Kafka / RabbitMQ)支持:
- TCP
- IPC
- inproc(进程内)
- multicast
你可以理解成:
“Socket + 消息队列 + 网络模式封装”
它不是传统 MQ 服务,而是:
一个嵌入式通信库 链接进你的 C++ 程序即可
传统 socket 最大的问题:
recv/send 太底层
连接管理麻烦
重连麻烦
线程模型复杂
消息边界不存在
异步困难
ZeroMQ 帮你做了:
自动重连
消息边界
异步队列
负载均衡
发布订阅
请求响应
线程间通信
ZeroMQ 是一个高性能异步消息通信库。
与传统 Socket 编程不同,ZeroMQ 更强调:
通信模式(Messaging Pattern)`
而不是:
send / recv 细节
ZeroMQ 内置了多种通信模型,其中最核心的是:
REQ / REP(请求响应)
PUB / SUB(发布订阅)
PUSH / PULL(任务分发)
ROUTER / DEALER(高级异步路由)
| 模型 | 特点 | 是否同步 | 适合场景 |
|---|---|---|---|
| REQ/REP | 请求响应 | 同步 | RPC |
| PUB/SUB | 消息广播 | 异步 | 状态推送 |
| PUSH/PULL | 任务分发 | 异步 | 并行处理 |
| ROUTER/DEALER | 高级异步路由 | 异步 | 高并发系统 |
这是最基础、最容易理解的模式。
Client (REQ) ----request----> Server (REP)
Client (REQ) <---response---- Server (REP)REQ:
- Request
- 请求端
REP:
- Reply
- 响应端
REQ socket:
send
recv
send
recv
必须严格交替。
REP socket:
recv
send
recv
send
也必须严格交替。
这是它的重要特点。
客户端:
#include <zmq.hpp>
#include <iostream>
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::req);
socket.connect("tcp://localhost:5555");
socket.send(
zmq::buffer("hello"),
zmq::send_flags::none);
zmq::message_t reply;
socket.recv(reply);
std::cout << reply.to_string()
<< std::endl;
}服务端:
#include <zmq.hpp>
#include <iostream>
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::rep);
socket.bind("tcp://*:5555");
while (true)
{
zmq::message_t request;
socket.recv(request);
std::cout << "recv: "
<< request.to_string()
<< std::endl;
socket.send(
zmq::buffer("world"),
zmq::send_flags::none);
}
}REQ/REP 本质上是:
同步 RPC 模型
非常像:
- HTTP 请求
- RPC 调用
- REST API
简单最容易上手。
自动管理连接
不用:
- accept
- reconnect
有消息边界
不像 TCP 会粘包。
强同步
REQ:
必须 send 后 recv
不能连续发送。
否则:
Operation cannot be accomplished in current state
不适合高并发异步
因为它是:
阻塞式请求响应思维
适合:
- RPC
- 配置查询
- 控制指令
- SDK 请求接口
例如:
机器人控制器 -> 请求状态
服务器 -> 返回结果
这是 ZeroMQ 最经典的模式之一。
---> Subscriber1
Publisher
---> Subscriber2
---> Subscriber3
PUB:
- 发布者
SUB:
- 订阅者
本质:
广播模型
类似:
- ROS topic
- 消息总线
- 新闻广播
发布端:
zmq::socket_t pub(ctx, zmq::socket_type::pub);
pub.bind("tcp://*:5556");
while (true)
{
pub.send(zmq::buffer("temperature 25"));
}订阅端:
zmq::socket_t sub(ctx, zmq::socket_type::sub);
sub.connect("tcp://localhost:5556");
sub.set(zmq::sockopt::subscribe, "temperature");这是重点。
订阅:
subscribe = "temperature"
意味着:
只接收以 temperature 开头的消息
因此:
temperature 25
temperature 30
会收到。
而:
humidity 50
不会收到。
一对多广播
天然支持:
1 -> N
发布者不知道订阅者是谁
这是:
解耦
的核心。
高吞吐
因为:
- 无需等待回复
- 无需同步
订阅者可能丢消息
PUB/SUB 默认:
不保证可靠送达
因为它更偏:
实时流
而不是:
可靠事务
后加入的订阅者:
收不到历史消息
因为它不是 Kafka。
非常适合:
机器人状态广播
激光雷达数据
IMU数据
位置数据
日志系统
服务端 -> 多个日志客户端
行情系统
股票价格广播
PUSH / PULL 本质上是:
生产者 - 消费者模型
适用于:
- 任务队列
- 并行计算
- 图像处理
- AI 推理
- 视频处理流水线
Producer (PUSH)
|
+----> Worker1 (PULL)
+----> Worker2 (PULL)
+----> Worker3 (PULL)
PUSH Socket:
负责分发任务
PULL Socket:
负责接收任务
ZeroMQ 内部会自动进行:
负载均衡(Round Robin)
例如:
task1 -> worker1
task2 -> worker2
task3 -> worker3
生产者
#include <zmq.hpp>
int main()
{
zmq::context_t context(1);
zmq::socket_t push(context, zmq::socket_type::push);
push.bind("tcp://*:5557");
while (true)
{
push.send(
zmq::buffer("task"),
zmq::send_flags::none);
}
}消费者
#include <zmq.hpp>
#include <iostream>
int main()
{
zmq::context_t context(1);
zmq::socket_t pull(context, zmq::socket_type::pull);
pull.connect("tcp://localhost:5557");
while (true)
{
zmq::message_t msg;
pull.recv(msg);
std::cout << msg.to_string()
<< std::endl;
}
}自动负载均衡
无需自己实现:
- 调度器
- 任务队列
- Worker 管理
易于扩容
新增 Worker:
直接 connect 即可
非常适合并行任务
例如:
- 图像识别
- 视频转码
- 数据处理
单向通信
PUSH/PULL:
不支持返回结果
因此不适合 RPC。
Worker 崩溃可能导致任务丢失
默认情况下:
不保证任务可靠性
ROUTER / DEALER 是 ZeroMQ 最核心、最强大的通信模式。
它解决了:
REQ/REP 同步阻塞
的问题。
适用于:
- 高并发服务器
- 异步 RPC
- 聊天系统
- 网关系统
- 分布式系统
Dealer 可以理解为:
异步版 REQ
与 REQ 不同:
Dealer 可以:
send();
send();
send();连续发送消息,而无需等待回复。
Router 可以理解为:
带路由能力的异步服务端
它能够识别:
每一个客户端
因为每个连接都具有:
identity
Dealer1 \
Dealer2 ---> Router
Dealer3 /
Router 可以知道:
消息来自谁
并且可以:
定向回复
Router 收到的消息:
[identity][message]
其中:
identity
表示客户端 ID。
因此:
Router 可以实现消息路由
完全异步
无需:
send -> recv
严格同步。
支持高并发
非常适合:
- Reactor 模型
- 高性能服务器
例如:
Frontend Router
↓
Backend Dealer
↓
Worker Pool
这已经非常接近:
现代微服务网关
的设计思想。
高性能 RPC
聊天服务器
消息网关
分布式任务系统