需求 当一个用户给多个好友发送邀请邮件时,当一个请求需要从很多个数据库中读取数据时,当一个页面需要大量计算又想快速响应时,我们都希望php能够做到异步执行, 即并发地发邮件,并行地从多个数据库取数据,并行的计算业务逻辑,从而能够快速的响应用户,
当一个用户给多个好友发送邀请邮件时,当一个请求需要从很多个数据库中读取数据时,当一个页面需要大量计算又想快速响应时,我们都希望php能够做到异步执行,
即并发地发邮件,并行地从多个数据库取数据,并行的计算业务逻辑,从而能够快速的响应用户,不必让用户苦等。
仔细想下群发邮件、并行的从数据库中取数据、并行计算业务逻辑这些需求又都有不同,所以这里php异步并行需要区别对待。
1、群发邮件。此类需求一般不需要等所有邮件都发送完毕才给用户返回,所以是一种需要处理但不用立刻返回的需求
2、并行取数据。此类需求和群发邮件不一样,用户需要这些数据都从存储中取到了才能返回,是一种IO密集型的需求
3、并行计算。此类需求又与前两者不一样,因为不止是并行取数据和等待结果,重要的是大量的并行计算,是一种CPU密集型需求
解决这个问题网上有很多的解决方案
2、将此类任务存储在数据库中,然后cron脚本定时从数据库中读取任务进行处理。
靠谱的方案是使用消息中间件,例如RabbitMQ、ZeroMQ等。即采用类似消息队列的机制将这类耗时的任务存储起来,排队一个个处理。使用这种方案可以做到较高的并发量,稳定性也有保证。
当一个请求需要从很多数据库中获取数据时,在php中一般的做法是一个一个串行的从数据库中读。从而大量的时间都耗费在网络IO上,使响应时间变长。如何才能并行的或者说并发的从数据库中读取数据呢?
如果你用的数据库是mysql,那么不妨试下mysqli 扩展的异步方法mysqli_poll和mysqli_reap_async_query。
下面是一个例子:
<?php $link1 = mysqli_connect(); $link1->query("SELECT 'test'", MYSQLI_ASYNC); $all_links = array($link1); $processed = 0; do { $links = $errors = $reject = array(); foreach ($all_links as $link) { $links[] = $errors[] = $reject[] = $link; } if (!mysqli_poll($links, $errors, $reject, 1)) { continue; } foreach ($links as $link) { if ($result = $link->reap_async_query()) { print_r($result->fetch_row()); if (is_object($result)) mysqli_free_result($result); } else die(sprintf("MySQLi Error: %s", mysqli_error($link))); $processed++; } } while ($processed <br> 通过mysqli的异步方法,可以做到并行的从数据库中获取数据,从而达到减少用户等待的目的。 <h2>3、如何做到并行计算</h2> <p>做到异步并行计算(类似MapReduce)有点复杂,这需要使用到php的socket通信及进程控制等技巧。</p> <h3>原理<br> </h3> <p>它的原理就是把多个CPU密集型的任务分配给其它进程(其它的cpu或者其它的服务器)上去分别并行的处理(一般是通过socket派发任务),然后再把多个结果汇总起来,给用户展示。</p> <h3>并行计算示意图<br> </h3> <p><img src="/inc/test.jsp?url=http%3A%2F%2Fimg.blog.csdn.net%2F20140208175006078%3Fwatermark%2F2%2Ftext%2FaHR0cDovL2Jsb2cuY3Nkbi5uZXQvdWRlZmluZWQ%3D%2Ffont%2F5a6L5L2T%2Ffontsize%2F400%2Ffill%2FI0JBQkFCMA%3D%3D%2Fdissolve%2F70%2Fgravity%2FSouthEast&refer=http%3A%2F%2Fblog.csdn.net%2Fudefined%2Farticle%2Fdetails%2F18988755" alt="如何实现PHP异步调用或者说并行计算" ><br> </p> <p><br> </p> <h3>如何做</h3> <p>1、首先要有多个可供远程调用的socket服务,一般我们叫它RpcServer。它有三个主要任务即接收请求、处理请求、发送结果,其实就是类似我们普通的WebServer。调用者通过一定的url规则和GET/POST参数告知RpcServer要计算处理的任务,RpcServer处理完毕之后,再将结果返回给调用者。</p> <p>2、调用者并发的将任务请求RpcServer。如果你的RpcServer使用的是HTTP协议,那么这步就比较好实现。<code></code>可以使用php的curl_multi_*函数</p> <p>3、轮询每个RpcServer数据是否返回。同样使用HTTP协议的话,使用php的curl_multi_*函数可以做到。</p> <h3>使用HTTP协议调用端demo</h3> <pre class="brush:php;toolbar:false"><?php // 创建一对cURL资源 $ch1 = curl_init(); $ch2 = curl_init(); // 设置URL和相应的选项,并发的让RpcServer处理两个业务逻辑getUserInfo和getBlogList curl_setopt($ch1, CURLOPT_URL, "http://YourRpcServer.com/getUserInfo/55757"); curl_setopt($ch1, CURLOPT_HEADER, 0); curl_setopt($ch2, CURLOPT_URL, "http://YourRpcServer.com/getBlogList/55757"); curl_setopt($ch2, CURLOPT_HEADER, 0); // 创建批处理cURL句柄 $mh = curl_multi_init(); // 增加2个句柄 curl_multi_add_handle($mh,$ch1); curl_multi_add_handle($mh,$ch2); $active = null; // 执行批处理句柄 do { $mrc = curl_multi_exec($mh, $active); } while ($mrc == CURLM_CALL_MULTI_PERFORM); // 轮询RpcServer返回的结果 while ($active && $mrc == CURLM_OK) { if (curl_multi_select($mh) != -1) { do { $mrc = curl_multi_exec($mh, $active); } while ($mrc == CURLM_CALL_MULTI_PERFORM); } } // 汇总结果...... // 关闭全部句柄 curl_multi_remove_handle($mh, $ch1); curl_multi_remove_handle($mh, $ch2); curl_multi_close($mh);
这里推荐一个现成的RPC框架 workerman-jsonrpc,非常好用,强烈推荐。
使用方法如下:
<code><span><span><?php <br></span><span>class </span><span>User<br></span><span>{<br> public static function </span><span>getInfoByUid</span><span>(</span><span>$uid</span><span>)<br> {<br> </span><span>// ....<br> </span><span>}<br><br> public static function </span><span>getEmail</span><span>(</span><span>$uid</span><span>)<br> {<br> </span><span>// ...<br> </span><span>}<br>}<br></span></span></code>
./bin/workermand start
<code><span><span><?php <br></span><span>include_once </span><span>'yourClientDir/RpcClient.php'</span><span>;<br><br></span><span>$address_array </span><span>= array(<br> </span><span>'tcp://127.0.0.1:2015'</span><span>,<br> </span><span>'tcp://127.0.0.1:2015'<br> </span><span>);<br></span><span>// 配置服务端列表<br></span><span>RpcClient</span><span>::</span><span>config</span><span>(</span><span>$address_array</span><span>);<br><br></span><span>$uid </span><span>= </span><span>567</span><span>;<br><br></span><span>// User对应applications/JsonRpc/Services/User.php 中的User类<br></span><span>$user_client </span><span>= </span><span>RpcClient</span><span>::</span><span>instance</span><span>(</span><span>'User'</span><span>);<br><br></span><span>// getInfoByUid对应User类中的getInfoByUid方法<br></span><span>$ret_sync </span><span>= </span><span>$user_client</span><span>-></span><span>getInfoByUid</span><span>(</span><span>$uid</span><span>);<br></span></span></code>
<code><span><span><?php <br></span><span>include_once </span><span>'yourClientDir/RpcClient.php'</span><span>;<br></span><span>// 服务端列表<br></span><span>$address_array </span><span>= array(<br> </span><span>'tcp://127.0.0.1:2015'</span><span>,<br> </span><span>'tcp://127.0.0.1:2015'<br> </span><span>);<br></span><span>// 配置服务端列表<br></span><span>RpcClient</span><span>::</span><span>config</span><span>(</span><span>$address_array</span><span>);<br><br></span><span>$uid </span><span>= </span><span>567</span><span>;<br></span><span>$user_client </span><span>= </span><span>RpcClient</span><span>::</span><span>instance</span><span>(</span><span>'User'</span><span>);<br><br></span><span>// 异步调用User::getInfoByUid方法<br></span><span>$user_client</span><span>-></span><span>asend_getInfoByUid</span><span>(</span><span>$uid</span><span>);<br></span><span>// 异步调用User::getEmail方法<br></span><span>$user_client</span><span>-></span><span>asend_getEmail</span><span>(</span><span>$uid</span><span>);<br><br></span><span>这里是其它的业务代码<br></span><span>....................<br>....................<br><br></span><span>// 需要数据的时候异步接收数据<br></span><span>$ret_async1 </span><span>= </span><span>$user_client</span><span>-></span><span>arecv_getEmail</span><span>(</span><span>$uid</span><span>);<br></span><span>$ret_async2 </span><span>= </span><span>$user_client</span><span>-></span><span>arecv_getInfoByUid</span><span>(</span><span>$uid</span><span>);<br><br></span><span>这里是其他业务逻辑<br></span></span></code>
访问http://ip:33737界面类似如下:
框架下载地址:
简单高效json协议版本
workerman-JsonRpc
多平台友好的thrift版本 workerman-Thrift