Home >php教程 >PHP源码 >ZeroMQ,史上最快的消息队列 —– ZMQ的学习和研究

ZeroMQ,史上最快的消息队列 —– ZMQ的学习和研究

PHP中文网
PHP中文网Original
2016-05-25 17:07:032106browse

ZeroMQ,史上最快的消息队列  —– ZMQ的学习和研究

一、ZeroMQ 的背景介绍

  引用官方的说法: “ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。”

  近几年有关”Message Queue”的项目层出不穷,知名的就有十几种,这主要是因为后摩尔定律时代,分布式处理逐渐成为主流,业界需要一套标准来解决分布式计算环境中节点之间的消息通信。几年的竞争下来,Apache 基金会旗下的符合 AMQP/1.0标准的 RabbitMQ 已经得到了广泛的认可,成为领先的 MQ 项目。

  与 RabbitMQ 相比,ZMQ 并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口。

  二、ZMQ 是什么?

  阅读了 ZMQ 的 Guide 文档后,我的理解是,这是个类似于 Socket 的一系列接口,他跟 Socket 的区别是:普通的 socket 是端到端的(1:1的关系),而 ZMQ 却是可以N:M 的关系,人们对 BSD 套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而 ZMQ 屏蔽了这些细节,让你的网络编程更为简单。ZMQ 用于 node 与 node 间的通信,node 可以是主机或者是进程。

  三、本文的目的

  在集群对外提供服务的过程中,我们有很多的配置,需要根据需要随时更新,那么这个信息如果推动到各个节点?并且保证信息的一致性和可靠性?本文在介绍 ZMQ 基本理论的基础上,试图使用 ZMQ 实现一个配置分发中心。从一个节点,将信息无误的分发到各个服务器节点上,并保证信息正确性和一致性。

  四、ZMQ 的三个基本模型

  ZMQ 提供了三个基本的通信模型,分别是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”,我们从这三种模式一窥 ZMQ 的究竟

  ZMQ 的 hello world!

  由 Client 发起请求,并等待 Server 回应请求。请求端发送一个简单的 hello,服务端则回应一个 world。请求端和服务端都可以是 1:N 的模型。通常把 1 认为是 Server ,N 认为是 Client 。ZMQ 可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为N:M (只需要加入若干路由节点)。如图 1 所示:

28211728_zxED.jpg


  图1:ZMQ 的 Request-Reply 通信

  服务端的 php 程序如下:

<?php /* * Hello World server
* Binds REP socket to tcp://*:5555
* Expects "Hello" from client, replies with "World"
* @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ $context = new ZMQContext (1); // Socket to talk to clients $responder = new ZMQSocket ($context, ZMQ::SOCKET_REP); $responder-&gt;bind ("tcp://*:5555"); while(true) { // Wait for next request from client $request = $responder-&gt;recv (); printf ("Received request: [%s]\n", $request); // Do some &#39;work&#39; sleep (1); // Send reply back to client $responder-&gt;send ("World");
}

  Client 程序如下:

<?php /* *  Hello World client
 
 *  Connects REQ socket to tcp://localhost:5555
 
 *  Sends "Hello" to server, expects "World" back
 
 * @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ $context = new ZMQContext (); // Socket to talk to server  echo "Connecting to hello world server...\n"; $requester = new ZMQSocket ($context, ZMQ::SOCKET_REQ); $requester-&gt;connect ("tcp://localhost:5555"); for($request_nbr = 0; $request_nbr != 10; $request_nbr++) { printf ("Sending request %d...\n", $request_nbr); $requester-&gt;send ("Hello"); $reply = $requester-&gt;recv (); printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
 
}

从以上的过程,我们可以了解到使用 ZMQ 写基本的程序的方法,需要注意的是:

  a) 服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socket。

  b) 在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。

  c) 服务端收到信息以后,会 send 一个“World”给客户端。值得注意的是一定是 client 连接上来以后,send 消息给 Server,然后 Server 再 rev 然后响应 client,这种一问一答式的。如果 Server 先 send,client 先 rev 是会报错的。

  d) ZMQ 通信通信单元是消息,他除了知道 Bytes 的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json 等等。

  e) 虽然可以使用 ZMQ 实现 HTTP 协议,但是,这绝不是他所擅长的。

  ZMQ 的 Publish-subscribe 模式

  我们可以想象一下天气预报的订阅模式,由一个节点提供信息源,由其他的节点,接受信息源的信息,如图 2 所示:

  图2:ZMQ 的 Publish-subscribe

  示例代码如下 :

  Publisher:

28211728_6Dzt.jpg

<?php /* * Weather update server
* Binds PUB socket to tcp://*:5556
* Publishes random weather updates
* @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ // Prepare our context and publisher $context = new ZMQContext (); $publisher = $context-&gt;getSocket (ZMQ::SOCKET_PUB); $publisher-&gt;bind ("tcp://*:5556"); while (true) { // Get values that will fool the boss $zipcode = mt_rand(0, 100000); $temperature = mt_rand(-80, 135); $relhumidity = mt_rand(10, 60); // Send message to all subscribers $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity); $publisher-&gt;send ($update);
}
Subscriber
&lt;?php /* * Weather update client
* Connects SUB socket to tcp://localhost:5556
* Collects weather updates and finds avg temp in zipcode
* @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ $context = new ZMQContext (); // Socket to talk to server echo "Collecting updates from weather server…", PHP_EOL; $subscriber = new ZMQSocket ($context, ZMQ::SOCKET_SUB); $subscriber-&gt;connect ("tcp://localhost:5556"); // Subscribe to zipcode, default is NYC, 10001 $filter = $_SERVER[&#39;argc&#39;] &gt; 1 ? $_SERVER[&#39;argv&#39;][1] : "10001"; $subscriber-&gt;setSockOpt (ZMQ::SOCKOPT_SUBSCRIBE, $filter); // Process 100 updates $total_temp = 0; for ($update_nbr = 0; $update_nbr &lt; 100; $update_nbr++) { $string = $subscriber-&gt;recv (); sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity); $total_temp += $temperature;
} printf ("Average temperature for zipcode &#39;%s&#39; was %dF\n", $filter, (int) ($total_temp

这段代码讲的是,服务器端生成随机数 zipcode、temperature、relhumidity 分别代表城市代码、温度值和湿度值。然后不断的广播信息,而客户端通过设置过滤参数,接受特定城市代码的信息,收集完了以后,做一个平均值。

  a) 与 Hello World 不同的是,Socket 的类型变成 SOCKET_PUB 和 SOCKET_SUB 类型。

  b) 客户端需要$subscriber->setSockOpt (ZMQ::SOCKOPT_SUBSCRIBE, $filter);设置一个过滤值,相当于设定一个订阅频道,否则什么信息也收不到。

  c) 服务器端一直不断的广播中,如果中途有 Subscriber 端退出,并不影响他继续的广播,当 Subscriber 再连接上来的时候,收到的就是后来发送的新的信息了。这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题,所谓的 Slow joiner。稍后,会解决这个问题。

  d) 但是,如果 Publisher 中途离开,所有的 Subscriber 会 hold 住,等待 Publisher 再上线的时候,会继续接受信息。

  ZMQ 的 PipeLine 模型

  想象一下这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine 比较适合于这种场景,他的结构图,如图 3 所示。

28211729_dg34.jpg

 图3:ZMQ 的 PipeLine 模型

  Parallel task ventilator in PHP

<?php /* * Task ventilator
* Binds PUSH socket to tcp://localhost:5557
* Sends batch of tasks to workers via that socket
* @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ $context = new ZMQContext (); // Socket to send messages on $sender = new ZMQSocket ($context, ZMQ::SOCKET_PUSH); $sender-&gt;bind ("tcp://*:5557"); echo "Press Enter when the workers are ready: "; $fp = fopen(&#39;php://stdin&#39;, &#39;r&#39;); $line = fgets($fp, 512); fclose($fp); echo "Sending tasks to workers…", PHP_EOL; // The first message is "0" and signals start of batch $sender-&gt;send (0); // Send 100 tasks $total_msec = 0; // Total expected cost in msecs for ($task_nbr = 0; $task_nbr &lt; 100; $task_nbr++) { // Random workload from 1 to 100msecs $workload = mt_rand(1, 100); $total_msec += $workload; $sender-&gt;send ($workload);
 
} printf ("Total expected cost: %d msec\n", $total_msec); sleep (1); // Give 0MQ time to deliver

Parallel task worker in PHP

<?php /* * Task worker
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
* @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ $context = new ZMQContext (); // Socket to receive messages on $receiver = new ZMQSocket ($context, ZMQ::SOCKET_PULL); $receiver-&gt;connect ("tcp://localhost:5557"); // Socket to send messages to $sender = new ZMQSocket ($context, ZMQ::SOCKET_PUSH); $sender-&gt;connect ("tcp://localhost:5558"); // Process tasks forever while (true) { $string = $receiver-&gt;recv (); // Simple progress indicator for the viewer echo $string, PHP_EOL; // Do the work usleep($string * 1000); // Send results to sink $sender-&gt;send ("");
}

Parallel task sink in PHP

<?php /* * Task sink
* Binds PULL socket to tcp://localhost:5558
* Collects results from workers via that socket
* @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ // Prepare our context and socket $context = new ZMQContext (); $receiver = new ZMQSocket ($context, ZMQ::SOCKET_PULL); $receiver-&gt;bind ("tcp://*:5558"); // Wait for start of batch $string = $receiver-&gt;recv (); // Start our clock now $tstart = microtime(true); // Process 100 confirmations $total_msec = 0; // Total calculated cost in msecs for ($task_nbr = 0; $task_nbr &lt; 100; $task_nbr++) { $string = $receiver-&gt;recv (); if($task_nbr % 10 == 0) { echo ":";
} else { echo ".";
}
} $tend = microtime(true); $total_msec = ($tend - $tstart) * 1000; echo PHP_EOL; printf ("Total elapsed time: %d msec", $total_msec); echo PHP_EOL;


  从程序中,我们可以看到,task ventilator 使用的是 SOCKET_PUSH,将任务分发到 Worker 节点上。而 Worker 节点上,使用 SOCKET_PULL 从上游接受任务,并使用 SOCKET_PUSH 将结果汇集到 Slink。值得注意的是,任务的分发的时候也同样有一个负载均衡的路由功能,worker 可以随时自由加入,task ventilator 可以均衡将任务分发出去。

  五、其他扩展模式

  通常,一个节点,即可以作为 Server,同时也能作为 Client,通过 PipeLine 模型中的 Worker,他向上连接着任务分发,向下连接着结果搜集的 Sink 机器。因此,我们可以借助这种特性,丰富的扩展原有的三种模式。例如,一个代理 Publisher,作为一个内网的 Subscriber 接受信息,同时将信息,转发到外网,其结构图如图 4 所示。

28211729_47K3.jpg

  图4:ZMQ 的扩展模式

  六、多个服务器

  ZMQ 和 Socket 的区别在于,前者支持N:M的连接,而后者则只是1:1的连接,那么一个 Client 连接多个 Server 的情况是怎样的呢,我们通过图 5 来说明。

28211729_H40E.jpg

  图5:ZMQ 的N:1的连接情况

  我们假设 Client 有 R1,R2,R3,R4四个任务,我们只需要一个 ZMQ 的 Socket,就可以连接四个服务,他能够自动均衡的分配任务。如图 5 所示,R1,R4自动分配到了节点A,R2到了B,R3到了C。如果我们是N:M的情况呢?这个扩展起来,也不难,如图 6 所示。

28211729_LqAi.jpg

  图6:N:M的连接

  我们通过一个中间结点(Broker)来进行负载均衡的功能。我们通过代码了解,其中的 Client 和我们的 Hello World 的 Client 端是一样的,而 Server 端的不同是,他不需要监听端口,而是需要连接 Broker 的端口,接受需要处理的信息。所以,我们重点阅读 Broker 的代码:

<?php /* * Simple request-reply broker
* @author Ian Barber &lt;ian (dot) barber (at) gmail (dot) com&gt; */ // Prepare our context and sockets $context = new ZMQContext (); $frontend = new ZMQSocket ($context, ZMQ::SOCKET_ROUTER); $backend = new ZMQSocket ($context, ZMQ::SOCKET_DEALER); $frontend-&gt;bind ("tcp://*:5559"); $backend-&gt;bind ("tcp://*:5560"); // Initialize poll set $poll = new ZMQPoll (); $poll-&gt;add ($frontend, ZMQ::POLL_IN); $poll-&gt;add ($backend, ZMQ::POLL_IN); $readable = $writeable = array(); // Switch messages between sockets while(true) { $events = $poll-&gt;poll ($readable, $writeable); foreach($readable as $socket) { if($socket === $frontend) { // Process all parts of the message while(true) { $message = $socket-&gt;recv (); // Multipart detection $more = $socket-&gt;getSockOpt (ZMQ::SOCKOPT_RCVMORE); $backend-&gt;send ($message, $more ? ZMQ::MODE_SNDMORE : null); if(!$more) { break; // Last message part }
}
} else if($socket === $backend) { $message = $socket-&gt;recv (); // Multipart detection $more = $socket-&gt;getSockOpt (ZMQ::SOCKOPT_RCVMORE); $frontend-&gt;send ($message, $more ? ZMQ::MODE_SNDMORE : null); if(!$more) { break; // Last message part }
}
}
}


  Broker 监听了两个端口,接受从多个 Client 端发送过来的数据,并将数据,转发给 Server。在 Broker 中,我们监听了两个端口,使用了两个 Socket,那么对于多个 Socket 的情况,我们是不需要通过轮询的方式去处理数据的,在之前,我们可以使用 libevent 实现,异步的信息处理和传输。而现在,我们只需要使用 ZMQ 的$poll->poll 以实现多个 Socket 的异步处理。

  七、进程间的通信

  ZMQ 不仅能通过 TCP 完成节点间的通信,也可以通过 Socket 文件完成进程间的通信。如图 7 所示,我们 fork 三个 PHP 进程,将进程 1 的数据,通过 Socket 文件发送到进程3。

28211729_ifpw.jpg

图7:进程间的通信

<?php function step1() { $context = new ZMQContext (); // Signal downstream to step 2  $sender = new ZMQSocket ($context, ZMQ::SOCKET_PAIR); $sender-&gt;connect ("ipc://step2.ipc"); $sender-&gt;send ("hello ,i am step1");
 
} function step2() { $pid = pcntl_fork (); if($pid == 0) {
 
                step1(); exit();
 
        } $context = new ZMQContext (); // Bind to ipc: endpoint, then start upstream thread  $receiver = new ZMQSocket ($context, ZMQ::SOCKET_PAIR); $receiver-&gt;bind ("ipc://step2.ipc"); // Wait for signal   sleep(10); $strings = $receiver-&gt;recv (); echo "step2 receiver is $strings". PHP_EOL; sleep(10); // Signal downstream to step 3  $sender = new ZMQSocket ($context, ZMQ::SOCKET_PAIR); $sender-&gt;connect ("ipc://step3.ipc"); $sender-&gt;send ($strings);
 
} // Start upstream thread then bind to icp: endpoint  $pid = pcntl_fork (); if($pid == 0) {
 
        step2(); exit();
 
} $context = new ZMQContext (); $receiver = new ZMQSocket ($context, ZMQ::SOCKET_PAIR); $receiver-&gt;bind ("ipc://step3.ipc"); // Wait for signal  $sr = $receiver-&gt;recv (); echo "the result is {$sr}".PHP_EOL;

 在运行中,我们可以看到多了两个文件,如图 8 所示。

28211729_k4Kh.jpg

  图8:运行过程中生成的文件

  八、利用 ZeroMQ 实现一个配置推送中心

  当我们将 WEB 代码部署到集群上的时候,如果需要实时的将最新的配置信息,主动的推送到各个机器节点。在此过程中,我们一定要保证,各个节点收到的信息的一致性和正确性,如果使用 HTTP,由于他的无状态性,我们无法保证信息的一致性,当然,你可以使用 HTTP 来实现,只是更复杂,为什么不用 ZMQ?他能让你更简单的实现这些功能。

  我们使用 ZMQ 的信息订阅模式。在那个模式中,我们注意到,对于后来的加入节点,始终会丢失在他加入之前,已经发送的信息(Slow joiner)。我们可以开启另外一个 ZMQ 的通信通道,用于报告当前节点的情况(节点的身份、准备状态等),其结构如图 9 所示。

28211729_NBfC.jpg

  图9:扩展 ZMQ 的订阅者模式

  我们通过$context->getSocket (ZMQ::SOCKET_REQ);设置一个新的 Request-Reply 连接,来用于 Subscriber 向 Publisher 报告自己的身份信息,而 Publisher 则等待所有的 Subscriber 都连接上的时候,再选择 Publish 自己的信息。

  Subscriber 端的程序如下:

<?php $hostname = $_SERVER[&#39;argc&#39;] &gt; 1 ? $_SERVER[&#39;argv&#39;][1] : "s1"; $context = new ZMQContext (2); $sub = new ZMQSocket ($context,ZMQ::SOCKET_SUB); $sub-&gt;connect ("tcp://localhost:5561"); //$subscriber-&gt;setSockOpt (ZMQ::SOCKOPT_IDENTITY, $hostname);  $sub-&gt;setSockOpt (ZMQ::SOCKOPT_SUBSCRIBE,""); $client = $context-&gt;getSocket (ZMQ::SOCKET_REQ); $client-&gt;connect ("tcp://localhost:5562"); while(1) { //$client-&gt;connect ("tcp://localhost:5562");  $client-&gt;send ($hostname); $version = $client-&gt;recv (); echo $version."\r\n"; if (!empty($version)) { $recive = $sub-&gt;recv (); $vars = json_decode ($recive); var_dump($vars);
 
}
 
}


  Publisher 端的程序如下:

<?php $CONFIG["TAOKE_BTS"]["ENABLE"] = true; $CONFIG["QP_BTS"]["ENABLE"] = true; $CONFIG["QP_BTS"]["TK_TEST"] = 13; $string = json_encode ($CONFIG); $clients = array("s2","s1","s3"); $context = new ZMQContext (10); //Socket talk to clients  $publisher = new ZMQSocket ($context,ZMQ::SOCKET_PUB); $publisher-&gt;bind ("tcp://*:5561"); //Socket to publish message  $server = new ZMQSocket ($context,ZMQ::SOCKET_REP); $server-&gt;bind ("tcp://*:5562"); while(count($clients)!=0) { $client_name = $server-&gt;recv (); echo "{$client_name} is connect!\r\n"; if (in_array($client_name, $clients)) { //coming one client  $key = array_search($client_name, $clients); unset($clients[$key]); echo "$client_name has come in!\r\n"; $server-&gt;send ("Version is 2.0");
 
} else { $server-&gt;send ("You are a stranger!");
 
}
 
} $publisher-&gt;send ($string);
 
?&gt;


  每个节点通过 5562 端口,使用 Rep 模式和 Publisher 连接,通过这个连接告之 Publisher 自己的机器名,而 Publisher 端通过白名单的方式,维护一个机器列表,当机器列表中所有的机器连接上来以后,通过 5561 端口,将最新的配置信息发送出去。

  后续的处理,Subscriber 可以选择将配置信息写入到 APC 缓存,程序将始终从缓存中读取部分配置信息,Subscriber 并将更新后的状态信息,实时的通过 5562 报告给 Publisher。

  虽然,在本示例中不会出现,但是,如果需要发布的信息量过大,在接受信息的过程中,Subscriber 端突然中断网络(或者是程序崩溃),那么当他在连接上来的时候,有部分信息就会丢失?ZMQ 考虑到这个问题,通过$subscriber->setSockOpt (ZMQ::SOCKOPT_IDENTITY, $hostname);设置一个 id,当这个 id 的 Subscriber 重新连接上来的时候,他可以从上次中断的地方,继续接受信息,当然,节点的中断,不会影响其他的节点继续的接受信息。

  那么 ZMQ 是怎么实现断线重连后,继续发送信息呢 ?他会将断开的 Subscriber 应该接受到的信息发到内存中,等待他重新上线后,将缓存的信息,继续发送给他。当然,内存必然是有限的,过多就会出现内存溢出。ZMQ 通过

  SetSockOpt (ZMQ::SOCKOPT_SWAP, 250000)设置 Swap 空间的大小,来防止 out of memory and crash。最终,我们的程序运行结果,如图 10 所示。

28211729_Geww.jpg

  图 10:配置中心的运行结果

  当然,这只是一个大体的思路,如果应用到实际的成产环境中,还需要考虑更多的问题,包含稳定性,容错等等。然而,ZMQ 由于高并发,以及稳定性和易用性,前景不错,他的目标是进入 Linux 内核,我们期待那一天的到来。


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn