系列文章目录
文章目录
前言
- 如何创建和使用ZMQ sockets
- 通过socket send和 receive message
- 使用ZMQ的异步I/O模型
- 单线程处理多个socket
- 适当的处理致命和非致命错误
- 处理如Ctrl+C的中断信号
- 关闭并清理ZMQ应用
- 如何检测ZMQ应用的内存泄露
- 发送和接收 multipart messages
- 如何跨网络转发消息
- 如何构建一个简单的消息队列代理
- 使用ZeroMQ编写多线程应用程序
- 使用ZeroMQ在线程之间发送信号
- 使用ZeroMQ协调节点网络
- 为pub-sub创建和使用消息信封
- 使用HWM(high-water mark)防止内存溢出
● socket API
socket有四个部分的生命,向BSD sockets一样
- 创建和销毁套接字(zmq_socket(), zmq_close())
- 配置套接字( zmq_setsockopt, zmq_get_sockopt )
- 绑定与连接socket(zmq_bind, zmq_connect)
- 读写(zmq_msg_send, zmq_msg_recv)
pugging socket into the network topology
通过zmq_bind到一个节点,zmq_connect到另一个节点,可在两个节点间创建连接。bind方的地址是固定的,connect方可是任意地址。于是,将一个socket绑定到一个endpoint,将一个socket连接到一个endpoint,endpoint就是所谓网络地址。
- 一个socket可绑定多个endpoint
mq_bind (socket, "tcp://*:5555"); zmq_bind (socket, "tcp://*:9999"); zmq_bind (socket, "inproc://somename");
ZMQ尽量弱化bind和connect方的差异,但它们仍然不同。bind应该在网络中静态的、不变化的部分,而connect应该是动态的部分。
zmq sockets 和 TCPsockets的不同:
- ZMQ sockets携带messages, 更像UPD,而不像TCP的字节流。ZeroMQ消息是长度指定的二进制数据。
- ZMQ通过后台线程操作I/O。消息的到达和发出都是在局部队列中
- 根据套接字类型,ZeroMQ套接字内置了1-to-N路由行为
除了例外情况,zmq_send不阻塞,它将消息发送给缓冲队列,由IO线程异步发送数据。
单播传输
zmq提供一系列单播传输(inproc, ipc, tcp)和多播传输(epgm, pgm)。
zmq不是一个中性载体
zmq帧不是原始的tcp字节流,它有自己的封装,因此 zmq 服务端 不能和 用系统socket编写的客户端进行直接通讯。
zmq提供了SOCK_STREAM用于和TCP兼容
IO 线程
默认zmq会后台启动 ZMQbg/Reaper ZMQbg/IO/0两个线程
● 消息模式
zmq的模式由socket type对实现。为了理解ZMQ的模式,需要理解socket type以及它们如何在一起工作的。
ZMQ内建核心模式:
- 请求-响应:这是一种远程过程调用和任务分配模式。
- Pub-sub: 数据分发模式
- pipeline:将节点链接为扇出/扇形入模式。这是一种并行任务分发和收集模式。
- Exclusive pair:仅连接两个插座。这是一种在进程中连接两个线程的模式,不要与“正常”的套接字对混淆。
可链接配对的socket type
- PUB and SUB
- REQ and REP
- REQ and ROUTER (take care, REQ inserts an extra null frame)
- DEALER and REP (take care, REP assumes a null frame)
- DEALER and ROUTER
- DEALER and DEALER
- ROUTER and ROUTER
- PUSH and PULL
- PAIR and PAIR
https://libzmq.readthedocs.io/en/latest/zmq_socket.html
其他配对方式算式未定义行为。
以上四种模式为zmq的核心模式,由zmq的c++代码实现,其他模式是在此之上的扩展。
message收发
zmq_msg_t类型的消息收发
- Initialise a message: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data().
- Sending and receiving a message: zmq_msg_send(), zmq_msg_recv().
- Release a message: zmq_msg_close().
- Access message content: zmq_msg_data(), zmq_msg_size(), zmq_msg_more().
- Work with message properties: zmq_msg_get(), zmq_msg_set().
- Message manipulation: zmq_msg_copy(), zmq_msg_move().
当调用zmq_msg_send后,zmq会清除zmq_msg_t,将其设为0。引用-1
Frames(message parts)是zmq的基本消息类型。一个Frame是一个指定长度块的数据。
解决多个socket同时连接问题
zmq_socket可以链接到多个endpoint。
zmq_poll()可一次读取多个sockets。将其封装为事件驱动的形式是好方式。
非阻塞IO
while (1) {
int size = zmq_recv (receiver, msg, 255, ZMQ_DONTWAIT);
if (size != -1) { /* Process task */ }
else break;
}
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ subscriber, 0, ZMQ_POLLIN, 0 }
};
// Process messages from both sockets
while (1) {
char msg [256];
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
int size = zmq_recv (receiver, msg, 255, 0);
if (size != -1) {
// Process task
}
}
if (items [1].revents & ZMQ_POLLIN) {
int size = zmq_recv (subscriber, msg, 255, 0);
if (size != -1) {
// Process weather update
}
}
}
typedef struct {
void *socket; // ZeroMQ socket to poll on
int fd; // OR, native file handle to poll on
short events; // Events to poll on
short revents; // Events returned after poll
} zmq_pollitem_t;
多帧消息
send:
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, 0);
接收并处理所有多帧消息
while (1) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_msg_recv (&message, socket, 0);
// Process the message frame
...
zmq_msg_close (&message);
if (!zmq_msg_more (&message))
break; // Last message frame
}
收发多帧消息
while (1) {
// Process all parts of the message
zmq_msg_init (&message);
zmq_msg_recv (&message, frontend, 0);
int more = zmq_msg_more (&message);
zmq_msg_send (&message, backend, more? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
if (!more)
break; // Last message part
}
动态发现问题
设计大型分布式架构的一大难题是节点见如何相互发现对方。如果碎片来来去去,这尤其困难,所以我们称之为“动态发现问题”。
动态发现问题有几种解决方案。最简单的是直接写如何代码或做配置文件,这意味着,如果网络加入新的节点,需要重配网络。
如下图,subscriber是动态的,Publisher是静态的。subscriber可以动态的增添,当向网络中动态添加publisher时则会非常糟糕。
解决动态添加publisher问题最简单的方式是添加中间代理。它是一个静态的节点,动态连接pub和sub
我们需要XPUB和XSUB套接字,因为ZeroMQ可以从订阅者向发布者进行订阅转发。XSUB和XPUB与SUB和PUB完全相同,除了它们将订阅作为特殊消息公开。代理必须通过从XPUB套接字读取订阅消息并将其写入XSUB套接字,将这些订阅消息从订阅者端转发到发布者端。这是XSUB和XPUB的主要用例。
共享队列(DEALER and ROUTER)
当需要多个services和多个clients (多个线程或进程或节点)是,唯一的限制是服务必须是无状态的,所有的状态在请求或者共享存储如数据库中。
当网络中存在多个请求端和多个服务端时,想要灵活的增添请求端和服务端,消息队列代理是一个灵活的选择。当使用REQ-RER时,有严格的同步请求-应答对话:client 发送请求,service读取请求然后返回应答,client然后读取应答,如果不按这个套路,将会出错
当以如图方式搭建网络时,每个client会轮训访问service,每次请求都会轮训service。
router和dealer内会有队列,以轮训方式给每个service发送请求
built-in proxy
void *context = zmq_ctx_new ();
// Socket facing clients
void *frontend = zmq_socket (context, ZMQ_ROUTER);
int rc = zmq_bind (frontend, "tcp://*:5559");
assert (rc == 0);
// Socket facing services
void *backend = zmq_socket (context, ZMQ_DEALER);
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0);
// Start the proxy loop
zmq_proxy (frontend, backend, NULL);
// We never get here...
zmq_close (frontend); zmq_close (backend);
zmq_ctx_destroy (context);
传输桥梁
ZMQ网络与其它网络通讯的方式
● 错误处理和
ZMQ错误处理的理念是:尽可能抵御外部错误和攻击,检测到内部错误时则终止程序执行。
商用代码应当对每一行ZMQ调用都进行错误处理。根据POSIX惯例,有一些简单的规则:
- 如果创建对象的方法失败了应当返回NULL
- 处理数据的方法获取会返回处理的字节数,如果发生错误或失败则返回-1
- 其他方法,0代表成功,-1代表失败
- 错误代码存储在errno或者zmq_errno()
- 描述错误的文本由zmq_strero()提供
有两个主要的例外情况,需作为非致命错误处理:
- 对于非阻塞读写,当返回-1并且errno == EAGAIN
- 当一个线程调用zmq_ctxdestroy(), 但其它线程仍处于阻塞状态时,zmq_ctx_destroy()调用关闭上下文对象,所有阻塞调用返回-1并且errno==ETERM
启用编译器优化或-DNDEBUG会将assert中的statement整个移除,所以不要将ZMQ调用放在assert中。
work:
int main ()
{
// Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Socket for control input
void *controller = zmq_socket (context, ZMQ_SUB);
zmq_connect (controller, "tcp://localhost:5559");
zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);
// Process messages from either socket
while (1) {
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ controller, 0, ZMQ_POLLIN, 0 }
};
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
char *string = s_recv (receiver);
printf ("%s.", string); // Show progress
fflush (stdout);
s_sleep (atoi (string)); // Do the work
free (string);
s_send (sender, ""); // Send results to sink
}
// Any waiting controller command acts as 'KILL'
if (items [1].revents & ZMQ_POLLIN)
break; // Exit loop
}
zmq_close (receiver); zmq_close (sender); zmq_close (controller);
zmq_ctx_destroy (context);
return 0;
}
● 中断信息号处理
// Shows how to handle Ctrl-C
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include "zmq.h"
// Signal handling
//
// Create a self-pipe and call s_catch_signals(pipe's writefd) in your application
// at startup, and then exit your main loop if your pipe contains any data.
// Works especially well with zmq_poll.
#define S_NOTIFY_MSG " "
#define S_ERROR_MSG "Error while writing to self-pipe.\n"
static int s_fd;
static void s_signal_handler (int signal_value)
{
int rc = write (s_fd, S_NOTIFY_MSG, sizeof(S_NOTIFY_MSG));
if (rc != sizeof(S_NOTIFY_MSG)) {
write (STDOUT_FILENO, S_ERROR_MSG, sizeof(S_ERROR_MSG)-1);
exit(1);
}
}
static void s_catch_signals (int fd)
{
s_fd = fd;
struct sigaction action;
action.sa_handler = s_signal_handler;
// Doesn't matter if SA_RESTART set because self-pipe will wake up zmq_poll
// But setting to 0 will allow zmq_read to be interrupted.
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
int main (void)
{
int rc;
void *context = zmq_ctx_new ();
void *socket = zmq_socket (context, ZMQ_REP);
zmq_bind (socket, "tcp://*:5555");
int pipefds[2];
rc = pipe(pipefds);
if (rc != 0) {
perror("Creating self-pipe");
exit(1);
}
for (int i = 0; i < 2; i++) {
int flags = fcntl(pipefds[i], F_GETFL, 0);
if (flags < 0) {
perror ("fcntl(F_GETFL)");
exit(1);
}
rc = fcntl (pipefds[i], F_SETFL, flags | O_NONBLOCK);
if (rc != 0) {
perror ("fcntl(F_SETFL)");
exit(1);
}
}
s_catch_signals (pipefds[1]);
zmq_pollitem_t items [] = {
{ 0, pipefds[0], ZMQ_POLLIN, 0 },
{ socket, 0, ZMQ_POLLIN, 0 }
};
while (1) {
rc = zmq_poll (items, 2, -1);
if (rc == 0) {
continue;
} else if (rc < 0) {
if (errno == EINTR) { continue; }
perror("zmq_poll");
exit(1);
}
// Signal pipe FD
if (items [0].revents & ZMQ_POLLIN) {
char buffer [1];
read (pipefds[0], buffer, 1); // clear notifying byte
printf ("W: interrupt received, killing server...\n");
break;
}
// Read socket
if (items [1].revents & ZMQ_POLLIN) {
char buffer [255];
// Use non-blocking so we can continue to check self-pipe via zmq_poll
rc = zmq_recv (socket, buffer, 255, ZMQ_DONTWAIT);
if (rc < 0) {
if (errno == EAGAIN) { continue; }
if (errno == EINTR) { continue; }
perror("recv");
exit(1);
}
printf ("W: recv\n");
// Now send message back.
// ...
}
}
printf ("W: cleaning up\n");
zmq_close (socket);
zmq_ctx_destroy (context);
return 0;
}
● 检测内存泄露
对于一直运行不结束的程序,可通过添加信号处理功能,通过 Ctrl+C 使程序正常退出,valgrind才能正常使用。
对于ZMQ
vg.supp
{
<socketcall_sendto>
Memcheck:Param
socketcall.sendto(msg)
fun:send
...
}
{
<socketcall_sendto>
Memcheck:Param
socketcall.send(msg)
fun:send
...
}
valgrind --tool=memcheck --leak-check=full --suppressions=vg.supp someprog
● 多线程
多线程程序应该从程序设计或者从结构上避免共享状态,就应该每个线程之间不共享状态,互不干扰。
- 在多线程见隔离数据,不要共享数据。例外:ZMQ的context是线程安全的。
- 进程一开始创建ZMQ context,然后将它传递给所有你想要通讯的线程
- Use attached threads to create structure within your application, and connect these to their parent threads using PAIR sockets over inproc. The pattern is: bind parent socket, then create child thread which connects its socket.
- Use detached threads to simulate independent tasks, with their own contexts. Connect these over tcp. Later you can move these to stand-alone processes without changing the code significantly.
- 不要在线程间共享ZMQ socket,它不是线程安全的
- 谁create,谁close
漂亮的多线程应用应该也可以轻松的改为多进程的应用
应用程序逻辑可以位于线程、进程或节点中:无论您的规模需要什么。
// Multithreaded Hello World server
static void *worker_routine (void *context) {
// Socket to talk to dispatcher
void *receiver = zmq_socket (context, ZMQ_REP);
zmq_connect (receiver, "inproc://workers");
while (1) {
char *string = s_recv (receiver);
printf ("Received request: [%s]\n", string);
free (string);
// Do some 'work'
sleep (1);
// Send reply back to client
s_send (receiver, "World");
}
zmq_close (receiver);
return NULL;
}
int main ()
{
void *context = zmq_ctx_new ();
// Socket to talk to clients
void *clients = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (clients, "tcp://*:5555");
// Socket to talk to workers
void *workers = zmq_socket (context, ZMQ_DEALER);
zmq_bind (workers, "inproc://workers");
// Launch pool of worker threads
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_routine, context);
}
// Connect work threads to client threads via a queue proxy
zmq_proxy (clients, workers, NULL);
// We never get here, but clean up anyhow
zmq_close (clients); zmq_close (workers);
zmq_ctx_destroy (context);
return 0;
}
● 线程间的信号(ZMQ_PAIR)
- 您可以对发送方使用PUSH,对接收方使用PULL。这看起来很简单,也会奏效,但请记住,PUSH会将消息分发给所有可用的接收者。如果你意外启动了两个接收器(例如,你已经有一个正在运行,而你启动了第二个),你将“失去”一半的信号。PAIR的优点是拒绝多个连接;这对是独家的。
- 您可以使用DEALER作为发件人,ROUTER作为收件人。然而,ROUTER将您的消息包装在一个“信封”中,这意味着您的零大小信号会变成一个多部分消息。如果你不关心数据,把任何东西都当作有效信号,如果你不从套接字读取多次,那就没关系了。然而,如果您决定发送真实数据,您会突然发现ROUTER为您提供了“错误”的消息。交易商还分发传出消息,与推送风险相同。
- 您可以将PUB用于发送方,将SUB用于接收方。这将正确地传递您发送的消息,并且PUB不会像PUSH或DEALER那样分发。但是,您需要为订阅者配置空订阅,这很烦人。
由于这些原因,PAIR是协调成对线程的最佳选择。类型为“ZMQ_PAIR”的套接字一次只能连接到单个对等端。对通过“ZMQ_PAIR”套接字发送的消息不执行消息路由或过滤。
● 多节点同步
- 启动所有节点
- pub先发送确认消息
- subscriber都到确认消息后发送request
- 当publisher受到所有request后在发送真正的数据
- publisher发送完数据有发送end,subscriber收到end后执行相应操作
总结
请看文中的图